Whamcloud - gitweb
correctly shrink reply for avoid send too big message to client.
authorshadow <shadow>
Thu, 23 Jul 2009 13:19:46 +0000 (13:19 +0000)
committershadow <shadow>
Thu, 23 Jul 2009 13:19:46 +0000 (13:19 +0000)
Branch b1_8
b=20020
i=adilger
i=tappro

lustre/ChangeLog
lustre/mds/handler.c
lustre/mds/mds_internal.h
lustre/mds/mds_open.c
lustre/mds/mds_reint.c
lustre/ptlrpc/pack_generic.c

index 33adad7..5e60a0c 100644 (file)
@@ -15,6 +15,13 @@ tbd Sun Microsystems, Inc.
          more information, please refer to bugzilla 17630.
 
 Severity   : normal
+Frequency  : with 1.8 server and 1.6 clients
+Bugzilla   : 20020
+Descriptoin: correctly shrink reply for avoid send too big message to client.
+Details    : 1.8 mds is allocate to big buffer to LOV EA data and this produce
+             some problems with sending this reply to 1.6 client.
+
+Severity   : normal
 Bugzilla   : 19529
 Description: Avoid deadlock for local client writes
 Details    : Use new OBD_BRW_MEMALLOC flag to notify OST about writes in the
index 9466226..d9d06a5 100644 (file)
@@ -755,9 +755,7 @@ static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
 
-                lustre_shrink_reply(req, reply_off, body->eadatasize, 0);
-                if (body->eadatasize)
-                        reply_off++;
+                reply_off++;
         } else if (S_ISLNK(inode->i_mode) &&
                    (reqbody->valid & OBD_MD_LINKNAME) != 0) {
                 char *symname = lustre_msg_buf(req->rq_repmsg, reply_off, 0);
@@ -861,7 +859,7 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
                 }
                 bufcount++;
         } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) {
-                if (i_size_read(inode) + 1 != body->eadatasize)
+                if (i_size_read(inode) > body->eadatasize)
                         CERROR("symlink size: %Lu, reply space: %d\n",
                                i_size_read(inode) + 1, body->eadatasize);
                 size[bufcount] = min_t(int, i_size_read(inode) + 1,
@@ -920,42 +918,43 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
         struct lustre_handle parent_lockh;
         int namesize;
         int rc = 0, cleanup_phase = 0, resent_req = 0;
+        int rq_offset = offset;
         char *name;
         ENTRY;
 
         LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME));
+        LASSERT(offset == REQ_REC_OFF || offset == DLM_INTENT_REC_OFF);
+        /* if requests were at offset 2, the getattr reply goes back at 1 */
+        if (offset == DLM_INTENT_REC_OFF) {
+                rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
+                                     sizeof(*rep));
+                offset = DLM_REPLY_REC_OFF;
+        }
 
         /* Swab now, before anyone looks inside the request */
-        body = lustre_swab_reqbuf(req, offset, sizeof(*body),
+        body = lustre_swab_reqbuf(req, rq_offset, sizeof(*body),
                                   lustre_swab_mds_body);
         if (body == NULL) {
                 CERROR("Can't swab mds_body\n");
-                RETURN(-EFAULT);
+                GOTO(cleanup_exit, rc = -EFAULT);
         }
 
-        lustre_set_req_swabbed(req, offset + 1);
-        name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
+        lustre_set_req_swabbed(req, rq_offset + 1);
+        name = lustre_msg_string(req->rq_reqmsg, rq_offset + 1, 0);
         if (name == NULL) {
                 CERROR("Can't unpack name\n");
-                RETURN(-EFAULT);
+                GOTO(cleanup_exit, rc = -EFAULT);
         }
-        namesize = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
+        namesize = lustre_msg_buflen(req->rq_reqmsg, rq_offset + 1);
         /* namesize less than 2 means we have empty name, probably came from
            revalidate by cfid, so no point in having name to be set */
         if (namesize <= 1)
                 name = NULL;
 
-        rc = mds_init_ucred(&uc, req, offset);
+        rc = mds_init_ucred(&uc, req, rq_offset);
         if (rc)
                 GOTO(cleanup, rc);
 
-        LASSERT(offset == REQ_REC_OFF || offset == DLM_INTENT_REC_OFF);
-        /* if requests were at offset 2, the getattr reply goes back at 1 */
-        if (offset == DLM_INTENT_REC_OFF) {
-                rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
-                                     sizeof(*rep));
-                offset = DLM_REPLY_REC_OFF;
-        }
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
         cleanup_phase = 1; /* kernel context */
@@ -1084,6 +1083,7 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
                         req->rq_status = rc;
                 }
         }
+cleanup_exit:
         return rc;
 }
 
@@ -1103,7 +1103,7 @@ static int mds_getattr(struct ptlrpc_request *req, int offset)
         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
                                   lustre_swab_mds_body);
         if (body == NULL)
-                RETURN(-EFAULT);
+                GOTO(cleanup_exit, rc = -EFAULT);
 
         rc = mds_init_ucred(&uc, req, offset);
         if (rc)
@@ -1119,11 +1119,11 @@ static int mds_getattr(struct ptlrpc_request *req, int offset)
         rc = mds_getattr_pack_msg(req, de->d_inode, offset);
         if (rc != 0) {
                 CERROR("mds_getattr_pack_msg: %d\n", rc);
-                GOTO(out_pop, rc);
+                GOTO(out_dput, rc);
         }
 
         req->rq_status = mds_getattr_internal(obd, de, req, body,REPLY_REC_OFF);
-
+out_dput:
         l_dput(de);
         GOTO(out_pop, rc);
 out_pop:
@@ -1136,6 +1136,9 @@ out_ucred:
                 req->rq_status = rc;
         }
         mds_exit_ucred(&uc, mds);
+
+cleanup_exit:
+        mds_body_shrink_reply(req, offset, REPLY_REC_OFF);
         return rc;
 }
 
@@ -1647,6 +1650,7 @@ int mds_handle(struct ptlrpc_request *req)
                  */
                 rc = mds_getattr_lock(req, REQ_REC_OFF, MDS_INODELOCK_UPDATE,
                                       &lockh);
+                mds_body_shrink_reply(req, REQ_REC_OFF, REPLY_REC_OFF);
                 /* this non-intent call (from an ioctl) is special */
                 req->rq_status = rc;
                 if (rc == 0 && lustre_handle_is_used(&lockh))
@@ -1744,6 +1748,7 @@ int mds_handle(struct ptlrpc_request *req)
                         break;
 
                 rc = mds_reint(req, REQ_REC_OFF, NULL);
+                mds_intent_shrink_reply(req, opc, REPLY_REC_OFF);
                 fail = OBD_FAIL_MDS_REINT_NET_REP;
                 break;
         }
@@ -1752,6 +1757,7 @@ int mds_handle(struct ptlrpc_request *req)
                 DEBUG_REQ(D_INODE, req, "close");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
                 rc = mds_close(req, REQ_REC_OFF);
+                mds_body_shrink_reply(req, REQ_REC_OFF, REPLY_REC_OFF);
                 fail = OBD_FAIL_MDS_CLOSE_NET_REP;
                 break;
 
@@ -2473,6 +2479,7 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
                  * packet is following */
                 rep->lock_policy_res2 = mds_reint(req, DLM_INTENT_REC_OFF,
                                                   &lockh);
+                mds_intent_shrink_reply(req, REINT_OPEN, DLM_REPLY_REC_OFF);
 #if 0
                 /* We abort the lock if the lookup was negative and
                  * we did not make it to the OPEN portion */
@@ -2519,6 +2526,7 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
 
                 rep->lock_policy_res2 = mds_getattr_lock(req,DLM_INTENT_REC_OFF,
                                                          getattr_part, &lockh);
+                mds_body_shrink_reply(req, DLM_INTENT_REC_OFF, DLM_REPLY_REC_OFF);
                 /* FIXME: LDLM can set req->rq_status. MDS sets
                    policy_res{1,2} with disposition and status.
                    - replay: returns 0 & req->status is old status
index 5da0d41..a0fd132 100644 (file)
@@ -198,8 +198,10 @@ struct dentry *mds_lookup(struct obd_device *obd,
                           struct dentry *dparent,
                           int fid_namelen);
 
-void mds_shrink_reply(struct obd_device *obd, struct ptlrpc_request *req,
-                      struct mds_body *body, int md_off);
+void mds_body_shrink_reply(struct ptlrpc_request *req,
+                      int req_mdoff, int reply_mdoff);
+void mds_intent_shrink_reply(struct ptlrpc_request *req,
+                      int opc, int reply_mdoff);
 int mds_get_cookie_size(struct obd_device *obd, struct lov_mds_md *lmm);
 int mds_version_get_check(struct ptlrpc_request *, struct inode *, int);
 /* mds/mds_lib.c */
index f626d0a..737e8f4 100644 (file)
@@ -1636,6 +1636,16 @@ int mds_close(struct ptlrpc_request *req, int offset)
         int cookies_size;
         ENTRY;
 
+        body = lustre_swab_reqbuf(req, offset, sizeof(*body),
+                                  lustre_swab_mds_body);
+        if (body == NULL) {
+                CERROR("Can't unpack body\n");
+                req->rq_status = -EFAULT;
+                GOTO(cleanup, rc = -EFAULT);
+        }
+        /*XXX need indicase - close is need to return LOV EA */
+        body->valid |= OBD_MD_FLEASIZE;
+
         rc = lustre_pack_reply(req, 4, repsize, NULL);
         if (rc)
                 req->rq_status = rc;
@@ -1648,14 +1658,6 @@ int mds_close(struct ptlrpc_request *req, int offset)
                obd->u.mds.mds_max_mdsize, obd->u.mds.mds_max_cookiesize);
         mds_counter_incr(req->rq_export, LPROC_MDS_CLOSE);
 
-        body = lustre_swab_reqbuf(req, offset, sizeof(*body),
-                                  lustre_swab_mds_body);
-        if (body == NULL) {
-                CERROR("Can't unpack body\n");
-                req->rq_status = -EFAULT;
-                RETURN(-EFAULT);
-        }
-
         if (body->flags & MDS_BFLAG_UNCOMMITTED_WRITES)
                 /* do some stuff */ ;
 
@@ -1666,7 +1668,7 @@ int mds_close(struct ptlrpc_request *req, int offset)
                 DEBUG_REQ(D_ERROR, req, "no handle for file close ino "LPD64
                           ": cookie "LPX64, body->fid1.id, body->handle.cookie);
                 req->rq_status = -ESTALE;
-                RETURN(-ESTALE);
+                GOTO(cleanup, rc = -ESTALE);
         }
         /* Remove mfd handle so it can't be found again.  We consume mfd_list
          * reference here, but still have mds_handle2mfd ref until mfd_close. */
@@ -1699,7 +1701,7 @@ int mds_close(struct ptlrpc_request *req, int offset)
                                        &reply_body->valid);
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
-        mds_shrink_reply(obd, req, body, REPLY_REC_OFF + 1);
+cleanup:
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
                 CERROR("test case OBD_FAIL_MDS_CLOSE_PACK\n");
                 req->rq_status = -ENOMEM;
index 2198e7f..aed103b 100644 (file)
@@ -1787,30 +1787,91 @@ out_dput:
 int mds_get_cookie_size(struct obd_device *obd, struct lov_mds_md *lmm)
 {
         int count = le32_to_cpu(lmm->lmm_stripe_count);
-        int real_csize = count * sizeof(struct llog_cookie); 
+        int real_csize = count * sizeof(struct llog_cookie);
         return real_csize;
 }
 
-void mds_shrink_reply(struct obd_device *obd, struct ptlrpc_request *req,
-                      struct mds_body *body, int md_off)
+void mds_body_shrink_reply(struct ptlrpc_request *req,
+                      int req_mdoff, int reply_mdoff)
 {
-        int cookie_size = 0, md_size = 0;
+        struct obd_device *obd = req->rq_export->exp_obd;
+        struct mds_body *rq_body;
+        struct mds_body *reply_body;
+        int cookie_size = 0, md_size = -1;
+
+        rq_body =  lustre_msg_buf(req->rq_reqmsg, req_mdoff,
+                                  sizeof(*rq_body));
+
+        LASSERT(rq_body);
+
+        /* LSM and cookie is always placed after mds_body */
+        reply_body =  lustre_msg_buf(req->rq_repmsg, reply_mdoff,
+                                     sizeof(*reply_body));
+        reply_mdoff++;
+
+        if (rq_body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) {
+                md_size = 0;
+                if (reply_body &&
+                    reply_body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA))
+                        md_size = reply_body->eadatasize;
+                lustre_shrink_reply(req, reply_mdoff, md_size, 1);
+        }
 
-        if (body && body->valid & OBD_MD_FLEASIZE) {
-                md_size = body->eadatasize;
+        if (rq_body->valid & OBD_MD_LINKNAME) {
+                md_size = rq_body->eadatasize;
+                lustre_shrink_reply(req, reply_mdoff, md_size, 1);
         }
-        if (body && body->valid & OBD_MD_FLCOOKIE) {
-                LASSERT(body->valid & OBD_MD_FLEASIZE);
+
+
+        if (reply_body != NULL) { 
+           if (reply_body->valid & OBD_MD_FLCOOKIE) {
+                LASSERT(reply_body->valid & OBD_MD_FLEASIZE);
                 cookie_size = mds_get_cookie_size(obd, lustre_msg_buf(
-                                                  req->rq_repmsg, md_off, 0));
+                                                  req->rq_repmsg, reply_mdoff, 0));
+           } else if (reply_body->valid & OBD_MD_FLACL) {
+                cookie_size = reply_body->aclsize;
+           }
         }
+        lustre_shrink_reply(req, reply_mdoff + (md_size > 0), cookie_size, 1);
 
         CDEBUG(D_INFO, "Shrink to md_size %d cookie_size %d \n", md_size,
                cookie_size);
+}
 
-        lustre_shrink_reply(req, md_off, md_size, 1);
+void mds_intent_shrink_reply(struct ptlrpc_request *req,
+                      int opc, int reply_mdoff)
+{
+        struct obd_device *obd = req->rq_export->exp_obd;
+        struct mds_body *reply_body;
+        int cookie_size = 0, md_size = 0;
+
+        if (opc == REINT_UNLINK || opc == REINT_RENAME ||
+            opc == REINT_OPEN) {
+
+                /* LSM and cookie is always placed after mds_body */
+                reply_body =  lustre_msg_buf(req->rq_repmsg, reply_mdoff,
+                                             sizeof(*reply_body));
+                reply_mdoff++;
 
-        lustre_shrink_reply(req, md_off + (md_size > 0), cookie_size, 1); 
+                if (reply_body &&
+                    reply_body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA))
+                        md_size = reply_body->eadatasize;
+
+                lustre_shrink_reply(req, reply_mdoff, md_size, 1);
+
+                if (reply_body && reply_body->valid & OBD_MD_FLCOOKIE) {
+                        LASSERT(reply_body->valid & OBD_MD_FLEASIZE);
+                        cookie_size = mds_get_cookie_size(obd, lustre_msg_buf(
+                                                          req->rq_repmsg,
+                                                          reply_mdoff, 0));
+                }
+
+                lustre_shrink_reply(req, reply_mdoff + (md_size > 0),
+                                    cookie_size, 1);
+
+                CDEBUG(D_INFO, "Shrink to md_size %d cookie_size %d \n", md_size,
+                       cookie_size);
+        }
 }
 
 static int mds_reint_unlink(struct mds_update_record *rec, int offset,
@@ -2072,8 +2133,6 @@ cleanup_no_trans:
         }
         req->rq_status = rc;
 
-        mds_shrink_reply(obd, req, body, offset + 1);
-
         /* trigger dqrel on the owner of child and parent */
         lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc,
                       FSFILT_OP_UNLINK);
@@ -2739,8 +2798,6 @@ cleanup_no_trans:
         }
         req->rq_status = rc;
 
-        mds_shrink_reply(obd, req, body, offset + 1);
-
         /* acquire/release qunit */
         lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc,
                       FSFILT_OP_RENAME);
index 26d8104..bfff38d 100644 (file)
@@ -613,7 +613,7 @@ void lustre_shrink_reply_v1(struct ptlrpc_request *req, int segment,
         LASSERT(req->rq_reply_state);
         LASSERT(msg);
         LASSERT(segment >= 0);
-        LASSERT(msg->lm_bufcount > segment);
+        LASSERT(msg->lm_bufcount >= segment);
         LASSERT(msg->lm_buflens[segment] >= newlen);
 
         if (msg->lm_buflens[segment] == newlen)
@@ -652,7 +652,8 @@ void lustre_shrink_reply_v2(struct ptlrpc_request *req, int segment,
 
         LASSERT(req->rq_reply_state);
         LASSERT(msg);
-        LASSERT(msg->lm_bufcount > segment);
+        LASSERTF(msg->lm_bufcount >= segment, "message have %d - requested %d\n",
+                 msg->lm_bufcount,segment);
         LASSERT(msg->lm_buflens[segment] >= newlen);
 
         if (msg->lm_buflens[segment] == newlen)