Whamcloud - gitweb
ORNL-22 general ptlrpcd threads pool support
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index bb7eed7..483ca5d 100644 (file)
@@ -856,16 +856,24 @@ struct ptlrpc_request *ptlrpc_prep_fakereq(struct obd_import *imp,
  */
 void ptlrpc_fakereq_finished(struct ptlrpc_request *req)
 {
-        /* if we kill request before timeout - need adjust counter */
-        if (req->rq_phase == RQ_PHASE_RPC) {
-                struct ptlrpc_request_set *set = req->rq_set;
+        struct ptlrpc_request_set *set = req->rq_set;
+        int wakeup = 0;
 
-                if (set)
-                        cfs_atomic_dec(&set->set_remaining);
-        }
+        /* hold ref on the request to prevent others (ptlrpcd) to free it */
+        ptlrpc_request_addref(req);
+        cfs_list_del_init(&req->rq_list);
+
+        /* if we kill request before timeout - need adjust counter */
+        if (req->rq_phase == RQ_PHASE_RPC && set != NULL &&
+            cfs_atomic_dec_and_test(&set->set_remaining))
+                wakeup = 1;
 
         ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
-        cfs_list_del_init(&req->rq_list);
+
+        /* Only need to call wakeup once when to be empty. */
+        if (wakeup)
+                cfs_waitq_signal(&set->set_waitq);
+        ptlrpc_req_finished(req);
 }
 
 /**
@@ -880,8 +888,10 @@ struct ptlrpc_request_set *ptlrpc_prep_set(void)
         OBD_ALLOC(set, sizeof *set);
         if (!set)
                 RETURN(NULL);
+        cfs_atomic_set(&set->set_refcount, 1);
         CFS_INIT_LIST_HEAD(&set->set_requests);
         cfs_waitq_init(&set->set_waitq);
+        cfs_atomic_set(&set->set_new_count, 0);
         cfs_atomic_set(&set->set_remaining, 0);
         cfs_spin_lock_init(&set->set_new_req_lock);
         CFS_INIT_LIST_HEAD(&set->set_new_requests);
@@ -945,7 +955,7 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
 
         LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
 
-        OBD_FREE(set, sizeof(*set));
+        ptlrpc_reqset_put(set);
         EXIT;
 }
 
@@ -977,43 +987,49 @@ int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
                         struct ptlrpc_request *req)
 {
+        LASSERT(cfs_list_empty(&req->rq_set_chain));
+
         /* The set takes over the caller's request reference */
         cfs_list_add_tail(&req->rq_set_chain, &set->set_requests);
         req->rq_set = set;
         cfs_atomic_inc(&set->set_remaining);
-        req->rq_queued_time = cfs_time_current(); /* Where is the best place to set this? */
+        req->rq_queued_time = cfs_time_current();
 }
 
 /**
  * Add a request to a request with dedicated server thread
  * and wake the thread to make any necessary processing.
  * Currently only used for ptlrpcd.
- * Returns 0 if succesful or non zero error code on error.
- * (the only possible error for now is if the dedicated server thread
- * is shutting down)
  */
-int ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
+void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
                            struct ptlrpc_request *req)
 {
         struct ptlrpc_request_set *set = pc->pc_set;
+        int count, i;
 
-        /*
-         * Let caller know that we stopped and will not handle this request.
-         * It needs to take care itself of request.
-         */
-        if (cfs_test_bit(LIOD_STOP, &pc->pc_flags))
-                return -EALREADY;
+        LASSERT(req->rq_set == NULL);
+        LASSERT(cfs_test_bit(LIOD_STOP, &pc->pc_flags) == 0);
 
         cfs_spin_lock(&set->set_new_req_lock);
         /*
          * The set takes over the caller's request reference.
          */
-        cfs_list_add_tail(&req->rq_set_chain, &set->set_new_requests);
         req->rq_set = set;
+        req->rq_queued_time = cfs_time_current();
+        cfs_list_add_tail(&req->rq_set_chain, &set->set_new_requests);
+        count = cfs_atomic_inc_return(&set->set_new_count);
         cfs_spin_unlock(&set->set_new_req_lock);
 
-        cfs_waitq_signal(&set->set_waitq);
-        return 0;
+        /* Only need to call wakeup once for the first entry. */
+        if (count == 1) {
+                cfs_waitq_signal(&set->set_waitq);
+
+                /* XXX: It maybe unnecessary to wakeup all the partners. But to
+                 *      guarantee the async RPC can be processed ASAP, we have
+                 *      no other better choice. It maybe fixed in future. */
+                for (i = 0; i < pc->pc_npartners; i++)
+                        cfs_waitq_signal(&pc->pc_partners[i]->pc_set->set_waitq);
+        }
 }
 
 /**
@@ -2623,7 +2639,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         cfs_atomic_inc(&req->rq_import->imp_replay_inflight);
         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
 
-        ptlrpcd_add_req(req, PSCOPE_OTHER);
+        ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
         RETURN(0);
 }