From: shadow Date: Thu, 23 Jul 2009 13:19:46 +0000 (+0000) Subject: correctly shrink reply for avoid send too big message to client. X-Git-Tag: v1_8_2_01~1^2~221 X-Git-Url: https://git.whamcloud.com/gitweb?a=commitdiff_plain;h=ee210c3b35a50608d8df9f8e88d5ae27a094639e;p=fs%2Flustre-release.git correctly shrink reply for avoid send too big message to client. Branch b1_8 b=20020 i=adilger i=tappro --- diff --git a/lustre/ChangeLog b/lustre/ChangeLog index 33adad7..5e60a0c 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -15,6 +15,13 @@ tbd Sun Microsystems, Inc. more information, please refer to bugzilla 17630. Severity : normal +Frequency : with 1.8 server and 1.6 clients +Bugzilla : 20020 +Description: Correctly shrink the reply to avoid sending too big a message to the client. +Details : The 1.8 MDS allocates too big a buffer for LOV EA data, which causes + problems when sending this reply to a 1.6 client. + +Severity : normal Bugzilla : 19529 Description: Avoid deadlock for local client writes Details : Use new OBD_BRW_MEMALLOC flag to notify OST about writes in the diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 94662267..d9d06a5 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -755,9 +755,7 @@ static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry, body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLATIME | OBD_MD_FLMTIME); - lustre_shrink_reply(req, reply_off, body->eadatasize, 0); - if (body->eadatasize) - reply_off++; + reply_off++; } else if (S_ISLNK(inode->i_mode) && (reqbody->valid & OBD_MD_LINKNAME) != 0) { char *symname = lustre_msg_buf(req->rq_repmsg, reply_off, 0); @@ -861,7 +859,7 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode, } bufcount++; } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) { - if (i_size_read(inode) + 1 != body->eadatasize) + if (i_size_read(inode) > body->eadatasize) CERROR("symlink size: %Lu, reply space: %d\n", 
i_size_read(inode) + 1, body->eadatasize); size[bufcount] = min_t(int, i_size_read(inode) + 1, @@ -920,42 +918,43 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset, struct lustre_handle parent_lockh; int namesize; int rc = 0, cleanup_phase = 0, resent_req = 0; + int rq_offset = offset; char *name; ENTRY; LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME)); + LASSERT(offset == REQ_REC_OFF || offset == DLM_INTENT_REC_OFF); + /* if requests were at offset 2, the getattr reply goes back at 1 */ + if (offset == DLM_INTENT_REC_OFF) { + rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, + sizeof(*rep)); + offset = DLM_REPLY_REC_OFF; + } /* Swab now, before anyone looks inside the request */ - body = lustre_swab_reqbuf(req, offset, sizeof(*body), + body = lustre_swab_reqbuf(req, rq_offset, sizeof(*body), lustre_swab_mds_body); if (body == NULL) { CERROR("Can't swab mds_body\n"); - RETURN(-EFAULT); + GOTO(cleanup_exit, rc = -EFAULT); } - lustre_set_req_swabbed(req, offset + 1); - name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0); + lustre_set_req_swabbed(req, rq_offset + 1); + name = lustre_msg_string(req->rq_reqmsg, rq_offset + 1, 0); if (name == NULL) { CERROR("Can't unpack name\n"); - RETURN(-EFAULT); + GOTO(cleanup_exit, rc = -EFAULT); } - namesize = lustre_msg_buflen(req->rq_reqmsg, offset + 1); + namesize = lustre_msg_buflen(req->rq_reqmsg, rq_offset + 1); /* namesize less than 2 means we have empty name, probably came from revalidate by cfid, so no point in having name to be set */ if (namesize <= 1) name = NULL; - rc = mds_init_ucred(&uc, req, offset); + rc = mds_init_ucred(&uc, req, rq_offset); if (rc) GOTO(cleanup, rc); - LASSERT(offset == REQ_REC_OFF || offset == DLM_INTENT_REC_OFF); - /* if requests were at offset 2, the getattr reply goes back at 1 */ - if (offset == DLM_INTENT_REC_OFF) { - rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, - sizeof(*rep)); - offset = DLM_REPLY_REC_OFF; - } push_ctxt(&saved, 
&obd->obd_lvfs_ctxt, &uc); cleanup_phase = 1; /* kernel context */ @@ -1084,6 +1083,7 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset, req->rq_status = rc; } } +cleanup_exit: return rc; } @@ -1103,7 +1103,7 @@ static int mds_getattr(struct ptlrpc_request *req, int offset) body = lustre_swab_reqbuf(req, offset, sizeof(*body), lustre_swab_mds_body); if (body == NULL) - RETURN(-EFAULT); + GOTO(cleanup_exit, rc = -EFAULT); rc = mds_init_ucred(&uc, req, offset); if (rc) @@ -1119,11 +1119,11 @@ static int mds_getattr(struct ptlrpc_request *req, int offset) rc = mds_getattr_pack_msg(req, de->d_inode, offset); if (rc != 0) { CERROR("mds_getattr_pack_msg: %d\n", rc); - GOTO(out_pop, rc); + GOTO(out_dput, rc); } req->rq_status = mds_getattr_internal(obd, de, req, body,REPLY_REC_OFF); - +out_dput: l_dput(de); GOTO(out_pop, rc); out_pop: @@ -1136,6 +1136,9 @@ out_ucred: req->rq_status = rc; } mds_exit_ucred(&uc, mds); + +cleanup_exit: + mds_body_shrink_reply(req, offset, REPLY_REC_OFF); return rc; } @@ -1647,6 +1650,7 @@ int mds_handle(struct ptlrpc_request *req) */ rc = mds_getattr_lock(req, REQ_REC_OFF, MDS_INODELOCK_UPDATE, &lockh); + mds_body_shrink_reply(req, REQ_REC_OFF, REPLY_REC_OFF); /* this non-intent call (from an ioctl) is special */ req->rq_status = rc; if (rc == 0 && lustre_handle_is_used(&lockh)) @@ -1744,6 +1748,7 @@ int mds_handle(struct ptlrpc_request *req) break; rc = mds_reint(req, REQ_REC_OFF, NULL); + mds_intent_shrink_reply(req, opc, REPLY_REC_OFF); fail = OBD_FAIL_MDS_REINT_NET_REP; break; } @@ -1752,6 +1757,7 @@ int mds_handle(struct ptlrpc_request *req) DEBUG_REQ(D_INODE, req, "close"); OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0); rc = mds_close(req, REQ_REC_OFF); + mds_body_shrink_reply(req, REQ_REC_OFF, REPLY_REC_OFF); fail = OBD_FAIL_MDS_CLOSE_NET_REP; break; @@ -2473,6 +2479,7 @@ static int mds_intent_policy(struct ldlm_namespace *ns, * packet is following */ rep->lock_policy_res2 = mds_reint(req, DLM_INTENT_REC_OFF, &lockh); 
+ mds_intent_shrink_reply(req, REINT_OPEN, DLM_REPLY_REC_OFF); #if 0 /* We abort the lock if the lookup was negative and * we did not make it to the OPEN portion */ @@ -2519,6 +2526,7 @@ static int mds_intent_policy(struct ldlm_namespace *ns, rep->lock_policy_res2 = mds_getattr_lock(req,DLM_INTENT_REC_OFF, getattr_part, &lockh); + mds_body_shrink_reply(req, DLM_INTENT_REC_OFF, DLM_REPLY_REC_OFF); /* FIXME: LDLM can set req->rq_status. MDS sets policy_res{1,2} with disposition and status. - replay: returns 0 & req->status is old status diff --git a/lustre/mds/mds_internal.h b/lustre/mds/mds_internal.h index 5da0d41..a0fd132 100644 --- a/lustre/mds/mds_internal.h +++ b/lustre/mds/mds_internal.h @@ -198,8 +198,10 @@ struct dentry *mds_lookup(struct obd_device *obd, struct dentry *dparent, int fid_namelen); -void mds_shrink_reply(struct obd_device *obd, struct ptlrpc_request *req, - struct mds_body *body, int md_off); +void mds_body_shrink_reply(struct ptlrpc_request *req, + int req_mdoff, int reply_mdoff); +void mds_intent_shrink_reply(struct ptlrpc_request *req, + int opc, int reply_mdoff); int mds_get_cookie_size(struct obd_device *obd, struct lov_mds_md *lmm); int mds_version_get_check(struct ptlrpc_request *, struct inode *, int); /* mds/mds_lib.c */ diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index f626d0a..737e8f4 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -1636,6 +1636,16 @@ int mds_close(struct ptlrpc_request *req, int offset) int cookies_size; ENTRY; + body = lustre_swab_reqbuf(req, offset, sizeof(*body), + lustre_swab_mds_body); + if (body == NULL) { + CERROR("Can't unpack body\n"); + req->rq_status = -EFAULT; + GOTO(cleanup, rc = -EFAULT); + } + /*XXX need indicase - close is need to return LOV EA */ + body->valid |= OBD_MD_FLEASIZE; + rc = lustre_pack_reply(req, 4, repsize, NULL); if (rc) req->rq_status = rc; @@ -1648,14 +1658,6 @@ int mds_close(struct ptlrpc_request *req, int offset) obd->u.mds.mds_max_mdsize, 
obd->u.mds.mds_max_cookiesize); mds_counter_incr(req->rq_export, LPROC_MDS_CLOSE); - body = lustre_swab_reqbuf(req, offset, sizeof(*body), - lustre_swab_mds_body); - if (body == NULL) { - CERROR("Can't unpack body\n"); - req->rq_status = -EFAULT; - RETURN(-EFAULT); - } - if (body->flags & MDS_BFLAG_UNCOMMITTED_WRITES) /* do some stuff */ ; @@ -1666,7 +1668,7 @@ int mds_close(struct ptlrpc_request *req, int offset) DEBUG_REQ(D_ERROR, req, "no handle for file close ino "LPD64 ": cookie "LPX64, body->fid1.id, body->handle.cookie); req->rq_status = -ESTALE; - RETURN(-ESTALE); + GOTO(cleanup, rc = -ESTALE); } /* Remove mfd handle so it can't be found again. We consume mfd_list * reference here, but still have mds_handle2mfd ref until mfd_close. */ @@ -1699,7 +1701,7 @@ int mds_close(struct ptlrpc_request *req, int offset) &reply_body->valid); pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - mds_shrink_reply(obd, req, body, REPLY_REC_OFF + 1); +cleanup: if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) { CERROR("test case OBD_FAIL_MDS_CLOSE_PACK\n"); req->rq_status = -ENOMEM; diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 2198e7f..aed103b 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -1787,30 +1787,91 @@ out_dput: int mds_get_cookie_size(struct obd_device *obd, struct lov_mds_md *lmm) { int count = le32_to_cpu(lmm->lmm_stripe_count); - int real_csize = count * sizeof(struct llog_cookie); + int real_csize = count * sizeof(struct llog_cookie); return real_csize; } -void mds_shrink_reply(struct obd_device *obd, struct ptlrpc_request *req, - struct mds_body *body, int md_off) +void mds_body_shrink_reply(struct ptlrpc_request *req, + int req_mdoff, int reply_mdoff) { - int cookie_size = 0, md_size = 0; + struct obd_device *obd = req->rq_export->exp_obd; + struct mds_body *rq_body; + struct mds_body *reply_body; + int cookie_size = 0, md_size = -1; + + rq_body = lustre_msg_buf(req->rq_reqmsg, req_mdoff, + sizeof(*rq_body)); + + 
LASSERT(rq_body); + + /* LSM and cookie is always placed after mds_body */ + reply_body = lustre_msg_buf(req->rq_repmsg, reply_mdoff, + sizeof(*reply_body)); + reply_mdoff++; + + if (rq_body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) { + md_size = 0; + if (reply_body && + reply_body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) + md_size = reply_body->eadatasize; + lustre_shrink_reply(req, reply_mdoff, md_size, 1); + } - if (body && body->valid & OBD_MD_FLEASIZE) { - md_size = body->eadatasize; + if (rq_body->valid & OBD_MD_LINKNAME) { + md_size = rq_body->eadatasize; + lustre_shrink_reply(req, reply_mdoff, md_size, 1); } - if (body && body->valid & OBD_MD_FLCOOKIE) { - LASSERT(body->valid & OBD_MD_FLEASIZE); + + + if (reply_body != NULL) { + if (reply_body->valid & OBD_MD_FLCOOKIE) { + LASSERT(reply_body->valid & OBD_MD_FLEASIZE); cookie_size = mds_get_cookie_size(obd, lustre_msg_buf( - req->rq_repmsg, md_off, 0)); + req->rq_repmsg, reply_mdoff, 0)); + } else if (reply_body->valid & OBD_MD_FLACL) { + cookie_size = reply_body->aclsize; + } } + lustre_shrink_reply(req, reply_mdoff + (md_size > 0), cookie_size, 1); CDEBUG(D_INFO, "Shrink to md_size %d cookie_size %d \n", md_size, cookie_size); +} - lustre_shrink_reply(req, md_off, md_size, 1); +void mds_intent_shrink_reply(struct ptlrpc_request *req, + int opc, int reply_mdoff) +{ + struct obd_device *obd = req->rq_export->exp_obd; + struct mds_body *reply_body; + int cookie_size = 0, md_size = 0; + + if (opc == REINT_UNLINK || opc == REINT_RENAME || + opc == REINT_OPEN) { + + /* LSM and cookie is always placed after mds_body */ + reply_body = lustre_msg_buf(req->rq_repmsg, reply_mdoff, + sizeof(*reply_body)); + reply_mdoff++; - lustre_shrink_reply(req, md_off + (md_size > 0), cookie_size, 1); + if (reply_body && + reply_body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) + md_size = reply_body->eadatasize; + + lustre_shrink_reply(req, reply_mdoff, md_size, 1); + + if (reply_body && reply_body->valid & 
OBD_MD_FLCOOKIE) { + LASSERT(reply_body->valid & OBD_MD_FLEASIZE); + cookie_size = mds_get_cookie_size(obd, lustre_msg_buf( + req->rq_repmsg, + reply_mdoff, 0)); + } + + lustre_shrink_reply(req, reply_mdoff + (md_size > 0), + cookie_size, 1); + + CDEBUG(D_INFO, "Shrink to md_size %d cookie_size %d \n", md_size, + cookie_size); + } } static int mds_reint_unlink(struct mds_update_record *rec, int offset, @@ -2072,8 +2133,6 @@ cleanup_no_trans: } req->rq_status = rc; - mds_shrink_reply(obd, req, body, offset + 1); - /* trigger dqrel on the owner of child and parent */ lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc, FSFILT_OP_UNLINK); @@ -2739,8 +2798,6 @@ cleanup_no_trans: } req->rq_status = rc; - mds_shrink_reply(obd, req, body, offset + 1); - /* acquire/release qunit */ lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc, FSFILT_OP_RENAME); diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 26d8104..bfff38d 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -613,7 +613,7 @@ void lustre_shrink_reply_v1(struct ptlrpc_request *req, int segment, LASSERT(req->rq_reply_state); LASSERT(msg); LASSERT(segment >= 0); - LASSERT(msg->lm_bufcount > segment); + LASSERT(msg->lm_bufcount >= segment); LASSERT(msg->lm_buflens[segment] >= newlen); if (msg->lm_buflens[segment] == newlen) @@ -652,7 +652,8 @@ void lustre_shrink_reply_v2(struct ptlrpc_request *req, int segment, LASSERT(req->rq_reply_state); LASSERT(msg); - LASSERT(msg->lm_bufcount > segment); + LASSERTF(msg->lm_bufcount >= segment, "message have %d - requested %d\n", + msg->lm_bufcount,segment); LASSERT(msg->lm_buflens[segment] >= newlen); if (msg->lm_buflens[segment] == newlen)