Whamcloud - gitweb
b=17310
[fs/lustre-release.git] / lustre / ptlrpc / import.c
index b575270..18fb0cf 100644 (file)
@@ -207,6 +207,47 @@ void ptlrpc_deactivate_import(struct obd_import *imp)
         ptlrpc_deactivate_and_unlock_import(imp);
 }
 
+static unsigned int 
+ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now)
+{
+        long dl;
+
+        if (!(((req->rq_phase & (RQ_PHASE_RPC | RQ_PHASE_UNREGISTERING)) && 
+              !req->rq_waiting) ||
+              (req->rq_phase == RQ_PHASE_BULK) ||
+              (req->rq_phase == RQ_PHASE_NEW)))
+                return 0;
+
+        if (req->rq_timedout)
+                return 0;
+
+        if (req->rq_phase == RQ_PHASE_NEW)
+                dl = req->rq_sent;
+        else
+                dl = req->rq_deadline;
+
+        if (dl <= now)
+                return 0;
+
+        return dl - now;
+}
+
+static unsigned int ptlrpc_inflight_timeout(struct obd_import *imp)
+{
+        time_t now = cfs_time_current_sec();
+        struct list_head *tmp, *n;
+        struct ptlrpc_request *req;
+        unsigned int timeout = 0;
+
+        spin_lock(&imp->imp_lock);
+        list_for_each_safe(tmp, n, &imp->imp_sending_list) {
+                req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                timeout = max(ptlrpc_inflight_deadline(req, now), timeout);
+        }
+        spin_unlock(&imp->imp_lock);
+        return timeout;
+}
+
 /*
  * This function will invalidate the import, if necessary, then block
  * for all the RPC completions, and finally notify the obd to
@@ -218,6 +259,7 @@ void ptlrpc_invalidate_import(struct obd_import *imp)
         struct list_head *tmp, *n;
         struct ptlrpc_request *req;
         struct l_wait_info lwi;
+        unsigned int timeout;
         int rc;
 
         atomic_inc(&imp->imp_inval_count);
@@ -234,32 +276,78 @@ void ptlrpc_invalidate_import(struct obd_import *imp)
 
         LASSERT(imp->imp_invalid);
 
-        /* wait for all requests to error out and call completion callbacks.
-           Cap it at obd_timeout -- these should all have been locally
-           cancelled by ptlrpc_abort_inflight. */
-        lwi = LWI_TIMEOUT_INTERVAL(
-                cfs_timeout_cap(cfs_time_seconds(obd_timeout)),
-                cfs_time_seconds(1), NULL, NULL);
-        rc = l_wait_event(imp->imp_recovery_waitq,
-                          (atomic_read(&imp->imp_inflight) == 0), &lwi);
+        /* Wait forever until inflight == 0. We really can't do it another
+         * way because in some cases we need to wait for very long reply 
+         * unlink. We can't do anything before that because there is really
+         * no guarantee that some rdma transfer is not in progress right now. */
+        do {
+                /* Calculate max timeout for waiting on rpcs to error 
+                 * out. Use obd_timeout if calculated value is smaller
+                 * than it. */
+                timeout = ptlrpc_inflight_timeout(imp);
+                timeout += timeout / 3;
+             
+                if (timeout == 0)
+                        timeout = obd_timeout;
+             
+                CDEBUG(D_RPCTRACE, "Sleeping %d sec for inflight to error out\n",
+                       timeout);
+
+                /* Wait for all requests to error out and call completion
+                 * callbacks. Cap it at obd_timeout -- these should all
+                 * have been locally cancelled by ptlrpc_abort_inflight. */
+                lwi = LWI_TIMEOUT_INTERVAL(
+                        cfs_timeout_cap(cfs_time_seconds(timeout)),
+                        cfs_time_seconds(1), NULL, NULL);
+                rc = l_wait_event(imp->imp_recovery_waitq,
+                                (atomic_read(&imp->imp_inflight) == 0), &lwi);
+                if (rc) {
+                        const char *cli_tgt = obd2cli_tgt(imp->imp_obd);
 
-        if (rc) {
-                CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
-                         obd2cli_tgt(imp->imp_obd), rc,
-                         atomic_read(&imp->imp_inflight));
-                spin_lock(&imp->imp_lock);
-                list_for_each_safe(tmp, n, &imp->imp_sending_list) {
-                        req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                        DEBUG_REQ(D_ERROR, req, "still on sending list");
-                }
-                list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
-                        req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                        DEBUG_REQ(D_ERROR, req, "still on delayed list");
-                }
-                spin_unlock(&imp->imp_lock);
-                LASSERT(atomic_read(&imp->imp_inflight) == 0);
-        }
+                        CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
+                               cli_tgt, rc, atomic_read(&imp->imp_inflight));
 
+                        spin_lock(&imp->imp_lock);
+                        list_for_each_safe(tmp, n, &imp->imp_sending_list) {
+                                req = list_entry(tmp, struct ptlrpc_request, 
+                                                 rq_list);
+                                DEBUG_REQ(D_ERROR, req, "still on sending list");
+                        }
+                        list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
+                                req = list_entry(tmp, struct ptlrpc_request, 
+                                                 rq_list);
+                                DEBUG_REQ(D_ERROR, req, "still on delayed list");
+                        }
+                     
+                        if (atomic_read(&imp->imp_unregistering) == 0) {
+                                /* We know that only "unregistering" rpcs may
+                                 * still survive in sending or delaying lists
+                                 * (They are waiting for long reply unlink in
+                                 * sluggish nets). Let's check this. If there
+                                 * is no unregistering and inflight != 0 this
+                                 * is bug. */
+                                LASSERT(atomic_read(&imp->imp_inflight) == 0);
+                             
+                                /* Let's save one loop as soon as inflight have
+                                 * dropped to zero. No new inflights possible at
+                                 * this point. */
+                                rc = 0;
+                        } else {
+                                CERROR("%s: RPCs in \"%s\" phase found (%d). "
+                                       "Network is sluggish? Waiting them "
+                                       "to error out.\n", cli_tgt,
+                                       ptlrpc_phase2str(RQ_PHASE_UNREGISTERING),
+                                       atomic_read(&imp->imp_unregistering));
+                        }
+                        spin_unlock(&imp->imp_lock);
+                  }
+        } while (rc != 0);
+
+        /* 
+         * Let's additionally check that no new rpcs added to import in
+         * "invalidate" state. 
+         */
+        LASSERT(atomic_read(&imp->imp_inflight) == 0);
 out:
         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
         sptlrpc_import_flush_all_ctx(imp);
@@ -574,6 +662,7 @@ int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
 #endif
         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_NEXT_VER);
 
+        request->rq_no_resend = request->rq_no_delay = 1;
         request->rq_send_state = LUSTRE_IMP_CONNECTING;
         /* Allow a slightly larger reply for future growth compatibility */
         req_capsule_set_size(&request->rq_pill, &RMF_CONNECT_DATA, RCL_SERVER,