Whamcloud - gitweb
LU-1431 ptlrpc: Support for over 1MB bulk I/O RPC
[fs/lustre-release.git] / lustre / ptlrpc / events.c
index e2f098e..0dfe3a1 100644 (file)
@@ -201,28 +201,27 @@ void client_bulk_callback (lnet_event_t *ev)
                ev->type, ev->status, desc);
 
        spin_lock(&desc->bd_lock);
-        req = desc->bd_req;
-        LASSERT(desc->bd_network_rw);
-        desc->bd_network_rw = 0;
-
-        if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) {
-                desc->bd_success = 1;
-                desc->bd_nob_transferred = ev->mlength;
-                desc->bd_sender = ev->sender;
-        } else {
-                /* start reconnect and resend if network error hit */
+       req = desc->bd_req;
+       LASSERT(desc->bd_md_count > 0);
+       desc->bd_md_count--;
+
+       if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) {
+               desc->bd_nob_transferred += ev->mlength;
+               desc->bd_sender = ev->sender;
+       } else {
+               /* start reconnect and resend if network error hit */
                spin_lock(&req->rq_lock);
                req->rq_net_err = 1;
                spin_unlock(&req->rq_lock);
-        }
+       }
 
-        /* release the encrypted pages for write */
-        if (desc->bd_req->rq_bulk_write)
-                sptlrpc_enc_pool_put_pages(desc);
+       if (ev->status != 0)
+               desc->bd_failure = 1;
 
-        /* NB don't unlock till after wakeup; desc can disappear under us
-         * otherwise */
-        ptlrpc_client_wake_req(req);
+       /* NB don't unlock till after wakeup; desc can disappear under us
+        * otherwise */
+       if (desc->bd_md_count == 0)
+               ptlrpc_client_wake_req(desc->bd_req);
 
        spin_unlock(&desc->bd_lock);
        EXIT;
@@ -435,16 +434,16 @@ void reply_out_callback(lnet_event_t *ev)
  */
 void server_bulk_callback (lnet_event_t *ev)
 {
-        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
-        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
-        ENTRY;
+       struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
+       struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
+       ENTRY;
 
-        LASSERT (ev->type == LNET_EVENT_SEND ||
-                 ev->type == LNET_EVENT_UNLINK ||
-                 (desc->bd_type == BULK_PUT_SOURCE &&
-                  ev->type == LNET_EVENT_ACK) ||
-                 (desc->bd_type == BULK_GET_SINK &&
-                  ev->type == LNET_EVENT_REPLY));
+       LASSERT(ev->type == LNET_EVENT_SEND ||
+               ev->type == LNET_EVENT_UNLINK ||
+               (desc->bd_type == BULK_PUT_SOURCE &&
+                ev->type == LNET_EVENT_ACK) ||
+               (desc->bd_type == BULK_GET_SINK &&
+                ev->type == LNET_EVENT_REPLY));
 
         CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
                "event type %d, status %d, desc %p\n",
@@ -452,22 +451,27 @@ void server_bulk_callback (lnet_event_t *ev)
 
        spin_lock(&desc->bd_lock);
 
-        if ((ev->type == LNET_EVENT_ACK ||
-             ev->type == LNET_EVENT_REPLY) &&
-            ev->status == 0) {
-                /* We heard back from the peer, so even if we get this
-                 * before the SENT event (oh yes we can), we know we
-                 * read/wrote the peer buffer and how much... */
-                desc->bd_success = 1;
-                desc->bd_nob_transferred = ev->mlength;
-                desc->bd_sender = ev->sender;
-        }
+       LASSERT(desc->bd_md_count > 0);
 
-        if (ev->unlinked) {
-                /* This is the last callback no matter what... */
-                desc->bd_network_rw = 0;
-                cfs_waitq_signal(&desc->bd_waitq);
-        }
+       if ((ev->type == LNET_EVENT_ACK ||
+            ev->type == LNET_EVENT_REPLY) &&
+           ev->status == 0) {
+               /* We heard back from the peer, so even if we get this
+                * before the SENT event (oh yes we can), we know we
+                * read/wrote the peer buffer and how much... */
+               desc->bd_nob_transferred += ev->mlength;
+               desc->bd_sender = ev->sender;
+       }
+
+       if (ev->status != 0)
+               desc->bd_failure = 1;
+
+       if (ev->unlinked) {
+               desc->bd_md_count--;
+               /* This is the last callback no matter what... */
+               if (desc->bd_md_count == 0)
+                       cfs_waitq_signal(&desc->bd_waitq);
+       }
 
        spin_unlock(&desc->bd_lock);
        EXIT;