Whamcloud - gitweb
b=21486 Unlink truncated reply buffer
authorLiang Zhen <Zhen.Liang@Sun.COM>
Wed, 10 Feb 2010 13:30:31 +0000 (05:30 -0800)
committerRobert Read <rread@sun.com>
Wed, 10 Feb 2010 16:54:53 +0000 (08:54 -0800)
i=eric.mei
i=eeb

lustre/include/lustre_net.h
lustre/ptlrpc/client.c
lustre/ptlrpc/events.c

index a1fd22e..c5f3b56 100644 (file)
@@ -388,8 +388,9 @@ struct ptlrpc_request {
         int rq_request_portal;  /* XXX FIXME bug 249 */
         int rq_reply_portal;    /* XXX FIXME bug 249 */
 
         int rq_request_portal;  /* XXX FIXME bug 249 */
         int rq_reply_portal;    /* XXX FIXME bug 249 */
 
-        int rq_nob_received; /* client-side # reply bytes actually received  */
-
+        int rq_nob_received; /* client-side:
+                              * !rq_truncate : # reply bytes actually received,
+                              *  rq_truncate : required repbuf_len for resend */
         int rq_reqlen;
         struct lustre_msg *rq_reqmsg;
 
         int rq_reqlen;
         struct lustre_msg *rq_reqmsg;
 
index a26f440..4d81fe6 100644 (file)
@@ -1016,14 +1016,26 @@ static int after_reply(struct ptlrpc_request *req)
         long timediff;
         ENTRY;
 
         long timediff;
         ENTRY;
 
-        LASSERT(!req->rq_receiving_reply);
-        LASSERT(obd);
-        LASSERT(req->rq_nob_received <= req->rq_repbuf_len);
+        LASSERT(obd != NULL);
+        /* repbuf must be unlinked */
+        LASSERT(!req->rq_receiving_reply && !req->rq_must_unlink);
+
+        if (req->rq_reply_truncate) {
+                if (req->rq_no_resend) {
+                        DEBUG_REQ(D_ERROR, req, "reply buffer overflow,"
+                                  " expected: %d, actual size: %d",
+                                  req->rq_nob_received, req->rq_repbuf_len);
+                        RETURN(-EOVERFLOW);
+                }
 
 
-        if (req->rq_reply_truncate && !req->rq_no_resend) {
-                req->rq_resend = 1;
                 sptlrpc_cli_free_repbuf(req);
                 sptlrpc_cli_free_repbuf(req);
-                req->rq_replen = req->rq_nob_received;
+                /* Pass the required reply buffer size (include
+                 * space for early reply).
+                 * NB: no need to roundup because alloc_repbuf
+                 * will roundup it */
+                req->rq_replen       = req->rq_nob_received;
+                req->rq_nob_received = 0;
+                req->rq_resend       = 1;
                 RETURN(0);
         }
 
                 RETURN(0);
         }
 
@@ -1031,7 +1043,6 @@ static int after_reply(struct ptlrpc_request *req)
          * NB Until this point, the whole of the incoming message,
          * including buflens, status etc is in the sender's byte order.
          */
          * NB Until this point, the whole of the incoming message,
          * including buflens, status etc is in the sender's byte order.
          */
-
         rc = sptlrpc_cli_unwrap_reply(req);
         if (rc) {
                 DEBUG_REQ(D_ERROR, req, "unwrap reply failed (%d):", rc);
         rc = sptlrpc_cli_unwrap_reply(req);
         if (rc) {
                 DEBUG_REQ(D_ERROR, req, "unwrap reply failed (%d):", rc);
@@ -1224,6 +1235,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                         cfs_list_entry(tmp, struct ptlrpc_request,
                                        rq_set_chain);
                 struct obd_import *imp = req->rq_import;
                         cfs_list_entry(tmp, struct ptlrpc_request,
                                        rq_set_chain);
                 struct obd_import *imp = req->rq_import;
+                int unregistered = 0;
                 int rc = 0;
 
                 if (req->rq_phase == RQ_PHASE_NEW &&
                 int rc = 0;
 
                 if (req->rq_phase == RQ_PHASE_NEW &&
@@ -1437,6 +1449,12 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 
                         cfs_spin_unlock(&req->rq_lock);
 
 
                         cfs_spin_unlock(&req->rq_lock);
 
+                        /* unlink from net because we are going to
+                         * swab in-place of reply buffer */
+                        unregistered = ptlrpc_unregister_reply(req, 1);
+                        if (!unregistered)
+                                continue;
+
                         req->rq_status = after_reply(req);
                         if (req->rq_resend)
                                 continue;
                         req->rq_status = after_reply(req);
                         if (req->rq_resend)
                                 continue;
@@ -1474,7 +1492,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
 
                 /* This moves to "unregistering" phase we need to wait for
                  * reply unlink. */
 
                 /* This moves to "unregistering" phase we need to wait for
                  * reply unlink. */
-                if (!ptlrpc_unregister_reply(req, 1))
+                if (!unregistered && !ptlrpc_unregister_reply(req, 1))
                         continue;
 
                 if (!ptlrpc_unregister_bulk(req, 1))
                         continue;
 
                 if (!ptlrpc_unregister_bulk(req, 1))
index 018dc62..73aa87b 100644 (file)
@@ -101,7 +101,7 @@ void reply_in_callback(lnet_event_t *ev)
 
         LASSERT (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_UNLINK);
         LASSERT (ev->md.start == req->rq_repbuf);
 
         LASSERT (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_UNLINK);
         LASSERT (ev->md.start == req->rq_repbuf);
-        LASSERT (ev->mlength <= req->rq_repbuf_len);
+        LASSERT (ev->offset + ev->mlength <= req->rq_repbuf_len);
         /* We've set LNET_MD_MANAGE_REMOTE for all outgoing requests
            for adaptive timeouts' early reply. */
         LASSERT((ev->md.options & LNET_MD_MANAGE_REMOTE) != 0);
         /* We've set LNET_MD_MANAGE_REMOTE for all outgoing requests
            for adaptive timeouts' early reply. */
         LASSERT((ev->md.options & LNET_MD_MANAGE_REMOTE) != 0);