Whamcloud - gitweb
LU-12567 ptlrpc: handle reply and resend reorder
[fs/lustre-release.git] / lustre / ptlrpc / events.c
index 7ef8f67..fe07a49 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
 #define DEBUG_SUBSYSTEM S_RPC
@@ -56,8 +55,19 @@ void request_out_callback(struct lnet_event *ev)
        LASSERT(ev->type == LNET_EVENT_SEND || ev->type == LNET_EVENT_UNLINK);
        LASSERT(ev->unlinked);
 
+       if (unlikely(lustre_msg_get_opc(req->rq_reqmsg) == cfs_fail_val &&
+                    CFS_FAIL_CHECK_RESET(OBD_FAIL_NET_ERROR_RPC,
+                                         OBD_FAIL_OSP_PRECREATE_PAUSE |
+                                         CFS_FAIL_ONCE)))
+               ev->status = -ECONNABORTED;
+
        DEBUG_REQ(D_NET, req, "type %d, status %d", ev->type, ev->status);
 
+       /* Do not update imp_next_ping for connection request */
+       if (lustre_msg_get_opc(req->rq_reqmsg) !=
+           req->rq_import->imp_connect_op)
+               ptlrpc_pinger_sending_on_import(req->rq_import);
+
        sptlrpc_request_out_callback(req);
 
        spin_lock(&req->rq_lock);
@@ -192,15 +202,15 @@ void client_bulk_callback(struct lnet_event *ev)
                ev->type == LNET_EVENT_UNLINK);
        LASSERT(ev->unlinked);
 
-        if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB, CFS_FAIL_ONCE))
-                ev->status = -EIO;
+       if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB, CFS_FAIL_ONCE))
+               ev->status = -EIO;
 
-        if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2,CFS_FAIL_ONCE))
-                ev->status = -EIO;
+       if (CFS_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2,CFS_FAIL_ONCE))
+               ev->status = -EIO;
 
-        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
-               "event type %d, status %d, desc %p\n",
-               ev->type, ev->status, desc);
+       CDEBUG_LIMIT((ev->status == 0) ? D_NET : D_ERROR,
+                    "event type %d, status %d, desc %p\n",
+                    ev->type, ev->status, desc);
 
        spin_lock(&desc->bd_lock);
        req = desc->bd_req;
@@ -215,10 +225,9 @@ void client_bulk_callback(struct lnet_event *ev)
                spin_lock(&req->rq_lock);
                req->rq_net_err = 1;
                spin_unlock(&req->rq_lock);
+               desc->bd_failure = 1;
        }
 
-       if (ev->status != 0)
-               desc->bd_failure = 1;
 
        /* NB don't unlock till after wakeup; desc can disappear under us
         * otherwise */
@@ -303,24 +312,23 @@ void request_in_callback(struct lnet_event *ev)
        LASSERT((char *)ev->md_start + ev->offset + ev->mlength <=
                rqbd->rqbd_buffer + service->srv_buf_size);
 
-       CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
-              "event type %d, status %d, service %s\n",
-              ev->type, ev->status, service->srv_name);
+       CDEBUG_LIMIT((ev->status == 0) ? D_NET : D_ERROR,
+                    "event type %d, status %d, service %s\n",
+                    ev->type, ev->status, service->srv_name);
 
-        if (ev->unlinked) {
-                /* If this is the last request message to fit in the
-                 * request buffer we can use the request object embedded in
-                 * rqbd.  Note that if we failed to allocate a request,
-                 * we'd have to re-post the rqbd, which we can't do in this
-                 * context. */
-                req = &rqbd->rqbd_req;
-                memset(req, 0, sizeof (*req));
-        } else {
-                LASSERT (ev->type == LNET_EVENT_PUT);
-                if (ev->status != 0) {
-                        /* We moaned above already... */
-                        return;
-                }
+       if (ev->unlinked) {
+               /* If this is the last request message to fit in the
+                * request buffer we can use the request object embedded in
+                * rqbd.  Note that if we failed to allocate a request,
+                * we'd have to re-post the rqbd, which we can't do in this
+                * context.
+                */
+               req = &rqbd->rqbd_req;
+               memset(req, 0, sizeof(*req));
+       } else {
+               LASSERT(ev->type == LNET_EVENT_PUT);
+               if (ev->status != 0) /* We moaned above already... */
+                       return;
                req = ptlrpc_request_cache_alloc(GFP_ATOMIC);
                 if (req == NULL) {
                         CERROR("Can't allocate incoming request descriptor: "
@@ -448,9 +456,9 @@ void server_bulk_callback(struct lnet_event *ev)
                (ptlrpc_is_bulk_get_sink(desc->bd_type) &&
                 ev->type == LNET_EVENT_REPLY));
 
-        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
-               "event type %d, status %d, desc %p\n",
-               ev->type, ev->status, desc);
+       CDEBUG_LIMIT((ev->status == 0) ? D_NET : D_ERROR,
+                    "event type %d, status %d, desc %p\n",
+                    ev->type, ev->status, desc);
 
        spin_lock(&desc->bd_lock);