Whamcloud - gitweb
correctly handle too big reply message.
authorshadow <shadow>
Thu, 1 Oct 2009 06:18:00 +0000 (06:18 +0000)
committershadow <shadow>
Thu, 1 Oct 2009 06:18:00 +0000 (06:18 +0000)
Branch HEAD
b=19526
i=ericm
i=rread

lustre/ChangeLog
lustre/include/lustre_mdc.h
lustre/include/lustre_net.h
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_request.c
lustre/ptlrpc/client.c
lustre/ptlrpc/events.c
lustre/ptlrpc/niobuf.c

index a802682..5b92f92 100644 (file)
@@ -14,6 +14,12 @@ tbd  Sun Microsystems, Inc.
         removed cwd "./" (refer to Bugzilla 14399).
        * File join has been disabled in this release, refer to Bugzilla 16929.
 
+Severity   : enhancement
+Bugzilla   : 19526
+Description: correctly handle big reply message.
+Details    : send LNet event if reply is bigger then buffer and adjust this buffer
+             correctly.
+
 Severity   : normal
 Bugzilla   : 19917
 Description: Drop unnecessary __GFP_NOMEMALLOC flag from filter_get_page()
index 1bfa2a8..81abda6 100644 (file)
@@ -103,7 +103,22 @@ static inline void mdc_put_rpc_lock(struct mdc_rpc_lock *lck,
         EXIT;
 }
 
-struct mdc_cache_waiter {       
+static inline void mdc_update_max_ea_from_body(struct obd_export *exp,
+                                               struct mdt_body *body)
+{
+        if (body->valid & OBD_MD_FLMODEASIZE) {
+                if (exp->exp_obd->u.cli.cl_max_mds_easize < body->max_mdsize)
+                        exp->exp_obd->u.cli.cl_max_mds_easize =
+                                                body->max_mdsize;
+                if (exp->exp_obd->u.cli.cl_max_mds_cookiesize <
+                                                body->max_cookiesize)
+                        exp->exp_obd->u.cli.cl_max_mds_cookiesize =
+                                                body->max_cookiesize;
+        }
+}
+
+
+struct mdc_cache_waiter {
         struct list_head        mcw_entry;
         cfs_waitq_t             mcw_waitq;
 };
index 426f552..b739289 100644 (file)
@@ -368,7 +368,8 @@ struct ptlrpc_request {
                 /* server-side flags */
                 rq_packed_final:1,  /* packed final reply */
                 rq_hp:1,            /* high priority RPC */
-                rq_at_linked:1;     /* link into service's srv_at_array */
+                rq_at_linked:1,     /* link into service's srv_at_array */
+                rq_reply_truncate:1;
 
         enum rq_phase rq_phase; /* one of RQ_PHASE_* */
         enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
index 1da0c78..e7d8b7c 100644 (file)
@@ -530,6 +530,8 @@ static int mdc_finish_enqueue(struct obd_export *exp,
                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
                         void *eadata;
 
+                         mdc_update_max_ea_from_body(exp, body);
+
                         /*
                          * The eadata is opaque; just check that it is there.
                          * Eventually, obd_unpackmd() will check the contents.
index 4af3f30..c4de6cf 100644 (file)
@@ -175,6 +175,8 @@ static int mdc_getattr_common(struct obd_export *exp,
         CDEBUG(D_NET, "mode: %o\n", body->mode);
 
         if (body->eadatasize != 0) {
+                mdc_update_max_ea_from_body(exp, body);
+
                 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
                                                       body->eadatasize);
                 if (eadata == NULL)
index fbc1a00..2dbdcf0 100644 (file)
@@ -1064,6 +1064,13 @@ static int after_reply(struct ptlrpc_request *req)
         LASSERT(obd);
         LASSERT(req->rq_nob_received <= req->rq_repbuf_len);
 
+        if (req->rq_reply_truncate && !req->rq_no_resend) {
+                req->rq_resend = 1;
+                sptlrpc_cli_free_repbuf(req);
+                req->rq_replen = req->rq_nob_received;
+                RETURN(0);
+        }
+
         /*
          * NB Until this point, the whole of the incoming message,
          * including buflens, status etc is in the sender's byte order.
@@ -2241,16 +2248,14 @@ restart:
 
                 if (req->rq_err) {
                         /* rq_status was set locally */
-                        rc = -EIO;
+                        rc = req->rq_status ? req->rq_status : -EIO;
                 }
                 else if (req->rq_intr) {
                         rc = -EINTR;
                 }
                 else if (req->rq_no_resend) {
-                        spin_unlock(&imp->imp_lock);
-                        GOTO(out, rc = -ETIMEDOUT);
-                }
-                else {
+                        rc = -ETIMEDOUT;
+                } else {
                         GOTO(restart, rc);
                 }
         }
@@ -2340,6 +2345,7 @@ after_send:
         if (req->rq_err) {
                 DEBUG_REQ(D_RPCTRACE, req, "err rc=%d status=%d",
                           rc, req->rq_status);
+                rc = rc ? rc : req->rq_status;
                 GOTO(out, rc = rc ? rc : -EIO);
         }
 
index 1098682..27f0fec 100644 (file)
@@ -115,12 +115,23 @@ void reply_in_callback(lnet_event_t *ev)
 
         if (ev->status)
                 goto out_wake;
+
         if (ev->type == LNET_EVENT_UNLINK) {
                 LASSERT(ev->unlinked);
                 DEBUG_REQ(D_RPCTRACE, req, "unlink");
                 goto out_wake;
         }
 
+        if (ev->mlength < ev->rlength ) {
+                CDEBUG(D_RPCTRACE, "truncate req %p rpc %d - %d+%d\n", req,
+                       req->rq_replen, ev->rlength, ev->offset);
+                req->rq_reply_truncate = 1;
+                req->rq_replied = 1;
+                req->rq_status = -EOVERFLOW;
+                req->rq_nob_received = ev->rlength + ev->offset;
+                goto out_wake;
+        }
+
         if ((ev->offset == 0) &&
             ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT))) {
                 /* Early reply */
index f6d1d05..b1a4f31 100644 (file)
@@ -586,6 +586,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
         request->rq_net_err = 0;
         request->rq_resend = 0;
         request->rq_restart = 0;
+        request->rq_reply_truncate = 0;
         spin_unlock(&request->rq_lock);
 
         if (!noreply) {
@@ -595,7 +596,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
                 reply_md.threshold = LNET_MD_THRESH_INF;
                 /* Manage remote for early replies */
                 reply_md.options   = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT |
-                        LNET_MD_MANAGE_REMOTE;
+                        LNET_MD_MANAGE_REMOTE |
+                        LNET_MD_TRUNCATE; /* allow to make EOVERFLOW error */;
                 reply_md.user_ptr  = &request->rq_reply_cbid;
                 reply_md.eq_handle = ptlrpc_eq_h;