b=17310
author yury <yury>
Sat, 8 Nov 2008 13:31:43 +0000 (13:31 +0000)
committer yury <yury>
Sat, 8 Nov 2008 13:31:43 +0000 (13:31 +0000)
r=shadow,johann

- make sure that no new inflight rpcs can appear after ptlrpc_deactivate_import(), for both
synchronous and asynchronous sending. To do so, imp_inflight is incremented only once permission
has been granted by ptlrpc_import_delay_req(), which decides whether the req should be sent,
deferred, or killed because the import is in no state to send it in the observable future. For
async sending, an rpc is counted inflight only when it is added to the sending or delayed list,
not merely when it is added to a set for processing (see the sketch below).
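
A minimal sketch of this rule, assuming a hypothetical helper send_or_delay() that condenses the
logic spread across ptlrpc_send_new_req() and ptlrpc_queue_wait() (the real paths carry more
state and error handling):

        static int send_or_delay(struct obd_import *imp, struct ptlrpc_request *req)
        {
                int rc = 0;

                spin_lock(&imp->imp_lock);
                if (ptlrpc_import_delay_req(imp, req, &rc)) {
                        /* Import not ready: park the rpc on the delayed list
                         * and only now count it inflight. */
                        list_add_tail(&req->rq_list, &imp->imp_delayed_list);
                        atomic_inc(&imp->imp_inflight);
                        spin_unlock(&imp->imp_lock);
                        return 0;
                }
                if (rc != 0) {
                        /* Import unusable: the rpc is killed and is never
                         * counted inflight. */
                        spin_unlock(&imp->imp_lock);
                        return rc;
                }
                /* Permission granted: queue for sending and count the rpc
                 * inflight under the same lock. */
                list_add_tail(&req->rq_list, &imp->imp_sending_list);
                atomic_inc(&imp->imp_inflight);
                spin_unlock(&imp->imp_lock);
                return 0;
        }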

This fixes an assertion failure in ptlrpc_invalidate_import() and a number of other issues;

- synchronize imp_inflight with presence on the sending or delayed list for the
ptlrpc_queue_wait() case, so it is now guaranteed that if imp_inflight != 0 we can always find the
hanging rpc on either the sending or the delayed list (see the illustration below);
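
To illustrate the invariant, a hypothetical check (assert_inflight_visible() is a made-up name,
not code from this patch):

        /* If any rpc is counted inflight, it must be visible on one of the
         * import lists while imp_lock is held. */
        static void assert_inflight_visible(struct obd_import *imp)
        {
                spin_lock(&imp->imp_lock);
                LASSERT(atomic_read(&imp->imp_inflight) == 0 ||
                        !list_empty(&imp->imp_sending_list) ||
                        !list_empty(&imp->imp_delayed_list));
                spin_unlock(&imp->imp_lock);
        }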

- make sure that in ptlrpc_queue_wait() we remove the rpc from the sending or delayed list and
decrement imp_inflight only after ptlrpc_unregister_reply() is done. This keeps the accounting
correct: an rpc cannot be returned to the pool or counted as finished until LNet has let go of it
with a finished reply unlink (see the ordering sketch below);
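
Condensed from the client.c hunk below, the completion path in ptlrpc_queue_wait() now orders the
steps like this:

        /* Wait for LNet to finish with the reply buffer first... */
        ptlrpc_unregister_reply(req, 0);

        /* ...and only then drop the list linkage and the inflight count, so
         * an invalidation scan can still find this rpc until now. */
        spin_lock(&imp->imp_lock);
        list_del_init(&req->rq_list);
        atomic_dec(&imp->imp_inflight);
        spin_unlock(&imp->imp_lock);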

- check inflight accounting and rq_list membership in the pinger (condensed below);
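
In the pinger this amounts to the locked check from the pinger.c hunk below, roughly:

        /* Only an rpc still linked on a list holds an inflight count; an
         * unlinked one was already uncounted, so check under imp_lock. */
        spin_lock(&imp->imp_lock);
        if (!list_empty(&req->rq_list)) {
                list_del_init(&req->rq_list);
                atomic_dec(&imp->imp_inflight);
        }
        spin_unlock(&imp->imp_lock);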

- comments, cleanups;

lustre/ptlrpc/client.c
lustre/ptlrpc/pinger.c

index b694ee3..8e605b3 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -777,8 +777,6 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
         list_add_tail(&req->rq_set_chain, &set->set_requests);
         req->rq_set = set;
         set->set_remaining++;
-
-        atomic_inc(&req->rq_import->imp_inflight);
 }
 
 /**
@@ -1081,13 +1079,12 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
                 spin_unlock (&req->rq_lock);
 
                 DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: "
-                          "(%s != %s)",
-                          lustre_msg_get_status(req->rq_reqmsg) ,
+                          "(%s != %s)", lustre_msg_get_status(req->rq_reqmsg),
                           ptlrpc_import_state_name(req->rq_send_state),
                           ptlrpc_import_state_name(imp->imp_state));
-                LASSERT(list_empty (&req->rq_list));
-
+                LASSERT(list_empty(&req->rq_list));
                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
+                atomic_inc(&req->rq_import->imp_inflight);
                 spin_unlock(&imp->imp_lock);
                 RETURN(0);
         }
@@ -1099,9 +1096,9 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
                 RETURN(rc);
         }
 
-        /* XXX this is the same as ptlrpc_queue_wait */
         LASSERT(list_empty(&req->rq_list));
         list_add_tail(&req->rq_list, &imp->imp_sending_list);
+        atomic_inc(&req->rq_import->imp_inflight);
         spin_unlock(&imp->imp_lock);
 
         lustre_msg_set_status(req->rq_reqmsg, cfs_curproc_pid());
@@ -1415,9 +1412,14 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                        lustre_msg_get_opc(req->rq_reqmsg));
 
                 spin_lock(&imp->imp_lock);
-                if (!list_empty(&req->rq_list))
+                /* The request may already be off the sending or delaying
+                 * list. This happens when the rpc was marked erroneous
+                 * because ptlrpc_import_delay_req() found it impossible to
+                 * allow sending it and returned *status != 0. */
+                if (!list_empty(&req->rq_list)) {
                         list_del_init(&req->rq_list);
-                atomic_dec(&imp->imp_inflight);
+                        atomic_dec(&imp->imp_inflight);
+                }
                 spin_unlock(&imp->imp_lock);
 
                 set->set_remaining--;
@@ -2083,7 +2085,6 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
 
         LASSERT(req->rq_set == NULL);
         LASSERT(!req->rq_receiving_reply);
-        atomic_inc(&imp->imp_inflight);
 
         /* for distributed debugging */
         lustre_msg_set_status(req->rq_reqmsg, cfs_curproc_pid());
@@ -2103,8 +2104,8 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
 restart:
         if (ptlrpc_import_delay_req(imp, req, &rc)) {
                 list_del(&req->rq_list);
-
                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
+                atomic_inc(&imp->imp_inflight);
                 spin_unlock(&imp->imp_lock);
 
                 DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
@@ -2124,6 +2125,7 @@ restart:
 
                 spin_lock(&imp->imp_lock);
                 list_del_init(&req->rq_list);
+                atomic_dec(&imp->imp_inflight);
 
                 if (req->rq_err) {
                         /* rq_status was set locally */
@@ -2142,7 +2144,6 @@ restart:
         }
 
         if (rc != 0) {
-                list_del_init(&req->rq_list);
                 spin_unlock(&imp->imp_lock);
                 req->rq_status = rc; // XXX this ok?
                 GOTO(out, rc);
@@ -2170,6 +2171,7 @@ restart:
         /* XXX this is the same as ptlrpc_set_wait */
         LASSERT(list_empty(&req->rq_list));
         list_add_tail(&req->rq_list, &imp->imp_sending_list);
+        atomic_inc(&imp->imp_inflight);
         spin_unlock(&imp->imp_lock);
 
         rc = sptlrpc_req_refresh_ctx(req, 0);
@@ -2213,15 +2215,16 @@ after_send:
                libcfs_nid2str(imp->imp_connection->c_peer.nid),
                lustre_msg_get_opc(req->rq_reqmsg));
 
-        spin_lock(&imp->imp_lock);
-        list_del_init(&req->rq_list);
-        spin_unlock(&imp->imp_lock);
-
         /* If the reply was received normally, this just grabs the spinlock
          * (ensuring the reply callback has returned), sees that
          * req->rq_receiving_reply is clear and returns. */
         ptlrpc_unregister_reply(req, 0);
 
+        spin_lock(&imp->imp_lock);
+        list_del_init(&req->rq_list);
+        atomic_dec(&imp->imp_inflight);
+        spin_unlock(&imp->imp_lock);
+
         if (req->rq_err) {
                 DEBUG_REQ(D_RPCTRACE, req, "err rc=%d status=%d",
                           rc, req->rq_status);
@@ -2291,8 +2294,6 @@ after_send:
 
         LASSERT(!req->rq_receiving_reply);
         ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
-
-        atomic_dec(&imp->imp_inflight);
         cfs_waitq_signal(&imp->imp_recovery_waitq);
         RETURN(rc);
 }
index da417a5..a9e4aba 100644
--- a/lustre/ptlrpc/pinger.c
+++ b/lustre/ptlrpc/pinger.c
@@ -550,6 +550,7 @@ static int pinger_check_rpcs(void *arg)
         struct ptlrpc_request *req;
         struct ptlrpc_request_set *set;
         struct list_head *iter;
+        struct obd_import *imp;
         struct pinger_data *pd = &pinger_args;
         int rc;
 
@@ -661,17 +662,23 @@ do_check_set:
                 if (req->rq_phase == RQ_PHASE_COMPLETE)
                         continue;
 
-                ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
-                atomic_dec(&req->rq_import->imp_inflight);
-                set->set_remaining--;
-                /* If it was disconnected, don't sweat it. */
-                if (list_empty(&req->rq_import->imp_pinger_chain)) {
-                        ptlrpc_unregister_reply(req, 0);
-                        continue;
-                }
+                CDEBUG(D_RPCTRACE, "Pinger initiating expire on request %p\n",
+                       req);
 
-                CDEBUG(D_RPCTRACE, "pinger initiate expire_one_request\n");
+                /* This will also unregister the reply. */
                 ptlrpc_expire_one_request(req, 0);
+
+                /* We're done with this req, so finally move it to the
+                 * complete phase and fix up the inflight accounting. */
+                ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
+                imp = req->rq_import;
+                spin_lock(&imp->imp_lock);
+                if (!list_empty(&req->rq_list)) {
+                        list_del_init(&req->rq_list);
+                        atomic_dec(&imp->imp_inflight);
+                }
+                spin_unlock(&imp->imp_lock);
+                set->set_remaining--;
         }
         mutex_up(&pinger_sem);