Whamcloud - gitweb
LU-11128 ptlrpc: new request vs disconnect race
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index ad994fb..a167146 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2016, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -34,6 +34,7 @@
 
 #define DEBUG_SUBSYSTEM S_RPC
 
+#include <linux/delay.h>
 #include <obd_support.h>
 #include <obd_class.h>
 #include <lustre_lib.h>
@@ -763,11 +764,11 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
                                *fail2_t = ktime_get_real_seconds() +
                                           LONG_UNLINK;
 
-                       /* The RPC is infected, let the test to change the
-                        * fail_loc */
-                       set_current_state(TASK_UNINTERRUPTIBLE);
-                       schedule_timeout(cfs_time_seconds(2));
-                       set_current_state(TASK_RUNNING);
+                       /*
+                        * The RPC is infected, let the test to change the
+                        * fail_loc
+                        */
+                       msleep(4 * MSEC_PER_SEC);
                }
        }
 
@@ -864,10 +865,37 @@ ptlrpc_request_alloc_internal(struct obd_import *imp,
                               const struct req_format *format)
 {
         struct ptlrpc_request *request;
+       int connect = 0;
 
-        request = __ptlrpc_request_alloc(imp, pool);
-        if (request == NULL)
-                return NULL;
+       request = __ptlrpc_request_alloc(imp, pool);
+       if (request == NULL)
+               return NULL;
+
+       /* initiate connection if needed when the import has been
+        * referenced by the new request to avoid races with disconnect */
+       if (unlikely(imp->imp_state == LUSTRE_IMP_IDLE)) {
+               int rc;
+               CDEBUG_LIMIT(imp->imp_idle_debug,
+                            "%s: reconnect after %llds idle\n",
+                            imp->imp_obd->obd_name, ktime_get_real_seconds() -
+                                                    imp->imp_last_reply_time);
+               spin_lock(&imp->imp_lock);
+               if (imp->imp_state == LUSTRE_IMP_IDLE) {
+                       imp->imp_generation++;
+                       imp->imp_initiated_at = imp->imp_generation;
+                       imp->imp_state =  LUSTRE_IMP_NEW;
+                       connect = 1;
+               }
+               spin_unlock(&imp->imp_lock);
+               if (connect) {
+                       rc = ptlrpc_connect_import(imp);
+                       if (rc < 0) {
+                               ptlrpc_request_free(request);
+                               return NULL;
+                       }
+                       ptlrpc_pinger_add_import(imp);
+               }
+       }
 
         req_capsule_init(&request->rq_pill, request, RCL_CLIENT);
         req_capsule_set(&request->rq_pill, format);
@@ -956,7 +984,6 @@ struct ptlrpc_request_set *ptlrpc_prep_set(void)
        atomic_set(&set->set_remaining, 0);
        spin_lock_init(&set->set_new_req_lock);
        INIT_LIST_HEAD(&set->set_new_requests);
-       INIT_LIST_HEAD(&set->set_cblist);
        set->set_max_inflight = UINT_MAX;
        set->set_producer     = NULL;
        set->set_producer_arg = NULL;
@@ -1052,33 +1079,13 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
 EXPORT_SYMBOL(ptlrpc_set_destroy);
 
 /**
- * Add a callback function \a fn to the set.
- * This function would be called when all requests on this set are completed.
- * The function will be passed \a data argument.
- */
-int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
-                      set_interpreter_func fn, void *data)
-{
-       struct ptlrpc_set_cbdata *cbdata;
-
-       OBD_ALLOC_PTR(cbdata);
-       if (cbdata == NULL)
-               RETURN(-ENOMEM);
-
-       cbdata->psc_interpret = fn;
-       cbdata->psc_data = data;
-       list_add_tail(&cbdata->psc_item, &set->set_cblist);
-
-       RETURN(0);
-}
-
-/**
  * Add a new request to the general purpose request set.
  * Assumes request reference from the caller.
  */
 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
                         struct ptlrpc_request *req)
 {
+       LASSERT(req->rq_import->imp_state != LUSTRE_IMP_IDLE);
        LASSERT(list_empty(&req->rq_set_chain));
 
        if (req->rq_allow_intr)
@@ -1088,7 +1095,7 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
        list_add_tail(&req->rq_set_chain, &set->set_requests);
        req->rq_set = set;
        atomic_inc(&set->set_remaining);
-       req->rq_queued_time = cfs_time_current();
+       req->rq_queued_time = ktime_get_seconds();
 
        if (req->rq_reqmsg != NULL)
                lustre_msg_set_jobid(req->rq_reqmsg, NULL);
@@ -1119,7 +1126,7 @@ void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
         * The set takes over the caller's request reference.
         */
        req->rq_set = set;
-       req->rq_queued_time = cfs_time_current();
+       req->rq_queued_time = ktime_get_seconds();
        list_add_tail(&req->rq_set_chain, &set->set_new_requests);
        count = atomic_inc_return(&set->set_new_count);
        spin_unlock(&set->set_new_req_lock);
@@ -1155,17 +1162,19 @@ static int ptlrpc_import_delay_req(struct obd_import *imp,
         LASSERT (status != NULL);
         *status = 0;
 
-        if (req->rq_ctx_init || req->rq_ctx_fini) {
-                /* always allow ctx init/fini rpc go through */
-        } else if (imp->imp_state == LUSTRE_IMP_NEW) {
-                DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
-                *status = -EIO;
+       if (req->rq_ctx_init || req->rq_ctx_fini) {
+               /* always allow ctx init/fini rpc go through */
+       } else if (imp->imp_state == LUSTRE_IMP_NEW) {
+               DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
+               *status = -EIO;
        } else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
-               /* pings may safely race with umount */
-               DEBUG_REQ(lustre_msg_get_opc(req->rq_reqmsg) == OBD_PING ?
+               unsigned int opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+               /* pings or MDS-equivalent STATFS may safely race with umount */
+               DEBUG_REQ((opc == OBD_PING || opc == OST_STATFS) ?
                          D_HA : D_ERROR, req, "IMP_CLOSED ");
                *status = -EIO;
-        } else if (ptlrpc_send_limit_expired(req)) {
+       } else if (ptlrpc_send_limit_expired(req)) {
                /* probably doesn't need to be a D_ERROR after initial testing*/
                DEBUG_REQ(D_HA, req, "send limit expired ");
                *status = -ETIMEDOUT;
@@ -1188,7 +1197,9 @@ static int ptlrpc_import_delay_req(struct obd_import *imp,
                if (atomic_read(&imp->imp_inval_count) != 0) {
                         DEBUG_REQ(D_ERROR, req, "invalidate in flight");
                         *status = -EIO;
-               } else if (req->rq_no_delay) {
+               } else if (req->rq_no_delay &&
+                          imp->imp_generation != imp->imp_initiated_at) {
+                       /* ignore nodelay for requests initiating connections */
                         *status = -EWOULDBLOCK;
                } else if (req->rq_allow_replay &&
                          (imp->imp_state == LUSTRE_IMP_REPLAY ||
@@ -1213,16 +1224,12 @@ static int ptlrpc_import_delay_req(struct obd_import *imp,
  * \retval false if no message should be printed
  * \retval true  if console message should be printed
  */
-static bool ptlrpc_console_allow(struct ptlrpc_request *req)
+static bool ptlrpc_console_allow(struct ptlrpc_request *req, __u32 opc, int err)
 {
-       __u32 opc;
-
        LASSERT(req->rq_reqmsg != NULL);
-       opc = lustre_msg_get_opc(req->rq_reqmsg);
 
        /* Suppress particular reconnect errors which are to be expected. */
        if (opc == OST_CONNECT || opc == MDS_CONNECT || opc == MGS_CONNECT) {
-               int err;
 
                /* Suppress timed out reconnect requests */
                if (lustre_handle_is_used(&req->rq_import->imp_remote_handle) ||
@@ -1232,12 +1239,20 @@ static bool ptlrpc_console_allow(struct ptlrpc_request *req)
                /* Suppress most unavailable/again reconnect requests, but
                 * print occasionally so it is clear client is trying to
                 * connect to a server where no target is running. */
-               err = lustre_msg_get_status(req->rq_repmsg);
                if ((err == -ENODEV || err == -EAGAIN) &&
                    req->rq_import->imp_conn_cnt % 30 != 20)
                        return false;
        }
 
+       if (opc == LDLM_ENQUEUE && err == -EAGAIN)
+               /* -EAGAIN is normal when using POSIX flocks */
+               return false;
+
+       if (opc == OBD_PING && (err == -ENODEV || err == -ENOTCONN) &&
+           (req->rq_xid & 0xf) != 10)
+               /* Suppress most ping requests, they may fail occasionally */
+               return false;
+
        return true;
 }
 
@@ -1256,9 +1271,7 @@ static int ptlrpc_check_status(struct ptlrpc_request *req)
                lnet_nid_t nid = imp->imp_connection->c_peer.nid;
                __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
 
-               /* -EAGAIN is normal when using POSIX flocks */
-               if (ptlrpc_console_allow(req) &&
-                   !(opc == LDLM_ENQUEUE && err == -EAGAIN))
+               if (ptlrpc_console_allow(req, opc, err))
                        LCONSOLE_ERROR_MSG(0x11, "%s: operation %s to node %s "
                                           "failed: rc = %d\n",
                                           imp->imp_obd->obd_name,
@@ -1361,7 +1374,7 @@ static int after_reply(struct ptlrpc_request *req)
         }
 
        work_start = ktime_get_real();
-       timediff = ktime_us_delta(req->rq_sent_ns, work_start);
+       timediff = ktime_us_delta(work_start, req->rq_sent_ns);
 
         /*
          * NB Until this point, the whole of the incoming message,
@@ -1557,8 +1570,7 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
                req->rq_waiting = 1;
                spin_unlock(&req->rq_lock);
 
-               DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: "
-                         "(%s != %s)", lustre_msg_get_status(req->rq_reqmsg),
+               DEBUG_REQ(D_HA, req, "req waiting for recovery: (%s != %s)",
                          ptlrpc_import_state_name(req->rq_send_state),
                          ptlrpc_import_state_name(imp->imp_state));
                LASSERT(list_empty(&req->rq_list));
@@ -1616,8 +1628,7 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
               " %s:%s:%d:%llu:%s:%d\n", current_comm(),
               imp->imp_obd->obd_uuid.uuid,
               lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
-              libcfs_nid2str(imp->imp_connection->c_peer.nid),
-              lustre_msg_get_opc(req->rq_reqmsg));
+              obd_import_nid2str(imp), lustre_msg_get_opc(req->rq_reqmsg));
 
         rc = ptl_send_rpc(req, 0);
        if (rc == -ENOMEM) {
@@ -1871,8 +1882,11 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                                        spin_unlock(&imp->imp_lock);
                                        GOTO(interpret, req->rq_status);
                                }
+                               /* ignore on just initiated connections */
                                if (ptlrpc_no_resend(req) &&
-                                   !req->rq_wait_ctx) {
+                                   !req->rq_wait_ctx &&
+                                   imp->imp_generation !=
+                                   imp->imp_initiated_at) {
                                        req->rq_status = -ENOTCONN;
                                        ptlrpc_rqphase_move(req,
                                                            RQ_PHASE_INTERPRET);
@@ -2041,7 +2055,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                               imp->imp_obd->obd_uuid.uuid,
                               lustre_msg_get_status(req->rq_reqmsg),
                               req->rq_xid,
-                              libcfs_nid2str(imp->imp_connection->c_peer.nid),
+                              obd_import_nid2str(imp),
                               lustre_msg_get_opc(req->rq_reqmsg));
 
                spin_lock(&imp->imp_lock);
@@ -2098,6 +2112,7 @@ EXPORT_SYMBOL(ptlrpc_check_set);
 int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
 {
        struct obd_import *imp = req->rq_import;
+       unsigned int debug_mask = D_RPCTRACE;
        int rc = 0;
        ENTRY;
 
@@ -2105,12 +2120,15 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
        req->rq_timedout = 1;
        spin_unlock(&req->rq_lock);
 
-       DEBUG_REQ(D_WARNING, req, "Request sent has %s: [sent %lld/real %lld]",
-                  req->rq_net_err ? "failed due to network error" :
-                     ((req->rq_real_sent == 0 ||
+       if (ptlrpc_console_allow(req, lustre_msg_get_opc(req->rq_reqmsg),
+                                 lustre_msg_get_status(req->rq_reqmsg)))
+               debug_mask = D_WARNING;
+       DEBUG_REQ(debug_mask, req, "Request sent has %s: [sent %lld/real %lld]",
+                 req->rq_net_err ? "failed due to network error" :
+                    ((req->rq_real_sent == 0 ||
                       req->rq_real_sent < req->rq_sent ||
                       req->rq_real_sent >= req->rq_deadline) ?
-                      "timed out for sent delay" : "timed out for slow reply"),
+                     "timed out for sent delay" : "timed out for slow reply"),
                  (s64)req->rq_sent, (s64)req->rq_real_sent);
 
        if (imp != NULL && obd_debug_peer_on_timeout)
@@ -2251,7 +2269,7 @@ static void ptlrpc_interrupted_set(void *data)
 /**
  * Get the smallest timeout in the set; this does NOT set a timeout.
  */
-int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
+time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
 {
        struct list_head *tmp;
        time64_t now = ktime_get_real_seconds();
@@ -2309,8 +2327,9 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
        struct list_head            *tmp;
         struct ptlrpc_request *req;
         struct l_wait_info     lwi;
-        int                    rc, timeout;
-        ENTRY;
+       time64_t timeout;
+       int rc;
+       ENTRY;
 
        if (set->set_producer)
                (void)ptlrpc_set_producer(set);
@@ -2330,8 +2349,8 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
 
                 /* wait until all complete, interrupted, or an in-flight
                  * req times out */
-                CDEBUG(D_RPCTRACE, "set %p going to sleep for %d seconds\n",
-                       set, timeout);
+               CDEBUG(D_RPCTRACE, "set %p going to sleep for %lld seconds\n",
+                       set, timeout);
 
                if ((timeout == 0 && !signal_pending(current)) ||
                    set->set_allow_intr)
@@ -2403,25 +2422,7 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
                         rc = req->rq_status;
         }
 
-        if (set->set_interpret != NULL) {
-                int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
-                        set->set_interpret;
-                rc = interpreter (set, set->set_arg, rc);
-        } else {
-                struct ptlrpc_set_cbdata *cbdata, *n;
-                int err;
-
-               list_for_each_entry_safe(cbdata, n,
-                                         &set->set_cblist, psc_item) {
-                       list_del_init(&cbdata->psc_item);
-                        err = cbdata->psc_interpret(set, cbdata->psc_data, rc);
-                        if (err && !rc)
-                                rc = err;
-                        OBD_FREE_PTR(cbdata);
-                }
-        }
-
-        RETURN(rc);
+       RETURN(rc);
 }
 EXPORT_SYMBOL(ptlrpc_set_wait);
 
@@ -2677,8 +2678,11 @@ void ptlrpc_request_committed(struct ptlrpc_request *req, int force)
                return;
        }
 
-       if (force || req->rq_transno <= imp->imp_peer_committed_transno)
+       if (force || req->rq_transno <= imp->imp_peer_committed_transno) {
+               if (imp->imp_replay_cursor == &req->rq_replay_list)
+                       imp->imp_replay_cursor = req->rq_replay_list.next;
                ptlrpc_free_request(req);
+       }
 
        spin_unlock(&imp->imp_lock);
 }
@@ -2790,7 +2794,7 @@ void ptlrpc_cleanup_client(struct obd_import *imp)
  */
 void ptlrpc_resend_req(struct ptlrpc_request *req)
 {
-        DEBUG_REQ(D_HA, req, "going to resend");
+       DEBUG_REQ(D_HA, req, "going to resend");
        spin_lock(&req->rq_lock);
 
        /* Request got reply but linked to the import list still.
@@ -2801,14 +2805,13 @@ void ptlrpc_resend_req(struct ptlrpc_request *req)
                return;
        }
 
-        lustre_msg_set_handle(req->rq_reqmsg, &(struct lustre_handle){ 0 });
-        req->rq_status = -EAGAIN;
+       req->rq_status = -EAGAIN;
 
-        req->rq_resend = 1;
-        req->rq_net_err = 0;
-        req->rq_timedout = 0;
+       req->rq_resend = 1;
+       req->rq_net_err = 0;
+       req->rq_timedout = 0;
 
-        ptlrpc_client_wake_req(req);
+       ptlrpc_client_wake_req(req);
        spin_unlock(&req->rq_lock);
 }
 
@@ -2964,7 +2967,6 @@ static int ptlrpc_replay_interpret(const struct lu_env *env,
                DEBUG_REQ(D_WARNING, req, "Version mismatch during replay\n");
                spin_lock(&imp->imp_lock);
                imp->imp_vbr_failed = 1;
-               imp->imp_no_lock_replay = 1;
                spin_unlock(&imp->imp_lock);
                lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status);
         } else {
@@ -2978,9 +2980,6 @@ static int ptlrpc_replay_interpret(const struct lu_env *env,
         }
 
        spin_lock(&imp->imp_lock);
-       /** if replays by version then gap occur on server, no trust to locks */
-       if (lustre_msg_get_flags(req->rq_repmsg) & MSG_VERSION_REPLAY)
-               imp->imp_no_lock_replay = 1;
        imp->imp_last_replay_transno = lustre_msg_get_transno(req->rq_reqmsg);
        spin_unlock(&imp->imp_lock);
         LASSERT(imp->imp_last_replay_transno);
@@ -3079,14 +3078,15 @@ static int ptlrpc_replay_interpret(const struct lu_env *env,
  */
 int ptlrpc_replay_req(struct ptlrpc_request *req)
 {
-        struct ptlrpc_replay_async_args *aa;
-        ENTRY;
+       struct ptlrpc_replay_async_args *aa;
+
+       ENTRY;
 
-        LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
+       LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
 
-        LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
-        aa = ptlrpc_req_async_args(req);
-        memset(aa, 0, sizeof *aa);
+       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+       aa = ptlrpc_req_async_args(req);
+       memset(aa, 0, sizeof(*aa));
 
         /* Prepare request to be resent with ptlrpcd */
         aa->praa_old_state = req->rq_send_state;
@@ -3440,7 +3440,7 @@ void *ptlrpcd_alloc_work(struct obd_import *imp,
        req->rq_no_delay = req->rq_no_resend = 1;
        req->rq_pill.rc_fmt = (void *)&worker_format;
 
-       CLASSERT (sizeof(*args) <= sizeof(req->rq_async_args));
+       CLASSERT(sizeof(*args) <= sizeof(req->rq_async_args));
        args = ptlrpc_req_async_args(req);
        args->cb     = cb;
        args->cbdata = cbdata;