LU-12567 ptlrpc: handle reply and resend reorder
[fs/lustre-release.git] / lustre/ptlrpc/events.c
index f7a4c88..fe07a49 100644
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
 #define DEBUG_SUBSYSTEM S_RPC
 #include <lustre_sec.h>
 #include "ptlrpc_internal.h"
 
-struct lnet_eq *ptlrpc_eq;
+lnet_handler_t ptlrpc_handler;
+struct percpu_ref ptlrpc_pending;
 
 /*
  *  Client's outgoing request callback
  */
 void request_out_callback(struct lnet_event *ev)
 {
-       struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
+       struct ptlrpc_cb_id   *cbid = ev->md_user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;
        bool                   wakeup = false;
        ENTRY;
@@ -55,8 +55,19 @@ void request_out_callback(struct lnet_event *ev)
        LASSERT(ev->type == LNET_EVENT_SEND || ev->type == LNET_EVENT_UNLINK);
        LASSERT(ev->unlinked);
 
+       if (unlikely(lustre_msg_get_opc(req->rq_reqmsg) == cfs_fail_val &&
+                    CFS_FAIL_CHECK_RESET(OBD_FAIL_NET_ERROR_RPC,
+                                         OBD_FAIL_OSP_PRECREATE_PAUSE |
+                                         CFS_FAIL_ONCE)))
+               ev->status = -ECONNABORTED;
+
        DEBUG_REQ(D_NET, req, "type %d, status %d", ev->type, ev->status);
 
+       /* Do not update imp_next_ping for connection request */
+       if (lustre_msg_get_opc(req->rq_reqmsg) !=
+           req->rq_import->imp_connect_op)
+               ptlrpc_pinger_sending_on_import(req->rq_import);
+
        sptlrpc_request_out_callback(req);
 
        spin_lock(&req->rq_lock);
@@ -87,18 +98,19 @@ void request_out_callback(struct lnet_event *ev)
  */
 void reply_in_callback(struct lnet_event *ev)
 {
-        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
-        struct ptlrpc_request *req = cbid->cbid_arg;
-        ENTRY;
+       struct ptlrpc_cb_id   *cbid = ev->md_user_ptr;
+       struct ptlrpc_request *req = cbid->cbid_arg;
+       ENTRY;
 
-        DEBUG_REQ(D_NET, req, "type %d, status %d", ev->type, ev->status);
+       DEBUG_REQ(D_NET, req, "type %d, status %d", ev->type, ev->status);
 
-        LASSERT (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_UNLINK);
-        LASSERT (ev->md.start == req->rq_repbuf);
-        LASSERT (ev->offset + ev->mlength <= req->rq_repbuf_len);
-        /* We've set LNET_MD_MANAGE_REMOTE for all outgoing requests
-           for adaptive timeouts' early reply. */
-        LASSERT((ev->md.options & LNET_MD_MANAGE_REMOTE) != 0);
+       LASSERT(ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_UNLINK);
+       LASSERT(ev->md_start == req->rq_repbuf);
+       LASSERT(ev->offset + ev->mlength <= req->rq_repbuf_len);
+       /* We've set LNET_MD_MANAGE_REMOTE for all outgoing requests
+        * for adaptive timeouts' early reply.
+        */
+       LASSERT((ev->md_options & LNET_MD_MANAGE_REMOTE) != 0);
 
        spin_lock(&req->rq_lock);
 
@@ -178,10 +190,10 @@ out_wake:
  */
 void client_bulk_callback(struct lnet_event *ev)
 {
-        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
-        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
-        struct ptlrpc_request   *req;
-        ENTRY;
+       struct ptlrpc_cb_id     *cbid = ev->md_user_ptr;
+       struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
+       struct ptlrpc_request   *req;
+       ENTRY;
 
        LASSERT((ptlrpc_is_bulk_put_sink(desc->bd_type) &&
                 ev->type == LNET_EVENT_PUT) ||
@@ -190,20 +202,20 @@ void client_bulk_callback(struct lnet_event *ev)
                ev->type == LNET_EVENT_UNLINK);
        LASSERT(ev->unlinked);
 
-        if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB, CFS_FAIL_ONCE))
-                ev->status = -EIO;
+       if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB, CFS_FAIL_ONCE))
+               ev->status = -EIO;
 
-        if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2,CFS_FAIL_ONCE))
-                ev->status = -EIO;
+       if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2,CFS_FAIL_ONCE))
+               ev->status = -EIO;
 
-        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
-               "event type %d, status %d, desc %p\n",
-               ev->type, ev->status, desc);
+       CDEBUG_LIMIT((ev->status == 0) ? D_NET : D_ERROR,
+                    "event type %d, status %d, desc %p\n",
+                    ev->type, ev->status, desc);
 
        spin_lock(&desc->bd_lock);
        req = desc->bd_req;
-       LASSERT(desc->bd_md_count > 0);
-       desc->bd_md_count--;
+       LASSERT(desc->bd_refs > 0);
+       desc->bd_refs--;
 
        if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) {
                desc->bd_nob_transferred += ev->mlength;
@@ -213,14 +225,13 @@ void client_bulk_callback(struct lnet_event *ev)
                spin_lock(&req->rq_lock);
                req->rq_net_err = 1;
                spin_unlock(&req->rq_lock);
+               desc->bd_failure = 1;
        }
 
-       if (ev->status != 0)
-               desc->bd_failure = 1;
 
        /* NB don't unlock till after wakeup; desc can disappear under us
         * otherwise */
-       if (desc->bd_md_count == 0)
+       if (desc->bd_refs == 0)
                ptlrpc_client_wake_req(desc->bd_req);
 
        spin_unlock(&desc->bd_lock);
@@ -288,37 +299,36 @@ static void ptlrpc_req_add_history(struct ptlrpc_service_part *svcpt,
  */
 void request_in_callback(struct lnet_event *ev)
 {
-       struct ptlrpc_cb_id               *cbid = ev->md.user_ptr;
+       struct ptlrpc_cb_id               *cbid = ev->md_user_ptr;
        struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
        struct ptlrpc_service_part        *svcpt = rqbd->rqbd_svcpt;
-       struct ptlrpc_service             *service = svcpt->scp_service;
-        struct ptlrpc_request             *req;
-        ENTRY;
+       struct ptlrpc_service             *service = svcpt->scp_service;
+       struct ptlrpc_request             *req;
+       ENTRY;
 
-        LASSERT (ev->type == LNET_EVENT_PUT ||
-                 ev->type == LNET_EVENT_UNLINK);
-        LASSERT ((char *)ev->md.start >= rqbd->rqbd_buffer);
-        LASSERT ((char *)ev->md.start + ev->offset + ev->mlength <=
-                 rqbd->rqbd_buffer + service->srv_buf_size);
+       LASSERT(ev->type == LNET_EVENT_PUT ||
+               ev->type == LNET_EVENT_UNLINK);
+       LASSERT((char *)ev->md_start >= rqbd->rqbd_buffer);
+       LASSERT((char *)ev->md_start + ev->offset + ev->mlength <=
+               rqbd->rqbd_buffer + service->srv_buf_size);
 
-        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
-               "event type %d, status %d, service %s\n",
-               ev->type, ev->status, service->srv_name);
+       CDEBUG_LIMIT((ev->status == 0) ? D_NET : D_ERROR,
+                    "event type %d, status %d, service %s\n",
+                    ev->type, ev->status, service->srv_name);
 
-        if (ev->unlinked) {
-                /* If this is the last request message to fit in the
-                 * request buffer we can use the request object embedded in
-                 * rqbd.  Note that if we failed to allocate a request,
-                 * we'd have to re-post the rqbd, which we can't do in this
-                 * context. */
-                req = &rqbd->rqbd_req;
-                memset(req, 0, sizeof (*req));
-        } else {
-                LASSERT (ev->type == LNET_EVENT_PUT);
-                if (ev->status != 0) {
-                        /* We moaned above already... */
-                        return;
-                }
+       if (ev->unlinked) {
+               /* If this is the last request message to fit in the
+                * request buffer we can use the request object embedded in
+                * rqbd.  Note that if we failed to allocate a request,
+                * we'd have to re-post the rqbd, which we can't do in this
+                * context.
+                */
+               req = &rqbd->rqbd_req;
+               memset(req, 0, sizeof(*req));
+       } else {
+               LASSERT(ev->type == LNET_EVENT_PUT);
+               if (ev->status != 0) /* We moaned above already... */
+                       return;
                req = ptlrpc_request_cache_alloc(GFP_ATOMIC);
                 if (req == NULL) {
                         CERROR("Can't allocate incoming request descriptor: "
@@ -334,7 +344,7 @@ void request_in_callback(struct lnet_event *ev)
         * flags are reset and scalars are zero.  We only set the message
         * size to non-zero if this was a successful receive. */
        req->rq_xid = ev->match_bits;
-       req->rq_reqbuf = ev->md.start + ev->offset;
+       req->rq_reqbuf = ev->md_start + ev->offset;
        if (ev->type == LNET_EVENT_PUT && ev->status == 0)
                req->rq_reqdata_len = ev->mlength;
        ktime_get_real_ts64(&req->rq_arrival_time);
@@ -390,7 +400,7 @@ void request_in_callback(struct lnet_event *ev)
  */
 void reply_out_callback(struct lnet_event *ev)
 {
-       struct ptlrpc_cb_id       *cbid = ev->md.user_ptr;
+       struct ptlrpc_cb_id       *cbid = ev->md_user_ptr;
        struct ptlrpc_reply_state *rs = cbid->cbid_arg;
        struct ptlrpc_service_part *svcpt = rs->rs_svcpt;
         ENTRY;
@@ -435,7 +445,7 @@ void reply_out_callback(struct lnet_event *ev)
  */
 void server_bulk_callback(struct lnet_event *ev)
 {
-       struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
+       struct ptlrpc_cb_id     *cbid = ev->md_user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
        ENTRY;
 
@@ -446,13 +456,13 @@ void server_bulk_callback(struct lnet_event *ev)
                (ptlrpc_is_bulk_get_sink(desc->bd_type) &&
                 ev->type == LNET_EVENT_REPLY));
 
-        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
-               "event type %d, status %d, desc %p\n",
-               ev->type, ev->status, desc);
+       CDEBUG_LIMIT((ev->status == 0) ? D_NET : D_ERROR,
+                    "event type %d, status %d, desc %p\n",
+                    ev->type, ev->status, desc);
 
        spin_lock(&desc->bd_lock);
 
-       LASSERT(desc->bd_md_count > 0);
+       LASSERT(desc->bd_refs > 0);
 
        if ((ev->type == LNET_EVENT_ACK ||
             ev->type == LNET_EVENT_REPLY) &&
@@ -468,9 +478,9 @@ void server_bulk_callback(struct lnet_event *ev)
                desc->bd_failure = 1;
 
        if (ev->unlinked) {
-               desc->bd_md_count--;
+               desc->bd_refs--;
                /* This is the last callback no matter what... */
-               if (desc->bd_md_count == 0)
+               if (desc->bd_refs == 0)
                        wake_up(&desc->bd_waitq);
        }
 
@@ -481,35 +491,37 @@ void server_bulk_callback(struct lnet_event *ev)
 
 static void ptlrpc_master_callback(struct lnet_event *ev)
 {
-        struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
+       struct ptlrpc_cb_id *cbid = ev->md_user_ptr;
        void (*callback)(struct lnet_event *ev) = cbid->cbid_fn;
 
-        /* Honestly, it's best to find out early. */
-        LASSERT (cbid->cbid_arg != LP_POISON);
-        LASSERT (callback == request_out_callback ||
-                 callback == reply_in_callback ||
-                 callback == client_bulk_callback ||
-                 callback == request_in_callback ||
-                 callback == reply_out_callback
+       /* Honestly, it's best to find out early. */
+       LASSERT(cbid->cbid_arg != LP_POISON);
+       LASSERT(callback == request_out_callback ||
+               callback == reply_in_callback ||
+               callback == client_bulk_callback ||
+               callback == request_in_callback ||
+               callback == reply_out_callback
 #ifdef HAVE_SERVER_SUPPORT
-                 || callback == server_bulk_callback
+               || callback == server_bulk_callback
 #endif
-                 );
+               );
 
-        callback (ev);
+       callback(ev);
+       if (ev->unlinked)
+               percpu_ref_put(&ptlrpc_pending);
 }
 
 int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
                        struct lnet_process_id *peer, lnet_nid_t *self)
 {
-       int               best_dist = 0;
-       __u32             best_order = 0;
-       int               count = 0;
-       int               rc = -ENOENT;
-       int               dist;
-       __u32             order;
-       lnet_nid_t        dst_nid;
-       lnet_nid_t        src_nid;
+       int best_dist = 0;
+       __u32 best_order = 0;
+       int count = 0;
+       int rc = -ENOENT;
+       int dist;
+       __u32 order;
+       lnet_nid_t dst_nid;
+       lnet_nid_t src_nid;
 
        peer->pid = LNET_PID_LUSTRE;
 
@@ -524,7 +536,7 @@ int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
                        continue;
 
                if (dist == 0) {                /* local! use loopback LND */
-                       peer->nid = *self = LNET_MKNID(LNET_MKNET(LOLND, 0), 0);
+                       peer->nid = *self = LNET_NID_LO_0;
                        rc = 0;
                        break;
                }
@@ -545,36 +557,26 @@ int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
        return rc;
 }
 
-void ptlrpc_ni_fini(void)
+static struct completion ptlrpc_done;
+
+static void ptlrpc_release(struct percpu_ref *ref)
 {
-       int                 rc;
-       int                 retries;
+       complete(&ptlrpc_done);
+}
 
+static void ptlrpc_ni_fini(void)
+{
        /* Wait for the event queue to become idle since there may still be
         * messages in flight with pending events (i.e. the fire-and-forget
         * messages == client requests and "non-difficult" server
         * replies */
 
-       for (retries = 0;; retries++) {
-               rc = LNetEQFree(ptlrpc_eq);
-               switch (rc) {
-               default:
-                       LBUG();
+       init_completion(&ptlrpc_done);
+       percpu_ref_kill(&ptlrpc_pending);
+       wait_for_completion(&ptlrpc_done);
 
-               case 0:
-                       LNetNIFini();
-                       return;
-
-               case -EBUSY:
-                       if (retries != 0)
-                               CWARN("Event queue still busy\n");
-
-                       /* Wait for a bit */
-                       ssleep(2);
-                       break;
-               }
-       }
-       /* notreached */
+       lnet_assert_handler_unused(ptlrpc_handler);
+       LNetNIFini();
 }
 
 lnet_pid_t ptl_get_pid(void)
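
The retry loop around LNetEQFree() removed above is replaced by a drain barrier built from a percpu_ref and a completion: every in-flight message holds a reference on ptlrpc_pending (dropped in ptlrpc_master_callback once its MD is unlinked), and shutdown kills the reference and sleeps until the release callback fires. Below is a minimal, self-contained sketch of that kernel pattern; the names (pending, drained, op_start, op_done) are illustrative and are not part of ptlrpc.

/* Illustrative drain-barrier sketch, not Lustre code: a percpu_ref counts
 * in-flight operations, a completion reports when the last one is gone. */
#include <linux/percpu-refcount.h>
#include <linux/completion.h>
#include <linux/gfp.h>

static struct percpu_ref pending;	/* one reference per in-flight op */
static struct completion drained;	/* fired by the release callback */

static void pending_release(struct percpu_ref *ref)
{
	complete(&drained);		/* last reference has been dropped */
}

static int pending_setup(void)
{
	init_completion(&drained);
	/* starts out holding one reference, owned by the shutdown path */
	return percpu_ref_init(&pending, pending_release, 0, GFP_KERNEL);
}

static void op_start(void)		/* e.g. before handing an MD to LNet */
{
	percpu_ref_get(&pending);
}

static void op_done(void)		/* e.g. from the event handler, on unlink */
{
	percpu_ref_put(&pending);
}

static void pending_teardown(void)
{
	percpu_ref_kill(&pending);	/* drop the initial ref, go atomic */
	wait_for_completion(&drained);	/* returns once every op_done() ran */
	percpu_ref_exit(&pending);
}

Compared with the old LNetEQFree()/ssleep(2) polling, the waiter is woken exactly when the count reaches zero; lnet_assert_handler_unused() then serves as a final sanity check that no MD still references ptlrpc_handler before LNetNIFini() tears the interface down.
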
@@ -584,34 +586,34 @@ lnet_pid_t ptl_get_pid(void)
 
 int ptlrpc_ni_init(void)
 {
-        int              rc;
-        lnet_pid_t       pid;
+       int rc;
+       lnet_pid_t pid;
 
-        pid = ptl_get_pid();
-        CDEBUG(D_NET, "My pid is: %x\n", pid);
+       pid = ptl_get_pid();
+       CDEBUG(D_NET, "My pid is: %x\n", pid);
 
-        /* We're not passing any limits yet... */
-        rc = LNetNIInit(pid);
-        if (rc < 0) {
-                CDEBUG (D_NET, "Can't init network interface: %d\n", rc);
+       /* We're not passing any limits yet... */
+       rc = LNetNIInit(pid);
+       if (rc < 0) {
+               CDEBUG(D_NET, "ptlrpc: Can't init network interface: rc = %d\n",
+                      rc);
                return rc;
-        }
+       }
 
-        /* CAVEAT EMPTOR: how we process portals events is _radically_
-         * different depending on... */
+       rc = percpu_ref_init(&ptlrpc_pending, ptlrpc_release, 0, GFP_KERNEL);
+       if (rc) {
+               CERROR("ptlrpc: Can't init percpu refcount: rc = %d\n", rc);
+               return rc;
+       }
+       /* CAVEAT EMPTOR: how we process portals events is _radically_
+        * different depending on...
+        */
        /* kernel LNet calls our master callback when there is a new event,
         * because we are guaranteed to get every event via callback,
         * so we just set EQ size to 0 to avoid overhead of serializing
         * enqueue/dequeue operations in LNet. */
-       ptlrpc_eq = LNetEQAlloc(0, ptlrpc_master_callback);
-       if (!IS_ERR(ptlrpc_eq))
-               return 0;
-
-       rc = PTR_ERR(ptlrpc_eq);
-       CERROR("Failed to allocate event queue: %d\n", rc);
-       LNetNIFini();
-
-       return rc;
+       ptlrpc_handler = ptlrpc_master_callback;
+       return 0;
 }
 
 int ptlrpc_init_portals(void)
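
With the event queue gone, registration reduces to publishing a single function pointer (ptlrpc_handler = ptlrpc_master_callback); as the comment above says, kernel LNet delivers every event straight to that handler, and per-message dispatch still goes through the ptlrpc_cb_id stored in each MD's user pointer. The stand-alone sketch below illustrates that demultiplexing pattern in plain userspace C; all demo_* names are made up for the example and are not the LNet API.

/* Stand-alone sketch of the single-handler dispatch used by
 * ptlrpc_master_callback: the transport delivers every event to one
 * registered handler, which recovers a per-message callback descriptor
 * from the event's user pointer and forwards to it. */
#include <stdio.h>

struct demo_event {
	void *md_user_ptr;		/* plays the role of ev->md_user_ptr */
	int status;
	int unlinked;
};

struct demo_cb_id {			/* cf. struct ptlrpc_cb_id */
	void (*cbid_fn)(struct demo_event *ev);
	void *cbid_arg;			/* the request, bulk desc, ... */
};

static void demo_reply_in_callback(struct demo_event *ev)
{
	struct demo_cb_id *cbid = ev->md_user_ptr;

	printf("reply event: status %d arg %p\n", ev->status, cbid->cbid_arg);
}

/* the one handler the transport knows about */
static void demo_master_callback(struct demo_event *ev)
{
	struct demo_cb_id *cbid = ev->md_user_ptr;

	cbid->cbid_fn(ev);
	/* kernel version: if (ev->unlinked) percpu_ref_put(&ptlrpc_pending); */
}

int main(void)
{
	struct demo_cb_id cbid = {
		.cbid_fn  = demo_reply_in_callback,
		.cbid_arg = NULL,
	};
	struct demo_event ev = {
		.md_user_ptr = &cbid,
		.status      = 0,
		.unlinked    = 1,
	};

	demo_master_callback(&ev);
	return 0;
}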