From db0b97a5e9b33f6bda93d03c3051acec22fa8787 Mon Sep 17 00:00:00 2001 From: lsy Date: Tue, 17 Oct 2006 13:27:43 +0000 Subject: [PATCH] add full support for remote operations. typo fix in sanity.sh. --- lustre/cmm/mdc_object.c | 5 +---- lustre/include/obd.h | 1 - lustre/include/obd_class.h | 3 +-- lustre/llite/llite_capa.c | 2 ++ lustre/lmv/lmv_intent.c | 1 - lustre/mdc/mdc_internal.h | 1 - lustre/mdc/mdc_lib.c | 12 +++--------- lustre/mdc/mdc_locks.c | 8 +++----- lustre/mdc/mdc_request.c | 10 +++------- lustre/mds/mds_lov.c | 3 ++- lustre/mdt/mdt_handler.c | 44 ++++++++++++++++++++++++++++---------------- lustre/mdt/mdt_lib.c | 5 ++--- lustre/mdt/mdt_reint.c | 3 ++- lustre/tests/sanity.sh | 2 +- 14 files changed, 48 insertions(+), 52 deletions(-) diff --git a/lustre/cmm/mdc_object.c b/lustre/cmm/mdc_object.c index b53eb29..46c5805 100644 --- a/lustre/cmm/mdc_object.c +++ b/lustre/cmm/mdc_object.c @@ -216,12 +216,10 @@ static int mdc_attr_get(const struct lu_env *env, struct md_object *mo, memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata)); - /* FIXME: split capability */ rc = md_getattr(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu), NULL, OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLFLAGS, 0, &mci->mci_req); - if (rc == 0) { /* get attr from request */ rc = mdc_req2attr_update(env, ma); @@ -467,9 +465,8 @@ static int mdc_is_subdir(const struct lu_env *env, struct md_object *mo, mci = mdc_info_init(env); - /* FIXME: capability for split! */ rc = md_is_subdir(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu), - fid, NULL, NULL, &mci->mci_req); + fid, &mci->mci_req); if (rc) GOTO(out, rc); diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 07891fb..25a98f0 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -1202,7 +1202,6 @@ struct md_ops { struct ptlrpc_request **); int (*m_is_subdir)(struct obd_export *, const struct lu_fid *, const struct lu_fid *, - struct obd_capa *, struct obd_capa *, struct ptlrpc_request **); int (*m_setattr)(struct obd_export *, struct md_op_data *, void *, int , void *, int, struct ptlrpc_request **); diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index 8654050..0f45852 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -1711,14 +1711,13 @@ static inline int md_rename(struct obd_export *exp, struct md_op_data *op_data, static inline int md_is_subdir(struct obd_export *exp, const struct lu_fid *pfid, const struct lu_fid *cfid, - struct obd_capa *pc, struct obd_capa *cc, struct ptlrpc_request **request) { int rc; ENTRY; EXP_CHECK_MD_OP(exp, is_subdir); MD_COUNTER_INCREMENT(exp->exp_obd, is_subdir); - rc = MDP(exp->exp_obd, is_subdir)(exp, pfid, cfid, pc, cc, request); + rc = MDP(exp->exp_obd, is_subdir)(exp, pfid, cfid, request); RETURN(rc); } diff --git a/lustre/llite/llite_capa.c b/lustre/llite/llite_capa.c index b91ff6e..c7772ed 100644 --- a/lustre/llite/llite_capa.c +++ b/lustre/llite/llite_capa.c @@ -377,10 +377,12 @@ struct obd_capa *ll_mdscapa_get(struct inode *inode) } if (!ocapa && atomic_read(&ll_capa_debug)) { +#if 0 LASSERT(!S_ISDIR(inode->i_mode)); LASSERT(!obd_capa_open_count(ocapa)); LASSERT(!ll_have_md_lock(ocapa->u.cli.inode, MDS_INODELOCK_LOOKUP)); +#endif atomic_set(&ll_capa_debug, 0); } diff --git a/lustre/lmv/lmv_intent.c b/lustre/lmv/lmv_intent.c index 76411ce..23b1ba4 100644 --- a/lustre/lmv/lmv_intent.c +++ b/lustre/lmv/lmv_intent.c @@ -767,7 +767,6 @@ repeat: obj = lmv_obj_grab(obd, &body->fid1); if (!obj) { - /* FIXME: remote capability */ obj = lmv_obj_create(exp, &body->fid1, mea); if (IS_ERR(obj)) GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj)); diff --git a/lustre/mdc/mdc_internal.h b/lustre/mdc/mdc_internal.h index bfcc8e6..22ba601 100644 --- a/lustre/mdc/mdc_internal.h +++ b/lustre/mdc/mdc_internal.h @@ -34,7 +34,6 @@ void mdc_pack_capa(struct ptlrpc_request *req, int offset, struct obd_capa *oc); void mdc_pack_rep_body(struct ptlrpc_request *); void mdc_is_subdir_pack(struct ptlrpc_request *req, int offset, const struct lu_fid *pfid, const struct lu_fid *cfid, - struct obd_capa *pc, struct obd_capa *cc, int flags); void mdc_readdir_pack(struct ptlrpc_request *req, int pos, __u64 offset, __u32 size, const struct lu_fid *fid, diff --git a/lustre/mdc/mdc_lib.c b/lustre/mdc/mdc_lib.c index c72dea9..7e6cc61 100644 --- a/lustre/mdc/mdc_lib.c +++ b/lustre/mdc/mdc_lib.c @@ -64,20 +64,14 @@ void mdc_pack_capa(struct ptlrpc_request *req, int offset, struct obd_capa *oc) void mdc_is_subdir_pack(struct ptlrpc_request *req, int offset, const struct lu_fid *pfid, - const struct lu_fid *cfid, - struct obd_capa *pc, - struct obd_capa *cc, int flags) + const struct lu_fid *cfid, int flags) { struct mdt_body *b = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*b)); - if (pfid) { + if (pfid) b->fid1 = *pfid; - mdc_pack_capa(req, offset + 1, pc); - } - if (cfid) { + if (cfid) b->fid2 = *cfid; - mdc_pack_capa(req, offset + 2, cc); - } b->valid = OBD_MD_FLID; b->flags = flags; } diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 9e95b46..ff1d836 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -289,8 +289,7 @@ int mdc_enqueue(struct obd_export *exp, size[DLM_INTENT_REC_OFF + 1] = op_data->mod_capa1 ? sizeof(struct lustre_capa) : 0; /* child capability, used for replay only */ - size[DLM_INTENT_REC_OFF + 2] = op_data->mod_capa1 ? - sizeof(struct lustre_capa) : 0; + size[DLM_INTENT_REC_OFF + 2] = sizeof(struct lustre_capa); size[DLM_INTENT_REC_OFF + 3] = op_data->namelen + 1; /* As an optimization, we allocate an RPC request buffer for * at least a default-sized LOV EA even if we aren't sending @@ -366,14 +365,13 @@ int mdc_enqueue(struct obd_export *exp, repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize; } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) { obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | - OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA; + OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA | + OBD_MD_FLMDSCAPA; valid |= client_is_remote(exp) ? OBD_MD_FLRMTPERM : OBD_MD_FLACL; size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_body); size[DLM_INTENT_REC_OFF + 1] = op_data->mod_capa1 ? sizeof(struct lustre_capa) : 0; - if (op_data->mod_capa1) - valid |= OBD_MD_FLMDSCAPA; size[DLM_INTENT_REC_OFF + 2] = op_data->namelen + 1; if (it->it_op & IT_GETATTR) diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index a2c9bd12..a85031c 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -303,25 +303,21 @@ int mdc_getattr_name(struct obd_export *exp, const struct lu_fid *fid, int mdc_is_subdir(struct obd_export *exp, const struct lu_fid *pfid, const struct lu_fid *cfid, - struct obd_capa *pc, struct obd_capa *cc, struct ptlrpc_request **request) { - int size[4] = { sizeof(struct ptlrpc_body), + int size[2] = { sizeof(struct ptlrpc_body), sizeof(struct mdt_body) }; struct ptlrpc_request *req; struct mdt_body *body; int rc; ENTRY; - size[REQ_REC_OFF + 1] = pc ? sizeof(struct lustre_capa) : 0; - size[REQ_REC_OFF + 2] = cc ? sizeof(struct lustre_capa) : 0; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_IS_SUBDIR, 4, size, NULL); + MDS_IS_SUBDIR, 2, size, NULL); if (!req) GOTO(out, rc = -ENOMEM); - mdc_is_subdir_pack(req, REQ_REC_OFF, pfid, cfid, pc, cc, 0); + mdc_is_subdir_pack(req, REQ_REC_OFF, pfid, cfid, 0); ptlrpc_req_set_repsize(req, 2, size); rc = ptlrpc_queue_wait(req); diff --git a/lustre/mds/mds_lov.c b/lustre/mds/mds_lov.c index e965887..cce6d49 100644 --- a/lustre/mds/mds_lov.c +++ b/lustre/mds/mds_lov.c @@ -337,7 +337,8 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name) if (data == NULL) RETURN(-ENOMEM); data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX | - OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64; + OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 | + OBD_CONNECT_OSS_CAPA; data->ocd_version = LUSTRE_VERSION_CODE; data->ocd_group = mds->mds_id + FILTER_GROUP_MDS0; /* NB: lov_connect() needs to fill in .ocd_index for each OST */ diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 4d693b5..96ab26f 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -277,7 +277,6 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, struct mdt_object *o) { struct md_object *next = mdt_object_child(o); - struct mdt_device *mdt = info->mti_mdt; const struct mdt_body *reqbody = info->mti_body; struct ptlrpc_request *req = mdt_info_req(info); struct mdt_export_data *med = &req->rq_export->exp_mdt_data; @@ -406,7 +405,8 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, } #endif - if ((reqbody->valid & OBD_MD_FLMDSCAPA) && mdt->mdt_opts.mo_mds_capa) { + if ((reqbody->valid & OBD_MD_FLMDSCAPA) && + info->mti_mdt->mdt_opts.mo_mds_capa) { struct lustre_capa *capa; capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1); @@ -417,7 +417,6 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, RETURN(rc); repbody->valid |= OBD_MD_FLMDSCAPA; } - RETURN(rc); } @@ -464,21 +463,31 @@ static int mdt_getattr(struct mdt_thread_info *info) int rc; ENTRY; - LASSERT(obj != NULL); - LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu)); - reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY); LASSERT(reqbody); + if (reqbody->valid & OBD_MD_FLOSSCAPA) { + rc = req_capsule_pack(pill); + if (rc) + RETURN(err_serious(rc)); + rc = mdt_renew_capa(info); + mdt_shrink_reply(info, REPLY_REC_OFF + 1, 0, 0); + RETURN(rc); + } + + LASSERT(obj != NULL); + LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu)); + mode = lu_object_attr(&obj->mot_obj.mo_lu); if (S_ISLNK(mode) && (reqbody->valid & OBD_MD_LINKNAME) && - (reqbody->eadatasize > info->mti_mdt->mdt_max_mdsize)) { + (reqbody->eadatasize > info->mti_mdt->mdt_max_mdsize)) { req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, reqbody->eadatasize); } else { req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, info->mti_mdt->mdt_max_mdsize); } + rc = req_capsule_pack(pill); if (rc != 0) RETURN(err_serious(rc)); @@ -488,18 +497,16 @@ static int mdt_getattr(struct mdt_thread_info *info) repbody->eadatasize = 0; repbody->aclsize = 0; - if (reqbody->valid & OBD_MD_FLOSSCAPA) { - rc = mdt_renew_capa(info); - mdt_shrink_reply(info, REPLY_REC_OFF + 1, 0, 0); - RETURN(rc); - } - if (reqbody->valid & OBD_MD_FLRMTPERM) { rc = mdt_init_ucred(info, reqbody); if (rc) GOTO(out, rc); } + /* don't check capability at all, because rename might + * getattr for remote obj, and at that time no capability + * is available. */ + mdt_set_capainfo(info, 1, &reqbody->fid1, BYPASS_CAPA); rc = mdt_getattr_internal(info, obj); if (reqbody->valid & OBD_MD_FLRMTPERM) mdt_exit_ucred(info); @@ -513,6 +520,7 @@ static int mdt_is_subdir(struct mdt_thread_info *info) { struct mdt_object *obj = info->mti_object; struct req_capsule *pill = &info->mti_pill; + const struct mdt_body *body = info->mti_body; struct mdt_body *repbody; int rc; @@ -527,9 +535,11 @@ static int mdt_is_subdir(struct mdt_thread_info *info) * We save last checked parent fid to @repbody->fid1 for remote * directory case. */ - LASSERT(fid_is_sane(&info->mti_body->fid2)); + LASSERT(fid_is_sane(&body->fid2)); + mdt_set_capainfo(info, 0, &body->fid1, BYPASS_CAPA); + mdt_set_capainfo(info, 1, &body->fid2, BYPASS_CAPA); rc = mdo_is_subdir(info->mti_env, mdt_object_child(obj), - &info->mti_body->fid2, &repbody->fid1); + &body->fid2, &repbody->fid1); if (rc < 0) RETURN(rc); @@ -2592,7 +2602,9 @@ static int mdt_md_connect(const struct lu_env *env, if (!ocd) RETURN(-ENOMEM); /* The connection between MDS must be local */ - ocd->ocd_connect_flags |= OBD_CONNECT_LCL_CLIENT; + ocd->ocd_connect_flags = OBD_CONNECT_LCL_CLIENT | + OBD_CONNECT_MDS_CAPA | + OBD_CONNECT_OSS_CAPA; rc = obd_connect(env, conn, mdc, &mdc->obd_uuid, ocd); OBD_FREE_PTR(ocd); diff --git a/lustre/mdt/mdt_lib.c b/lustre/mdt/mdt_lib.c index 0b31fab..cb9bc02 100644 --- a/lustre/mdt/mdt_lib.c +++ b/lustre/mdt/mdt_lib.c @@ -735,11 +735,10 @@ static int mdt_create_unpack(struct mdt_thread_info *info) memset(&sp->u, 0, sizeof sp->u); sp->sp_cr_flags = rec->cr_flags; - if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT)) { + if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT)) mdt_set_capainfo(info, 0, rr->rr_fid1, req_capsule_client_get(pill, &RMF_CAPA1)); - mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA); - } + mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA); rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); if (S_ISDIR(attr->la_mode)) { diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index d089d36..0addab2 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -75,7 +75,6 @@ static int mdt_md_create(struct mdt_thread_info *info) mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, OBD_FAIL_MDS_REINT_CREATE_WRITE); - mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA); rc = mdo_create(info->mti_env, next, rr->rr_name, mdt_object_child(child), &info->mti_spec, ma); @@ -397,6 +396,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, * and nothing should be done */ if (mdt_object_exists(mp) > 0) { + mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA); rc = mo_ref_del(info->mti_env, mdt_object_child(mp), ma); mdt_handle_last_unlink(info, mp, ma); @@ -546,6 +546,7 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info) GOTO(out, rc = PTR_ERR(mtgtdir)); /*step 2: find & lock the target object if exists*/ + mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA); rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir), rr->rr_tgt, tgt_fid); if (rc != 0 && rc != -ENOENT) { diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 49d640d..016661b 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -698,7 +698,7 @@ test_24g() { $CHECKSTAT -a $DIR/R7a/d || error $CHECKSTAT -t dir $DIR/R7b/e || error } -run_test 24g "mkdir .../R7{a,b}/d; mv .../R7a/d .../R5b/e ======" +run_test 24g "mkdir .../R7{a,b}/d; mv .../R7a/d .../R7b/e ======" test_24h() { mkdir $DIR/R8{a,b} -- 1.8.3.1