Whamcloud - gitweb
LU-9859 libcfs: rename cfs_cpt_table to cfs_cpt_tab
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index 85bb30d..cc93dd4 100644 (file)
@@ -1064,8 +1064,8 @@ struct ptlrpc_request_set *ptlrpc_prep_set(void)
        int cpt;
 
        ENTRY;
-       cpt = cfs_cpt_current(cfs_cpt_table, 0);
-       OBD_CPT_ALLOC(set, cfs_cpt_table, cpt, sizeof(*set));
+       cpt = cfs_cpt_current(cfs_cpt_tab, 0);
+       OBD_CPT_ALLOC(set, cfs_cpt_tab, cpt, sizeof(*set));
        if (!set)
                RETURN(NULL);
        atomic_set(&set->set_refcount, 1);
@@ -1740,7 +1740,8 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
                spin_lock(&imp->imp_lock);
                if (!list_empty(&req->rq_list)) {
                        list_del_init(&req->rq_list);
-                       atomic_dec(&req->rq_import->imp_inflight);
+                       if (atomic_dec_and_test(&req->rq_import->imp_inflight))
+                               wake_up(&req->rq_import->imp_recovery_waitq);
                }
                spin_unlock(&imp->imp_lock);
                ptlrpc_rqphase_move(req, RQ_PHASE_NEW);
@@ -1794,14 +1795,13 @@ static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set)
 int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 {
        struct list_head *tmp, *next;
-       struct list_head  comp_reqs;
+       LIST_HEAD(comp_reqs);
        int force_timer_recalc = 0;
 
        ENTRY;
        if (atomic_read(&set->set_remaining) == 0)
                RETURN(1);
 
-       INIT_LIST_HEAD(&comp_reqs);
        list_for_each_safe(tmp, next, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request,
@@ -2197,13 +2197,14 @@ interpret:
                 */
                if (!list_empty(&req->rq_list)) {
                        list_del_init(&req->rq_list);
-                       atomic_dec(&imp->imp_inflight);
+                       if (atomic_dec_and_test(&imp->imp_inflight))
+                               wake_up(&imp->imp_recovery_waitq);
                }
                list_del_init(&req->rq_unreplied_list);
                spin_unlock(&imp->imp_lock);
 
                atomic_dec(&set->set_remaining);
-               wake_up_all(&imp->imp_recovery_waitq);
+               wake_up(&imp->imp_recovery_waitq);
 
                if (set->set_producer) {
                        /* produce a new request if possible */
@@ -2323,9 +2324,8 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
  * Callback used when waiting on sets with l_wait_event.
  * Always returns 1.
  */
-int ptlrpc_expired_set(void *data)
+void ptlrpc_expired_set(struct ptlrpc_request_set *set)
 {
-       struct ptlrpc_request_set *set = data;
        struct list_head *tmp;
        time64_t now = ktime_get_real_seconds();
 
@@ -2360,13 +2360,6 @@ int ptlrpc_expired_set(void *data)
                 */
                ptlrpc_expire_one_request(req, 1);
        }
-
-       /*
-        * When waiting for a whole set, we always break out of the
-        * sleep so we can recalculate the timeout, or enable interrupts
-        * if everyone's timed out.
-        */
-       RETURN(1);
 }
 
 /**
@@ -2384,9 +2377,8 @@ EXPORT_SYMBOL(ptlrpc_mark_interrupted);
  * Interrupts (sets interrupted flag) all uncompleted requests in
  * a set \a data. Callback for l_wait_event for interruptible waits.
  */
-static void ptlrpc_interrupted_set(void *data)
+static void ptlrpc_interrupted_set(struct ptlrpc_request_set *set)
 {
-       struct ptlrpc_request_set *set = data;
        struct list_head *tmp;
 
        LASSERT(set != NULL);
@@ -2462,7 +2454,6 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
 {
        struct list_head *tmp;
        struct ptlrpc_request *req;
-       struct l_wait_info lwi;
        time64_t timeout;
        int rc;
 
@@ -2491,49 +2482,66 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
                       set, timeout);
 
                if ((timeout == 0 && !signal_pending(current)) ||
-                   set->set_allow_intr)
+                   set->set_allow_intr) {
                        /*
                         * No requests are in-flight (ether timed out
                         * or delayed), so we can allow interrupts.
                         * We still want to block for a limited time,
                         * so we allow interrupts during the timeout.
                         */
-                       lwi = LWI_TIMEOUT_INTR_ALL(
-                                       cfs_time_seconds(timeout ? timeout : 1),
-                                       ptlrpc_expired_set,
-                                       ptlrpc_interrupted_set, set);
-               else
+                       rc = l_wait_event_abortable_timeout(
+                               set->set_waitq,
+                               ptlrpc_check_set(NULL, set),
+                               cfs_time_seconds(timeout ? timeout : 1));
+                       if (rc == 0) {
+                               rc = -ETIMEDOUT;
+                               ptlrpc_expired_set(set);
+                       } else if (rc < 0) {
+                               rc = -EINTR;
+                               ptlrpc_interrupted_set(set);
+                       } else {
+                               rc = 0;
+                       }
+               } else {
                        /*
                         * At least one request is in flight, so no
                         * interrupts are allowed. Wait until all
                         * complete, or an in-flight req times out.
                         */
-                       lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
-                                         ptlrpc_expired_set, set);
-
-               rc = l_wait_event(set->set_waitq,
-                                 ptlrpc_check_set(NULL, set), &lwi);
-
-               /*
-                * LU-769 - if we ignored the signal because it was already
-                * pending when we started, we need to handle it now or we risk
-                * it being ignored forever
-                */
-               if (rc == -ETIMEDOUT &&
-                   (!lwi.lwi_allow_intr || set->set_allow_intr) &&
-                   signal_pending(current)) {
-                       sigset_t blocked_sigs =
-                                          cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
+                       rc = wait_event_idle_timeout(
+                               set->set_waitq,
+                               ptlrpc_check_set(NULL, set),
+                               cfs_time_seconds(timeout ? timeout : 1));
+                       if (rc == 0) {
+                               ptlrpc_expired_set(set);
+                               rc = -ETIMEDOUT;
+                       } else {
+                               rc = 0;
+                       }
 
                        /*
-                        * In fact we only interrupt for the "fatal" signals
-                        * like SIGINT or SIGKILL. We still ignore less
-                        * important signals since ptlrpc set is not easily
-                        * reentrant from userspace again
+                        * LU-769 - if we ignored the signal because
+                        * it was already pending when we started, we
+                        * need to handle it now or we risk it being
+                        * ignored forever
                         */
-                       if (signal_pending(current))
-                               ptlrpc_interrupted_set(set);
-                       cfs_restore_sigs(blocked_sigs);
+                       if (rc == -ETIMEDOUT &&
+                           signal_pending(current)) {
+                               sigset_t blocked_sigs =
+                                       cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
+
+                               /*
+                                * In fact we only interrupt for the
+                                * "fatal" signals like SIGINT or
+                                * SIGKILL. We still ignore less
+                                * important signals since ptlrpc set
+                                * is not easily reentrant from
+                                * userspace again
+                                */
+                               if (signal_pending(current))
+                                       ptlrpc_interrupted_set(set);
+                               cfs_restore_sigs(blocked_sigs);
+                       }
                }
 
                LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
@@ -2735,9 +2743,6 @@ EXPORT_SYMBOL(ptlrpc_req_xid);
  */
 static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
 {
-       int rc;
-       struct l_wait_info lwi;
-
        /*
         * Might sleep.
         */
@@ -2778,24 +2783,25 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
         * unlinked before returning a req to the pool.
         */
        for (;;) {
-               /* The wq argument is ignored by user-space wait_event macros */
                wait_queue_head_t *wq = (request->rq_set) ?
                                        &request->rq_set->set_waitq :
                                        &request->rq_reply_waitq;
+               int seconds = LONG_UNLINK;
                /*
                 * Network access will complete in finite time but the HUGE
                 * timeout lets us CWARN for visibility of sluggish NALs
                 */
-               lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
-                                          cfs_time_seconds(1), NULL, NULL);
-               rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
-                                 &lwi);
-               if (rc == 0) {
+               while (seconds > 0 &&
+                      wait_event_idle_timeout(
+                              *wq,
+                              !ptlrpc_client_recv_or_unlink(request),
+                              cfs_time_seconds(1)) == 0)
+                       seconds -= 1;
+               if (seconds > 0) {
                        ptlrpc_rqphase_move(request, request->rq_next_phase);
                        RETURN(1);
                }
 
-               LASSERT(rc == -ETIMEDOUT);
                DEBUG_REQ(D_WARNING, request,
                          "Unexpectedly long timeout receiving_reply=%d req_ulinked=%d reply_unlinked=%d",
                          request->rq_receiving_reply,