Whamcloud - gitweb
LU-769 ptlrpc: Do not miss pending signals in ptlrpc_set wait
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index bb7eed7..e9979df 100644 (file)
@@ -856,16 +856,24 @@ struct ptlrpc_request *ptlrpc_prep_fakereq(struct obd_import *imp,
  */
 void ptlrpc_fakereq_finished(struct ptlrpc_request *req)
 {
-        /* if we kill request before timeout - need adjust counter */
-        if (req->rq_phase == RQ_PHASE_RPC) {
-                struct ptlrpc_request_set *set = req->rq_set;
+        struct ptlrpc_request_set *set = req->rq_set;
+        int wakeup = 0;
 
-                if (set)
-                        cfs_atomic_dec(&set->set_remaining);
-        }
+        /* hold ref on the request to prevent others (ptlrpcd) to free it */
+        ptlrpc_request_addref(req);
+        cfs_list_del_init(&req->rq_list);
+
+        /* if we kill request before timeout - need adjust counter */
+        if (req->rq_phase == RQ_PHASE_RPC && set != NULL &&
+            cfs_atomic_dec_and_test(&set->set_remaining))
+                wakeup = 1;
 
         ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
-        cfs_list_del_init(&req->rq_list);
+
+        /* Only need to call wakeup once when to be empty. */
+        if (wakeup)
+                cfs_waitq_signal(&set->set_waitq);
+        ptlrpc_req_finished(req);
 }
 
 /**
@@ -880,8 +888,10 @@ struct ptlrpc_request_set *ptlrpc_prep_set(void)
         OBD_ALLOC(set, sizeof *set);
         if (!set)
                 RETURN(NULL);
+        cfs_atomic_set(&set->set_refcount, 1);
         CFS_INIT_LIST_HEAD(&set->set_requests);
         cfs_waitq_init(&set->set_waitq);
+        cfs_atomic_set(&set->set_new_count, 0);
         cfs_atomic_set(&set->set_remaining, 0);
         cfs_spin_lock_init(&set->set_new_req_lock);
         CFS_INIT_LIST_HEAD(&set->set_new_requests);
@@ -945,7 +955,7 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
 
         LASSERT(cfs_atomic_read(&set->set_remaining) == 0);
 
-        OBD_FREE(set, sizeof(*set));
+        ptlrpc_reqset_put(set);
         EXIT;
 }
 
@@ -977,43 +987,49 @@ int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
                         struct ptlrpc_request *req)
 {
+        LASSERT(cfs_list_empty(&req->rq_set_chain));
+
         /* The set takes over the caller's request reference */
         cfs_list_add_tail(&req->rq_set_chain, &set->set_requests);
         req->rq_set = set;
         cfs_atomic_inc(&set->set_remaining);
-        req->rq_queued_time = cfs_time_current(); /* Where is the best place to set this? */
+        req->rq_queued_time = cfs_time_current();
 }
 
 /**
  * Add a request to a request with dedicated server thread
  * and wake the thread to make any necessary processing.
  * Currently only used for ptlrpcd.
- * Returns 0 if succesful or non zero error code on error.
- * (the only possible error for now is if the dedicated server thread
- * is shutting down)
  */
-int ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
+void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
                            struct ptlrpc_request *req)
 {
         struct ptlrpc_request_set *set = pc->pc_set;
+        int count, i;
 
-        /*
-         * Let caller know that we stopped and will not handle this request.
-         * It needs to take care itself of request.
-         */
-        if (cfs_test_bit(LIOD_STOP, &pc->pc_flags))
-                return -EALREADY;
+        LASSERT(req->rq_set == NULL);
+        LASSERT(cfs_test_bit(LIOD_STOP, &pc->pc_flags) == 0);
 
         cfs_spin_lock(&set->set_new_req_lock);
         /*
          * The set takes over the caller's request reference.
          */
-        cfs_list_add_tail(&req->rq_set_chain, &set->set_new_requests);
         req->rq_set = set;
+        req->rq_queued_time = cfs_time_current();
+        cfs_list_add_tail(&req->rq_set_chain, &set->set_new_requests);
+        count = cfs_atomic_inc_return(&set->set_new_count);
         cfs_spin_unlock(&set->set_new_req_lock);
 
-        cfs_waitq_signal(&set->set_waitq);
-        return 0;
+        /* Only need to call wakeup once for the first entry. */
+        if (count == 1) {
+                cfs_waitq_signal(&set->set_waitq);
+
+                /* XXX: It maybe unnecessary to wakeup all the partners. But to
+                 *      guarantee the async RPC can be processed ASAP, we have
+                 *      no other better choice. It maybe fixed in future. */
+                for (i = 0; i < pc->pc_npartners; i++)
+                        cfs_waitq_signal(&pc->pc_partners[i]->pc_set->set_waitq);
+        }
 }
 
 /**
@@ -1891,7 +1907,7 @@ void ptlrpc_interrupted_set(void *data)
         cfs_list_t *tmp;
 
         LASSERT(set != NULL);
-        CERROR("INTERRUPTED SET %p\n", set);
+        CDEBUG(D_RPCTRACE, "INTERRUPTED SET %p\n", set);
 
         cfs_list_for_each(tmp, &set->set_requests) {
                 struct ptlrpc_request *req =
@@ -2008,6 +2024,23 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
 
                 rc = l_wait_event(set->set_waitq, ptlrpc_check_set(NULL, set), &lwi);
 
+                /* LU-769 - if we ignored the signal because it was already
+                 * pending when we started, we need to handle it now or we risk
+                 * it being ignored forever */
+                if (rc == -ETIMEDOUT && !lwi.lwi_allow_intr &&
+                    cfs_signal_pending()) {
+                        cfs_sigset_t blocked_sigs =
+                                           cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
+
+                        /* In fact we only interrupt for the "fatal" signals
+                         * like SIGINT or SIGKILL. We still ignore less
+                         * important signals since ptlrpc set is not easily
+                         * reentrant from userspace again */
+                        if (cfs_signal_pending())
+                                ptlrpc_interrupted_set(set);
+                        cfs_block_sigs(blocked_sigs);
+                }
+
                 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
 
                 /* -EINTR => all requests have been flagged rq_intr so next
@@ -2623,7 +2656,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         cfs_atomic_inc(&req->rq_import->imp_replay_inflight);
         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
 
-        ptlrpcd_add_req(req, PSCOPE_OTHER);
+        ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
         RETURN(0);
 }