b=17631
author yury <yury>
Mon, 24 Nov 2008 13:34:39 +0000 (13:34 +0000)
committer yury <yury>
Mon, 24 Nov 2008 13:34:39 +0000 (13:34 +0000)
r=shadow,panda

- fixes long bulk unlink being done synchronously in the ptlrpcd thread, which could cause an assertion at umount time;
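
For context, the new continue-checks below are gated on ptlrpc_client_bulk_active(), whose definition is not part of these hunks. A minimal sketch of what such a helper is assumed to look like, inferred from the rq_bulk descriptor fields used in this file (the rq_bulk_deadline field, mirroring rq_reply_deadline, is an assumption here):

/* Sketch only: report whether a request still has bulk I/O in flight,
 * so ptlrpc_check_set() can keep polling instead of blocking the
 * ptlrpcd thread on a long bulk unlink. */
static inline int ptlrpc_client_bulk_active(struct ptlrpc_request *req)
{
        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
        int rc;

        /* Assumed bulk analogue of rq_reply_deadline: while a
         * fail_loc-injected unlink deadline has not expired, the
         * bulk is treated as still active. */
        if (req->rq_bulk_deadline > cfs_time_current_sec())
                return 1;

        if (desc == NULL)
                return 0;

        spin_lock(&desc->bd_lock);
        rc = desc->bd_network_rw;       /* LNet still holds the MDs */
        spin_unlock(&desc->bd_lock);
        return rc;
}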

lustre/ptlrpc/client.c

index b96cfc1..bf3d4c5 100644
@@ -100,8 +100,8 @@ static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal
         return desc;
 }
 
-struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
-                                               int npages, int type, int portal)
+struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
+                                              int npages, int type, int portal)
 {
         struct obd_import *imp = req->rq_import;
         struct ptlrpc_bulk_desc *desc;
@@ -125,8 +125,8 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
         return desc;
 }
 
-struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
-                                               int npages, int type, int portal)
+struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp(struct ptlrpc_request *req,
+                                              int npages, int type, int portal)
 {
         struct obd_export *exp = req->rq_export;
         struct ptlrpc_bulk_desc *desc;
@@ -1096,13 +1096,22 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                          * call interpret before that. We need to make
                          * sure that all rdma transfers finished and will
                          * not corrupt any data. */
-                        if (ptlrpc_client_recv_or_unlink(req))
+                        if (ptlrpc_client_recv_or_unlink(req) ||
+                            ptlrpc_client_bulk_active(req))
                                 continue;
                         
-                        /* Turn fail_loc off to prevent it from looping
+                        /* Turn repl fail_loc off to prevent it from looping
                          * forever. */
-                        OBD_FAIL_CHECK_QUIET(OBD_FAIL_PTLRPC_LONG_UNLINK | 
-                                             OBD_FAIL_ONCE);
+                        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) {
+                                OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK |
+                                               OBD_FAIL_ONCE);
+                        }
+
+                        /* Turn off bulk fail_loc. */
+                        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) {
+                                OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK |
+                                               OBD_FAIL_ONCE);
+                        }
 
                         /* Move to next phase if reply was successfully 
                          * unlinked. */
@@ -1120,7 +1129,8 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                         ptlrpc_expire_one_request(req, 1);
                         
                         /* Check if we still need to wait for unlink. */
-                        if (ptlrpc_client_recv_or_unlink(req))
+                        if (ptlrpc_client_recv_or_unlink(req) ||
+                            ptlrpc_client_bulk_active(req))
                                 continue;
                 }
 
@@ -1186,11 +1196,13 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                                         lustre_msg_add_flags(req->rq_reqmsg,
                                                              MSG_RESENT);
                                         if (req->rq_bulk) {
-                                                __u64 old_xid = req->rq_xid;
+                                                __u64 old_xid;
 
-                                                ptlrpc_unregister_bulk(req);
+                                                if (!ptlrpc_unregister_bulk(req, 1))
+                                                        continue;
 
                                                 /* ensure previous bulk fails */
+                                                old_xid = req->rq_xid;
                                                 req->rq_xid = ptlrpc_next_xid();
                                                 CDEBUG(D_HA, "resend bulk "
                                                        "old x"LPU64
@@ -1259,7 +1271,7 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                 }
 
                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
-                if (ptlrpc_bulk_active(req->rq_bulk))
+                if (ptlrpc_client_bulk_active(req))
                         continue;
 
                 if (!req->rq_bulk->bd_success) {
@@ -1283,8 +1295,8 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                 if (!ptlrpc_unregister_reply(req, 1))
                         continue;
 
-                if (req->rq_bulk != NULL)
-                        ptlrpc_unregister_bulk(req);
+                if (!ptlrpc_unregister_bulk(req, 1))
+                        continue;
 
                 /* When calling interpret receiving already should be
                  * finished. */
@@ -1352,13 +1364,11 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
         spin_unlock(&req->rq_lock);
 
         ptlrpc_unregister_reply(req, async_unlink);
+        ptlrpc_unregister_bulk(req, async_unlink);
 
         if (obd_dump_on_timeout)
                 libcfs_debug_dumplog();
 
-        if (req->rq_bulk != NULL)
-                ptlrpc_unregister_bulk (req);
-
         if (imp == NULL) {
                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
                 RETURN(1);
@@ -1698,7 +1708,7 @@ int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
         LASSERT(!in_interrupt());
 
         /* Let's setup deadline for reply unlink. */
-        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_UNLINK) && 
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) && 
             async && request->rq_reply_deadline == 0)
                 request->rq_reply_deadline = cfs_time_current_sec()+LONG_UNLINK;
 
@@ -1730,7 +1740,8 @@ int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
         for (;;) {
                 /* Network access will complete in finite time but the HUGE
                  * timeout lets us CWARN for visibility of sluggish NALs */
-                lwi = LWI_TIMEOUT(cfs_time_seconds(LONG_UNLINK), NULL, NULL);
+                lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
+                                           cfs_time_seconds(1), NULL, NULL);
                 rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
                                   &lwi);
                 if (rc == 0) {
@@ -1996,7 +2007,7 @@ restart:
                 lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
 
                 if (req->rq_bulk != NULL) {
-                        ptlrpc_unregister_bulk (req);
+                        ptlrpc_unregister_bulk(req, 0);
 
                         /* bulk requests are supposed to be
                          * idempotent, so we are free to bump the xid
@@ -2108,7 +2119,7 @@ restart:
                          * me. */
                         lwi = LWI_TIMEOUT(timeout, NULL, NULL);
                         brc = l_wait_event(req->rq_reply_waitq,
-                                           !ptlrpc_bulk_active(req->rq_bulk),
+                                           !ptlrpc_client_bulk_active(req),
                                            &lwi);
                         LASSERT(brc == 0 || brc == -ETIMEDOUT);
                         if (brc != 0) {
@@ -2121,7 +2132,7 @@ restart:
                         }
                 }
                 if (rc < 0)
-                        ptlrpc_unregister_bulk (req);
+                        ptlrpc_unregister_bulk(req, 0);
         }
 
         LASSERT(!req->rq_receiving_reply);
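
The hunks above also switch callers to a two-argument ptlrpc_unregister_bulk(req, async) that reports whether the bulk is fully unlinked, but its body lies outside this diff. A sketch of the assumed shape, mirroring the ptlrpc_unregister_reply() deadline/unlink pattern visible above (the LNetMDUnlink call on desc->bd_md_h and the waitq selection are assumptions here):

/* Sketch only: kick the bulk MD unlink; in async mode return at once
 * and let ptlrpc_check_set() poll ptlrpc_client_bulk_active() until
 * completion, so the ptlrpcd thread never sleeps on a slow network. */
int ptlrpc_unregister_bulk(struct ptlrpc_request *req, int async)
{
        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
        cfs_waitq_t *wq;
        struct l_wait_info lwi;
        int rc;

        /* Nothing to do if no bulk was ever attached. */
        if (desc == NULL)
                return 1;

        /* Assumed bulk analogue of the reply-deadline setup above. */
        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK) &&
            async && req->rq_bulk_deadline == 0)
                req->rq_bulk_deadline = cfs_time_current_sec() + LONG_UNLINK;

        if (!ptlrpc_client_bulk_active(req))
                return 1;               /* completed or never registered */

        /* Ensure the completion callback runs as soon as possible. */
        LNetMDUnlink(desc->bd_md_h);

        if (async)
                return 0;               /* caller will poll for completion */

        /* Synchronous path: wait, warning periodically on sluggish NALs. */
        wq = (req->rq_set != NULL) ? &req->rq_set->set_waitq :
                                     &req->rq_reply_waitq;
        for (;;) {
                lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
                                           cfs_time_seconds(1), NULL, NULL);
                rc = l_wait_event(*wq, !ptlrpc_client_bulk_active(req), &lwi);
                if (rc == 0)
                        return 1;
                LASSERT(rc == -ETIMEDOUT);
                DEBUG_REQ(D_WARNING, req, "Unexpectedly long timeout: desc %p",
                          desc);
        }
}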