Whamcloud - gitweb
LU-9859 libcfs: rename cfs_cpt_table to cfs_cpt_tab
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index 48ade0f..cc93dd4 100644 (file)
@@ -37,6 +37,7 @@
 #include <linux/delay.h>
 #include <linux/random.h>
 
+#include <lnet/lib-lnet.h>
 #include <obd_support.h>
 #include <obd_class.h>
 #include <lustre_lib.h>
@@ -68,6 +69,28 @@ static void ptlrpc_release_bulk_page_pin(struct ptlrpc_bulk_desc *desc)
                put_page(BD_GET_KIOV(desc, i).kiov_page);
 }
 
+static int ptlrpc_prep_bulk_frag_pages(struct ptlrpc_bulk_desc *desc,
+                                      void *frag, int len)
+{
+       unsigned int offset = (uintptr_t)frag & ~PAGE_MASK;
+
+       ENTRY;
+       while (len > 0) {
+               int page_len = min_t(unsigned int, PAGE_SIZE - offset,
+                                    len);
+               uintptr_t vaddr = (uintptr_t) frag;
+
+               ptlrpc_prep_bulk_page_nopin(desc,
+                                           lnet_kvaddr_to_page(vaddr),
+                                           offset, page_len);
+               offset = 0;
+               len -= page_len;
+               frag += page_len;
+       }
+
+       RETURN(desc->bd_nob);
+}
+
 const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops = {
        .add_kiov_frag  = ptlrpc_prep_bulk_page_pin,
        .release_frags  = ptlrpc_release_bulk_page_pin,
@@ -77,6 +100,7 @@ EXPORT_SYMBOL(ptlrpc_bulk_kiov_pin_ops);
 const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops = {
        .add_kiov_frag  = ptlrpc_prep_bulk_page_nopin,
        .release_frags  = ptlrpc_release_bulk_noop,
+       .add_iov_frag   = ptlrpc_prep_bulk_frag_pages,
 };
 EXPORT_SYMBOL(ptlrpc_bulk_kiov_nopin_ops);
 
@@ -624,7 +648,7 @@ ptlrpc_init_rq_pool(int num_rq, int msgsize,
 {
        struct ptlrpc_request_pool *pool;
 
-       OBD_ALLOC(pool, sizeof(struct ptlrpc_request_pool));
+       OBD_ALLOC_PTR(pool);
        if (!pool)
                return NULL;
 
@@ -737,6 +761,41 @@ static inline void ptlrpc_assign_next_xid(struct ptlrpc_request *req)
 
 static atomic64_t ptlrpc_last_xid;
 
+static void ptlrpc_reassign_next_xid(struct ptlrpc_request *req)
+{
+       spin_lock(&req->rq_import->imp_lock);
+       list_del_init(&req->rq_unreplied_list);
+       ptlrpc_assign_next_xid_nolock(req);
+       spin_unlock(&req->rq_import->imp_lock);
+       DEBUG_REQ(D_RPCTRACE, req, "reassign xid");
+}
+
+void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req)
+{
+       struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+       __u32 opc;
+       __u16 tag;
+
+       opc = lustre_msg_get_opc(req->rq_reqmsg);
+       tag = obd_get_mod_rpc_slot(cli, opc);
+       lustre_msg_set_tag(req->rq_reqmsg, tag);
+       ptlrpc_reassign_next_xid(req);
+}
+EXPORT_SYMBOL(ptlrpc_get_mod_rpc_slot);
+
+void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req)
+{
+       __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+       if (tag != 0) {
+               struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+               __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+               obd_put_mod_rpc_slot(cli, opc, tag);
+       }
+}
+EXPORT_SYMBOL(ptlrpc_put_mod_rpc_slot);
+
 int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
                             __u32 version, int opcode, char **bufs,
                             struct ptlrpc_cli_ctx *ctx)
@@ -1005,8 +1064,8 @@ struct ptlrpc_request_set *ptlrpc_prep_set(void)
        int cpt;
 
        ENTRY;
-       cpt = cfs_cpt_current(cfs_cpt_table, 0);
-       OBD_CPT_ALLOC(set, cfs_cpt_table, cpt, sizeof(*set));
+       cpt = cfs_cpt_current(cfs_cpt_tab, 0);
+       OBD_CPT_ALLOC(set, cfs_cpt_tab, cpt, sizeof(*set));
        if (!set)
                RETURN(NULL);
        atomic_set(&set->set_refcount, 1);
@@ -1681,7 +1740,8 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
                spin_lock(&imp->imp_lock);
                if (!list_empty(&req->rq_list)) {
                        list_del_init(&req->rq_list);
-                       atomic_dec(&req->rq_import->imp_inflight);
+                       if (atomic_dec_and_test(&req->rq_import->imp_inflight))
+                               wake_up(&req->rq_import->imp_recovery_waitq);
                }
                spin_unlock(&imp->imp_lock);
                ptlrpc_rqphase_move(req, RQ_PHASE_NEW);
@@ -1735,14 +1795,13 @@ static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set)
 int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 {
        struct list_head *tmp, *next;
-       struct list_head  comp_reqs;
+       LIST_HEAD(comp_reqs);
        int force_timer_recalc = 0;
 
        ENTRY;
        if (atomic_read(&set->set_remaining) == 0)
                RETURN(1);
 
-       INIT_LIST_HEAD(&comp_reqs);
        list_for_each_safe(tmp, next, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request,
@@ -2138,13 +2197,14 @@ interpret:
                 */
                if (!list_empty(&req->rq_list)) {
                        list_del_init(&req->rq_list);
-                       atomic_dec(&imp->imp_inflight);
+                       if (atomic_dec_and_test(&imp->imp_inflight))
+                               wake_up(&imp->imp_recovery_waitq);
                }
                list_del_init(&req->rq_unreplied_list);
                spin_unlock(&imp->imp_lock);
 
                atomic_dec(&set->set_remaining);
-               wake_up_all(&imp->imp_recovery_waitq);
+               wake_up(&imp->imp_recovery_waitq);
 
                if (set->set_producer) {
                        /* produce a new request if possible */
@@ -2264,9 +2324,8 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
  * Callback used when waiting on sets with l_wait_event.
  * Always returns 1.
  */
-int ptlrpc_expired_set(void *data)
+void ptlrpc_expired_set(struct ptlrpc_request_set *set)
 {
-       struct ptlrpc_request_set *set = data;
        struct list_head *tmp;
        time64_t now = ktime_get_real_seconds();
 
@@ -2301,13 +2360,6 @@ int ptlrpc_expired_set(void *data)
                 */
                ptlrpc_expire_one_request(req, 1);
        }
-
-       /*
-        * When waiting for a whole set, we always break out of the
-        * sleep so we can recalculate the timeout, or enable interrupts
-        * if everyone's timed out.
-        */
-       RETURN(1);
 }
 
 /**
@@ -2325,9 +2377,8 @@ EXPORT_SYMBOL(ptlrpc_mark_interrupted);
  * Interrupts (sets interrupted flag) all uncompleted requests in
  * a set \a data. Callback for l_wait_event for interruptible waits.
  */
-static void ptlrpc_interrupted_set(void *data)
+static void ptlrpc_interrupted_set(struct ptlrpc_request_set *set)
 {
-       struct ptlrpc_request_set *set = data;
        struct list_head *tmp;
 
        LASSERT(set != NULL);
@@ -2403,7 +2454,6 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
 {
        struct list_head *tmp;
        struct ptlrpc_request *req;
-       struct l_wait_info lwi;
        time64_t timeout;
        int rc;
 
@@ -2432,49 +2482,66 @@ int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
                       set, timeout);
 
                if ((timeout == 0 && !signal_pending(current)) ||
-                   set->set_allow_intr)
+                   set->set_allow_intr) {
                        /*
                         * No requests are in-flight (ether timed out
                         * or delayed), so we can allow interrupts.
                         * We still want to block for a limited time,
                         * so we allow interrupts during the timeout.
                         */
-                       lwi = LWI_TIMEOUT_INTR_ALL(
-                                       cfs_time_seconds(timeout ? timeout : 1),
-                                       ptlrpc_expired_set,
-                                       ptlrpc_interrupted_set, set);
-               else
+                       rc = l_wait_event_abortable_timeout(
+                               set->set_waitq,
+                               ptlrpc_check_set(NULL, set),
+                               cfs_time_seconds(timeout ? timeout : 1));
+                       if (rc == 0) {
+                               rc = -ETIMEDOUT;
+                               ptlrpc_expired_set(set);
+                       } else if (rc < 0) {
+                               rc = -EINTR;
+                               ptlrpc_interrupted_set(set);
+                       } else {
+                               rc = 0;
+                       }
+               } else {
                        /*
                         * At least one request is in flight, so no
                         * interrupts are allowed. Wait until all
                         * complete, or an in-flight req times out.
                         */
-                       lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
-                                         ptlrpc_expired_set, set);
-
-               rc = l_wait_event(set->set_waitq,
-                                 ptlrpc_check_set(NULL, set), &lwi);
-
-               /*
-                * LU-769 - if we ignored the signal because it was already
-                * pending when we started, we need to handle it now or we risk
-                * it being ignored forever
-                */
-               if (rc == -ETIMEDOUT &&
-                   (!lwi.lwi_allow_intr || set->set_allow_intr) &&
-                   signal_pending(current)) {
-                       sigset_t blocked_sigs =
-                                          cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
+                       rc = wait_event_idle_timeout(
+                               set->set_waitq,
+                               ptlrpc_check_set(NULL, set),
+                               cfs_time_seconds(timeout ? timeout : 1));
+                       if (rc == 0) {
+                               ptlrpc_expired_set(set);
+                               rc = -ETIMEDOUT;
+                       } else {
+                               rc = 0;
+                       }
 
                        /*
-                        * In fact we only interrupt for the "fatal" signals
-                        * like SIGINT or SIGKILL. We still ignore less
-                        * important signals since ptlrpc set is not easily
-                        * reentrant from userspace again
+                        * LU-769 - if we ignored the signal because
+                        * it was already pending when we started, we
+                        * need to handle it now or we risk it being
+                        * ignored forever
                         */
-                       if (signal_pending(current))
-                               ptlrpc_interrupted_set(set);
-                       cfs_restore_sigs(blocked_sigs);
+                       if (rc == -ETIMEDOUT &&
+                           signal_pending(current)) {
+                               sigset_t blocked_sigs =
+                                       cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
+
+                               /*
+                                * In fact we only interrupt for the
+                                * "fatal" signals like SIGINT or
+                                * SIGKILL. We still ignore less
+                                * important signals since ptlrpc set
+                                * is not easily reentrant from
+                                * userspace again
+                                */
+                               if (signal_pending(current))
+                                       ptlrpc_interrupted_set(set);
+                               cfs_restore_sigs(blocked_sigs);
+                       }
                }
 
                LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
@@ -2676,9 +2743,6 @@ EXPORT_SYMBOL(ptlrpc_req_xid);
  */
 static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
 {
-       int rc;
-       struct l_wait_info lwi;
-
        /*
         * Might sleep.
         */
@@ -2719,24 +2783,25 @@ static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
         * unlinked before returning a req to the pool.
         */
        for (;;) {
-               /* The wq argument is ignored by user-space wait_event macros */
                wait_queue_head_t *wq = (request->rq_set) ?
                                        &request->rq_set->set_waitq :
                                        &request->rq_reply_waitq;
+               int seconds = LONG_UNLINK;
                /*
                 * Network access will complete in finite time but the HUGE
                 * timeout lets us CWARN for visibility of sluggish NALs
                 */
-               lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
-                                          cfs_time_seconds(1), NULL, NULL);
-               rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
-                                 &lwi);
-               if (rc == 0) {
+               while (seconds > 0 &&
+                      wait_event_idle_timeout(
+                              *wq,
+                              !ptlrpc_client_recv_or_unlink(request),
+                              cfs_time_seconds(1)) == 0)
+                       seconds -= 1;
+               if (seconds > 0) {
                        ptlrpc_rqphase_move(request, request->rq_next_phase);
                        RETURN(1);
                }
 
-               LASSERT(rc == -ETIMEDOUT);
                DEBUG_REQ(D_WARNING, request,
                          "Unexpectedly long timeout receiving_reply=%d req_ulinked=%d reply_unlinked=%d",
                          request->rq_receiving_reply,
@@ -3335,7 +3400,8 @@ void ptlrpc_init_xid(void)
        }
 
        /* Need to always be aligned to a power-of-two for mutli-bulk BRW */
-       CLASSERT((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) == 0);
+       BUILD_BUG_ON((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) !=
+                    0);
        xid &= PTLRPC_BULK_OPS_MASK;
        atomic64_set(&ptlrpc_last_xid, xid);
 }