From 7affc8d8f145a87c0f6a5e04dadbb1c3e18ce622 Mon Sep 17 00:00:00 2001 From: yury Date: Mon, 18 Sep 2006 16:11:02 +0000 Subject: [PATCH] - rename locking + CMD aware is_subdir() check. --- lustre/cmm/cmm_object.c | 44 +++++++++++++++------ lustre/cmm/mdc_object.c | 42 ++++++++++++++++++-- lustre/include/lustre/lustre_idl.h | 3 +- lustre/include/lustre_req_layout.h | 1 + lustre/include/md_object.h | 14 ++++++- lustre/include/obd.h | 2 + lustre/include/obd_class.h | 17 +++++++- lustre/include/obd_support.h | 2 + lustre/lmv/lmv_intent.c | 2 +- lustre/mdc/mdc_internal.h | 6 +++ lustre/mdc/mdc_lib.c | 15 +++++++ lustre/mdc/mdc_request.c | 34 ++++++++++++++++ lustre/mdd/mdd_handler.c | 81 +++++++++++++++++++++++++++++--------- lustre/mds/handler.c | 1 + lustre/mdt/mdt_handler.c | 40 ++++++++++++++++++- lustre/mdt/mdt_internal.h | 1 - lustre/mdt/mdt_reint.c | 49 ++++++++++++++++++----- lustre/obdclass/lprocfs_status.c | 1 + lustre/ptlrpc/layout.c | 6 +++ lustre/ptlrpc/lproc_ptlrpc.c | 3 +- 20 files changed, 312 insertions(+), 52 deletions(-) diff --git a/lustre/cmm/cmm_object.c b/lustre/cmm/cmm_object.c index b5d64f1..1929c09 100644 --- a/lustre/cmm/cmm_object.c +++ b/lustre/cmm/cmm_object.c @@ -444,7 +444,7 @@ static int __cmm_mode_get(const struct lu_context *ctx, struct md_device *md, /* get type from src, can be remote req */ rc = mo_attr_get(ctx, md_object_next(mo_s), tmp_ma); - if (rc == 0) + if (rc == 0 && ma != NULL) ma->ma_attr.la_mode = tmp_ma->ma_attr.la_mode; lu_object_put(ctx, &mo_s->mo_lu); @@ -456,7 +456,6 @@ static int cml_rename(const struct lu_context *ctx, struct md_object *mo_po, const char *s_name, struct md_object *mo_t, const char *t_name, struct md_attr *ma) { - struct cmm_thread_info *cmi; int rc; ENTRY; @@ -467,16 +466,15 @@ static int cml_rename(const struct lu_context *ctx, struct md_object *mo_po, /* mo_t is remote object and there is RPC to unlink it */ rc = mo_ref_del(ctx, md_object_next(mo_t), ma); if (rc) - GOTO(out, rc); + RETURN(rc); mo_t = NULL; } + /* local rename, mo_t can be NULL */ rc = mdo_rename(ctx, md_object_next(mo_po), md_object_next(mo_pn), lf, s_name, md_object_next(mo_t), t_name, ma); - EXIT; -out: - return rc; + RETURN(rc); } static int cml_rename_tgt(const struct lu_context *ctx, @@ -504,7 +502,28 @@ static int cml_name_insert(const struct lu_context *ctx, RETURN(rc); } +/* Common method for remote and local use. */ +static int cmm_is_subdir(const struct lu_context *ctx, struct md_object *mo, + const struct lu_fid *fid, struct lu_fid *sfid) +{ + struct cmm_thread_info *cmi; + int rc; + ENTRY; + + rc = __cmm_mode_get(ctx, md_obj2dev(mo), fid, NULL); + if (rc) + RETURN(rc); + + cmi = lu_context_key_get(ctx, &cmm_thread_key); + if (!S_ISDIR(cmi->cmi_ma.ma_attr.la_mode)) + RETURN(0); + + rc = mdo_is_subdir(ctx, md_object_next(mo), fid, sfid); + RETURN(rc); +} + static struct md_dir_operations cml_dir_ops = { + .mdo_is_subdir = cmm_is_subdir, .mdo_lookup = cml_lookup, .mdo_create = cml_create, .mdo_link = cml_link, @@ -695,8 +714,10 @@ static struct md_object_operations cmr_mo_ops = { static int cmr_lookup(const struct lu_context *ctx, struct md_object *mo_p, const char *name, struct lu_fid *lf) { - /*this can happens while rename() - * If new parent is remote dir, lookup will happens here */ + /* + * This can happens while rename() If new parent is remote dir, lookup + * will happen here. + */ RETURN(-EREMOTE); } @@ -783,9 +804,9 @@ static int cmr_unlink(const struct lu_context *ctx, struct md_object *mo_p, } static int cmr_rename(const struct lu_context *ctx, struct md_object *mo_po, - struct md_object *mo_pn, const struct lu_fid *lf, - const char *s_name, struct md_object *mo_t, - const char *t_name, struct md_attr *ma) + struct md_object *mo_pn, const struct lu_fid *lf, + const char *s_name, struct md_object *mo_t, + const char *t_name, struct md_attr *ma) { int rc; ENTRY; @@ -828,6 +849,7 @@ static int cmr_rename_tgt(const struct lu_context *ctx, } static struct md_dir_operations cmr_dir_ops = { + .mdo_is_subdir = cmm_is_subdir, .mdo_lookup = cmr_lookup, .mdo_create = cmr_create, .mdo_link = cmr_link, diff --git a/lustre/cmm/mdc_object.c b/lustre/cmm/mdc_object.c index 0146a18..a5d4c3f 100644 --- a/lustre/cmm/mdc_object.c +++ b/lustre/cmm/mdc_object.c @@ -290,9 +290,9 @@ int mdc_send_page(struct cmm_device *cm, const struct lu_context *ctx, ENTRY; rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu), - page, offset); + page, offset); CDEBUG(D_INFO, "send page %p offset %d fid "DFID" rc %d \n", - page, offset, PFID(lu_object_fid(&mo->mo_lu)), rc); + page, offset, PFID(lu_object_fid(&mo->mo_lu)), rc); RETURN(rc); } #endif @@ -335,7 +335,43 @@ static int mdc_rename_tgt(const struct lu_context *ctx, RETURN(rc); } +static int mdc_is_subdir(const struct lu_context *ctx, struct md_object *mo, + const struct lu_fid *fid, struct lu_fid *sfid) +{ + struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo)); + struct mdc_thread_info *mci; + struct mdt_body *body; + int rc; + ENTRY; + + mci = mdc_info_init(ctx); + + rc = md_is_subdir(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu), + fid, &mci->mci_req); + + if (rc) + GOTO(out, rc); + + body = lustre_msg_buf(mci->mci_req->rq_repmsg, REPLY_REC_OFF, + sizeof(*body)); + + LASSERT(body->valid & (OBD_MD_FLMODE | OBD_MD_FLID) && + (body->mode == 0 || body->mode == 1 || body->mode == EREMOTE)); + + rc = body->mode; + if (rc == EREMOTE) { + CDEBUG(D_INFO, "Remote mdo_is_subdir(), new src " + DFID"\n", PFID(&body->fid1)); + *sfid = body->fid1; + } + EXIT; +out: + ptlrpc_req_finished(mci->mci_req); + return rc; +} + static struct md_dir_operations mdc_dir_ops = { - .mdo_rename_tgt = mdc_rename_tgt, + .mdo_is_subdir = mdc_is_subdir, + .mdo_rename_tgt = mdc_rename_tgt }; diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 694ec2e..23b835b 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -825,7 +825,8 @@ typedef enum { MDS_QUOTACTL = 48, MDS_GETXATTR = 49, MDS_SETXATTR = 50, - MDS_WRITEPAGE = 51, + MDS_WRITEPAGE = 51, + MDS_IS_SUBDIR = 52, MDS_LAST_OPC } mds_cmd_t; diff --git a/lustre/include/lustre_req_layout.h b/lustre/include/lustre_req_layout.h index 64cb02b..fe92be7 100644 --- a/lustre/include/lustre_req_layout.h +++ b/lustre/include/lustre_req_layout.h @@ -110,6 +110,7 @@ extern const struct req_format RQF_MDS_CONNECT; extern const struct req_format RQF_MDS_DISCONNECT; extern const struct req_format RQF_MDS_READPAGE; extern const struct req_format RQF_MDS_WRITEPAGE; +extern const struct req_format RQF_MDS_IS_SUBDIR; extern const struct req_format RQF_MDS_DONE_WRITING; /* diff --git a/lustre/include/md_object.h b/lustre/include/md_object.h index f9be3dd..6fae2d9 100644 --- a/lustre/include/md_object.h +++ b/lustre/include/md_object.h @@ -92,10 +92,10 @@ struct md_create_spec { * Operations implemented for each md object (both directory and leaf). */ struct md_object_operations { - int (*moo_attr_get)(const struct lu_context *ctxt, struct md_object *dt, + int (*moo_attr_get)(const struct lu_context *ctxt, struct md_object *obj, struct md_attr *attr); - int (*moo_attr_set)(const struct lu_context *ctxt, struct md_object *dt, + int (*moo_attr_set)(const struct lu_context *ctxt, struct md_object *obj, const struct md_attr *attr); int (*moo_xattr_get)(const struct lu_context *ctxt, @@ -136,6 +136,9 @@ struct md_object_operations { * Operations implemented for each directory object. */ struct md_dir_operations { + int (*mdo_is_subdir) (const struct lu_context *, struct md_object *, + const struct lu_fid *, struct lu_fid *); + int (*mdo_lookup)(const struct lu_context *, struct md_object *, const char *, struct lu_fid *); @@ -379,6 +382,13 @@ static inline int mdo_rename(const struct lu_context *cx, return tp->mo_dir_ops->mdo_rename(cx, sp, tp, lf, sname, t, tname, ma); } +static inline int mdo_is_subdir(const struct lu_context *cx, struct md_object *mo, + const struct lu_fid *fid, struct lu_fid *sfid) +{ + LASSERT(mo->mo_dir_ops->mdo_is_subdir); + return mo->mo_dir_ops->mdo_is_subdir(cx, mo, fid, sfid); +} + static inline int mdo_link(const struct lu_context *cx, struct md_object *p, struct md_object *s, const char *name, struct md_attr *ma) diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 6dce92d..5c1ac2c 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -1128,6 +1128,8 @@ struct md_ops { int (*m_rename)(struct obd_export *, struct md_op_data *, const char *, int, const char *, int, struct ptlrpc_request **); + int (*m_is_subdir)(struct obd_export *, const struct lu_fid *, + const struct lu_fid *, struct ptlrpc_request **); int (*m_setattr)(struct obd_export *, struct md_op_data *, void *, int , void *, int, struct ptlrpc_request **); int (*m_sync)(struct obd_export *, const struct lu_fid *, diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index cb0060d..a893765 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -1646,7 +1646,7 @@ static inline int md_enqueue(struct obd_export *exp, int lock_type, static inline int md_getattr_name(struct obd_export *exp, const struct lu_fid *fid, - const char *filename, int namelen, + const char *name, int namelen, obd_valid valid, int ea_size, struct ptlrpc_request **request) { @@ -1654,7 +1654,7 @@ static inline int md_getattr_name(struct obd_export *exp, ENTRY; EXP_CHECK_MD_OP(exp, getattr_name); MD_COUNTER_INCREMENT(exp->exp_obd, getattr_name); - rc = MDP(exp->exp_obd, getattr_name)(exp, fid, filename, namelen, + rc = MDP(exp->exp_obd, getattr_name)(exp, fid, name, namelen, valid, ea_size, request); RETURN(rc); } @@ -1704,6 +1704,19 @@ static inline int md_rename(struct obd_export *exp, RETURN(rc); } +static inline int md_is_subdir(struct obd_export *exp, + const struct lu_fid *pfid, + const struct lu_fid *cfid, + struct ptlrpc_request **request) +{ + int rc; + ENTRY; + EXP_CHECK_MD_OP(exp, is_subdir); + MD_COUNTER_INCREMENT(exp->exp_obd, is_subdir); + rc = MDP(exp->exp_obd, is_subdir)(exp, pfid, cfid, request); + RETURN(rc); +} + static inline int md_setattr(struct obd_export *exp, struct md_op_data *op_data, void *ea, int ealen, void *ea2, int ea2len, struct ptlrpc_request **request) diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index cad9b68..3cfed08 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -95,6 +95,8 @@ extern int obd_race_state; #define OBD_FAIL_MDS_SETXATTR_WRITE 0x134 #define OBD_FAIL_MDS_WRITEPAGE_NET 0x135 #define OBD_FAIL_MDS_WRITEPAGE_PACK 0x136 +#define OBD_FAIL_MDS_IS_SUBDIR_NET 0x137 +#define OBD_FAIL_MDS_IS_SUBDIR_PACK 0x138 #define OBD_FAIL_OST 0x200 #define OBD_FAIL_OST_CONNECT_NET 0x201 diff --git a/lustre/lmv/lmv_intent.c b/lustre/lmv/lmv_intent.c index 4eabff0..9de3265 100644 --- a/lustre/lmv/lmv_intent.c +++ b/lustre/lmv/lmv_intent.c @@ -250,7 +250,7 @@ repeat: } } else if (rc == -ESTALE && it->d.lustre.it_lock_mode) { /* cross-ref open can have lookup lock on child */ - ldlm_lock_decref(&it->d.lustre.it_lock_handle, + ldlm_lock_decref((struct lustre_handle *)&it->d.lustre.it_lock_handle, it->d.lustre.it_lock_mode); } diff --git a/lustre/mdc/mdc_internal.h b/lustre/mdc/mdc_internal.h index a403ed6..fc33ca9 100644 --- a/lustre/mdc/mdc_internal.h +++ b/lustre/mdc/mdc_internal.h @@ -31,6 +31,9 @@ void mdc_pack_req_body(struct ptlrpc_request *req, int offset, __u64 valid, const struct lu_fid *fid, int ea_size, int flags); void mdc_pack_rep_body(struct ptlrpc_request *); +void mdc_is_subdir_pack(struct ptlrpc_request *req, int offset, + const struct lu_fid *pfid, + const struct lu_fid *cfid, int flags); void mdc_readdir_pack(struct ptlrpc_request *req, int pos, __u64 offset, __u32 size, const struct lu_fid *fid); void mdc_getattr_pack(struct ptlrpc_request *req, int offset, __u64 valid, @@ -190,6 +193,9 @@ int mdc_rename(struct obd_export *exp, struct md_op_data *op_data, const char *old, int oldlen, const char *new, int newlen, struct ptlrpc_request **request); +int mdc_is_subdir(struct obd_export *exp, const struct lu_fid *pfid, + const struct lu_fid *cfid, struct ptlrpc_request **request); + int mdc_sync(struct obd_export *exp, const struct lu_fid *fid, struct ptlrpc_request **); diff --git a/lustre/mdc/mdc_lib.c b/lustre/mdc/mdc_lib.c index 2ad40ba..670a55d 100644 --- a/lustre/mdc/mdc_lib.c +++ b/lustre/mdc/mdc_lib.c @@ -53,6 +53,20 @@ void mdc_readdir_pack(struct ptlrpc_request *req, int pos, __u64 offset, b->nlink = size; /* !! */ } +void mdc_is_subdir_pack(struct ptlrpc_request *req, int offset, + const struct lu_fid *pfid, + const struct lu_fid *cfid, int flags) +{ + struct mdt_body *b = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*b)); + + if (pfid) + b->fid1 = *pfid; + if (cfid) + b->fid2 = *cfid; + b->valid = OBD_MD_FLID; + b->flags = flags; +} + static void mdc_pack_body(struct mdt_body *b) { LASSERT (b != NULL); @@ -75,6 +89,7 @@ void mdc_pack_req_body(struct ptlrpc_request *req, int offset, b->flags = flags; mdc_pack_body(b); } + /* packing of MDS records */ void mdc_create_pack(struct ptlrpc_request *req, int offset, struct md_op_data *op_data, const void *data, int datalen, diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 06af6d2..3e3a338 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -234,6 +234,39 @@ int mdc_getattr_name(struct obd_export *exp, const struct lu_fid *fid, RETURN(rc); } +int mdc_is_subdir(struct obd_export *exp, const struct lu_fid *pfid, + const struct lu_fid *cfid, struct ptlrpc_request **request) +{ + int size[2] = { sizeof(struct ptlrpc_body), sizeof(struct mdt_body) }; + struct ptlrpc_request *req; + struct mdt_body *body; + int rc; + ENTRY; + + req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, + MDS_IS_SUBDIR, 2, size, NULL); + if (!req) + GOTO(out, rc = -ENOMEM); + + mdc_is_subdir_pack(req, REQ_REC_OFF, pfid, cfid, 0); + + ptlrpc_req_set_repsize(req, 2, size); + rc = ptlrpc_queue_wait(req); + if (rc != 0) + GOTO(out, rc); + + body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), + lustre_swab_mdt_body); + if (body == NULL) { + CERROR ("Can't unpack mdt_body\n"); + GOTO(out, rc = -EPROTO); + } + EXIT; + out: + *request = req; + return rc; +} + static int mdc_xattr_common(struct obd_export *exp, const struct lu_fid *fid, int opcode, obd_valid valid, const char *xattr_name, @@ -1435,6 +1468,7 @@ struct md_ops mdc_md_ops = { .m_getattr_name = mdc_getattr_name, .m_intent_lock = mdc_intent_lock, .m_link = mdc_link, + .m_is_subdir = mdc_is_subdir, .m_rename = mdc_rename, .m_setattr = mdc_setattr, .m_setxattr = mdc_setxattr, diff --git a/lustre/mdd/mdd_handler.c b/lustre/mdd/mdd_handler.c index c8b0354..1645208 100644 --- a/lustre/mdd/mdd_handler.c +++ b/lustre/mdd/mdd_handler.c @@ -1338,10 +1338,11 @@ static int mdd_ref_del(const struct lu_context *ctxt, struct md_object *obj, rc = __mdd_finish_unlink(ctxt, mdd_obj, ma, handle); + EXIT; cleanup: mdd_write_unlock(ctxt, mdd_obj); mdd_trans_stop(ctxt, mdd, rc, handle); - RETURN(rc); + return rc; } static int mdd_parent_fid(const struct lu_context *ctxt, @@ -1352,44 +1353,58 @@ static int mdd_parent_fid(const struct lu_context *ctxt, } /* - * return 0: if lf is the fid of the ancestor of p1 - * otherwise: other_value + * return 1: if lf is the fid of the ancestor of p1; + * return 0: if not; + * + * return -EREMOTE: if remote object is found, in this + * case fid of remote object is saved to @pf; + * + * otherwise: values < 0, errors. */ static int mdd_is_parent(const struct lu_context *ctxt, struct mdd_device *mdd, struct mdd_object *p1, - const struct lu_fid *lf) + const struct lu_fid *lf, + struct lu_fid *pf) { - struct lu_fid * pfid; struct mdd_object *parent = NULL; + struct lu_fid *pfid; int rc; ENTRY; + LASSERT(!lu_fid_eq(mdo2fid(p1), lf)); pfid = &mdd_ctx_info(ctxt)->mti_fid; + + /* Do not lookup ".." in root, they do not exist there. */ if (lu_fid_eq(mdo2fid(p1), &mdd->mdd_root_fid)) - RETURN(1); + RETURN(0); + for(;;) { rc = mdd_parent_fid(ctxt, p1, pfid); if (rc) GOTO(out, rc); - if (lu_fid_eq(pfid, lf)) - GOTO(out, rc = 0); if (lu_fid_eq(pfid, &mdd->mdd_root_fid)) + GOTO(out, rc = 0); + if (lu_fid_eq(pfid, lf)) GOTO(out, rc = 1); if (parent) mdd_object_put(ctxt, parent); parent = mdd_object_find(ctxt, mdd, pfid); + /* cross-ref parent, not supported yet */ - if (parent == NULL) - GOTO(out, rc = -EOPNOTSUPP); - else if (IS_ERR(parent)) + if (parent == NULL) { + if (pf != NULL) + *pf = *pfid; + GOTO(out, rc = -EREMOTE); + } else if (IS_ERR(parent)) GOTO(out, rc = PTR_ERR(parent)); p1 = parent; } + EXIT; out: if (parent && !IS_ERR(parent)) mdd_object_put(ctxt, parent); - RETURN(rc); + return rc; } static int mdd_rename_lock(const struct lu_context *ctxt, @@ -1397,13 +1412,15 @@ static int mdd_rename_lock(const struct lu_context *ctxt, struct mdd_object *src_pobj, struct mdd_object *tgt_pobj) { + int rc; ENTRY; if (src_pobj == tgt_pobj) { mdd_write_lock(ctxt, src_pobj); RETURN(0); } - /*compared the parent child relationship of src_p&tgt_p*/ + + /* compared the parent child relationship of src_p&tgt_p */ if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){ mdd_lock2(ctxt, src_pobj, tgt_pobj); RETURN(0); @@ -1412,7 +1429,11 @@ static int mdd_rename_lock(const struct lu_context *ctxt, RETURN(0); } - if (!mdd_is_parent(ctxt, mdd, src_pobj, mdo2fid(tgt_pobj))) { + rc = mdd_is_parent(ctxt, mdd, src_pobj, mdo2fid(tgt_pobj), NULL); + if (rc < 0) + RETURN(rc); + + if (rc == 1) { mdd_lock2(ctxt, tgt_pobj, src_pobj); RETURN(0); } @@ -1438,7 +1459,6 @@ static int mdd_rename_sanity_check(const struct lu_context *ctxt, int src_is_dir, struct mdd_object *tobj) { - struct mdd_device *mdd =mdo2mdd(&src_pobj->mod_obj); int rc = 0, tgt_is_dir; ENTRY; @@ -1458,10 +1478,6 @@ static int mdd_rename_sanity_check(const struct lu_context *ctxt, if (rc) RETURN(rc); - /* source should not be ancestor of target dir */ - if (src_is_dir && !mdd_is_parent(ctxt, mdd, tgt_pobj, sfid)) - rc = -EINVAL; - RETURN(rc); } /* src object can be remote that is why we use only fid and type of object */ @@ -1587,6 +1603,32 @@ static int mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj, RETURN(rc); } +/* + * returns 1: if fid is subdir of @mo; + * returns 0: if fid is not a subdir of @mo; + * + * returns EREMOTE if remote object is found, fid of remote object is saved to + * @fid; + * + * returns < 0: if error + */ +static int mdd_is_subdir(const struct lu_context *ctx, struct md_object *mo, + const struct lu_fid *fid, struct lu_fid *sfid) +{ + struct mdd_device *mdd = mdo2mdd(mo); + int rc; + ENTRY; + + if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo)))) + RETURN(0); + + rc = mdd_is_parent(ctx, mdd, md2mdd_obj(mo), fid, sfid); + if (rc == -EREMOTE) + rc = EREMOTE; + + RETURN(rc); +} + static int __mdd_object_initialize(const struct lu_context *ctxt, const struct lu_fid *pfid, struct mdd_object *child, @@ -2200,6 +2242,7 @@ struct md_device_operations mdd_ops = { }; static struct md_dir_operations mdd_dir_ops = { + .mdo_is_subdir = mdd_is_subdir, .mdo_lookup = mdd_lookup, .mdo_create = mdd_create, .mdo_rename = mdd_rename, diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index b0ff285..f9c9c7f 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -1366,6 +1366,7 @@ int mds_msg_check_version(struct lustre_msg *msg) case MDS_STATFS: case MDS_READPAGE: case MDS_WRITEPAGE: + case MDS_IS_SUBDIR: case MDS_REINT: case MDS_CLOSE: case MDS_DONE_WRITING: diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index dd03a60..61e2aed 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -265,7 +265,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, repbody->eadatasize = 0; repbody->aclsize = 0; - if(reqbody->valid & OBD_MD_MEA) { + if (reqbody->valid & OBD_MD_MEA) { /* Assumption: MDT_MD size is enough for lmv size FIXME */ ma->ma_lmv = req_capsule_server_get(pill, &RMF_MDT_MD); ma->ma_lmv_size = req_capsule_get_size(pill, &RMF_MDT_MD, @@ -375,6 +375,43 @@ static int mdt_getattr(struct mdt_thread_info *info) RETURN(rc); } +static int mdt_is_subdir(struct mdt_thread_info *info) +{ + struct mdt_object *obj = info->mti_object; + struct req_capsule *pill = &info->mti_pill; + struct mdt_body *repbody; + int rc; + + obj = info->mti_object; + LASSERT(obj != NULL); + LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu)); + ENTRY; + + repbody = req_capsule_server_get(pill, &RMF_MDT_BODY); + + /* + * We save last checked parent fid to @repbody->fid1 for remote + * directory case. + */ + rc = mdo_is_subdir(info->mti_ctxt, mdt_object_child(obj), + &info->mti_tmp_fid2, &repbody->fid1); + if (rc < 0) + RETURN(rc); + + /* + * Save error code to ->mode. Later it it is used for detecting the case + * of remote subdir. + */ + repbody->mode = rc; + repbody->valid = OBD_MD_FLMODE; + + if (rc == EREMOTE) + repbody->valid |= OBD_MD_FLID; + + + RETURN(0); +} + /* * UPDATE lock should be taken against parent, and be release before exit; * child_bits lock should be taken against child, and be returned back: @@ -3463,6 +3500,7 @@ DEF_MDT_HNDL_F(HABEO_CORPUS , CLOSE, mdt_close), DEF_MDT_HNDL_F(HABEO_CORPUS , DONE_WRITING, mdt_done_writing), DEF_MDT_HNDL_F(0 |HABEO_REFERO, PIN, mdt_pin), DEF_MDT_HNDL_0(0, SYNC, mdt_sync), +DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, IS_SUBDIR, mdt_is_subdir), DEF_MDT_HNDL_0(0, QUOTACHECK, mdt_quotacheck_handle), DEF_MDT_HNDL_0(0, QUOTACTL, mdt_quotactl_handle) }; diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index da528e1..0705f88 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -301,7 +301,6 @@ struct mdt_thread_info { struct mdt_client_data mti_mcd; loff_t mti_off; struct txn_param mti_txn_param; - }; /* * Info allocated per-transaction. diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 01798b7..14f7574 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -557,9 +557,37 @@ static void mdt_rename_unlock(struct lustre_handle *lh) EXIT; } -static int mdt_rename_check(struct mdt_thread_info *info) +/* + * This is is_subdir() variant, it is CMD is cmm forwards it to correct + * target. Source should not be ancestor of target dir. May be other rename + * checks can be moved here later. + */ +static int mdt_rename_check(struct mdt_thread_info *info, struct lu_fid *fid) { - return 0; + struct mdt_reint_record *rr = &info->mti_rr; + struct lu_fid dst_fid = *rr->rr_fid2; + struct mdt_object *dst; + int rc = 0; + ENTRY; + + do { + dst = mdt_object_find(info->mti_ctxt, info->mti_mdt, &dst_fid); + if (!IS_ERR(dst)) { + rc = mdo_is_subdir(info->mti_ctxt, mdt_object_child(dst), + fid, &dst_fid); + mdt_object_put(info->mti_ctxt, dst); + if (rc < 0) { + CERROR("Error while doing mdo_is_subdir(), rc %d\n", + rc); + } else if (rc == 1) { + rc = -EINVAL; + } + } else { + rc = PTR_ERR(dst); + } + } while (rc == EREMOTE); + + RETURN(rc); } static int mdt_reint_rename(struct mdt_thread_info *info) @@ -600,10 +628,6 @@ static int mdt_reint_rename(struct mdt_thread_info *info) RETURN(rc); } - rc = mdt_rename_check(info); - if (rc) - GOTO(out, rc); - lh_newp = &info->mti_lh[MDT_LH_NEW]; /* step 1: lock the source dir */ @@ -683,10 +707,15 @@ static int mdt_reint_rename(struct mdt_thread_info *info) mdt_fail_write(info->mti_ctxt, info->mti_mdt->mdt_bottom, OBD_FAIL_MDS_REINT_RENAME_WRITE); + /* Check if @dst is subdir of @src. */ + rc = mdt_rename_check(info, old_fid); + if (rc) + GOTO(out_unlock_new, rc); + rc = mdo_rename(info->mti_ctxt, mdt_object_child(msrcdir), - mdt_object_child(mtgtdir), old_fid, - rr->rr_name, mnew ? mdt_object_child(mnew): NULL, - rr->rr_tgt, ma); + mdt_object_child(mtgtdir), old_fid, rr->rr_name, + (mnew ? mdt_object_child(mnew) : NULL), rr->rr_tgt, ma); + /* handle last link of tgt object */ if (mnew) mdt_handle_last_unlink(info, mnew, ma); @@ -701,8 +730,8 @@ out_unlock_target: mdt_object_unlock_put(info, mtgtdir, lh_tgtdirp, rc); out_unlock_source: mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc); - mdt_rename_unlock(&rename_lh); out: + mdt_rename_unlock(&rename_lh); mdt_shrink_reply(info, REPLY_REC_OFF + 1); return rc; } diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c index ce75703..1e5d5bd 100644 --- a/lustre/obdclass/lprocfs_status.c +++ b/lustre/obdclass/lprocfs_status.c @@ -832,6 +832,7 @@ int lprocfs_alloc_md_stats(struct obd_device *obd, LPROCFS_MD_OP_INIT(num_private_stats, stats, cancel_unused); LPROCFS_MD_OP_INIT(num_private_stats, stats, link); LPROCFS_MD_OP_INIT(num_private_stats, stats, rename); + LPROCFS_MD_OP_INIT(num_private_stats, stats, is_subdir); LPROCFS_MD_OP_INIT(num_private_stats, stats, setattr); LPROCFS_MD_OP_INIT(num_private_stats, stats, sync); LPROCFS_MD_OP_INIT(num_private_stats, stats, readpage); diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c index 431f1d5..1c918dd 100644 --- a/lustre/ptlrpc/layout.c +++ b/lustre/ptlrpc/layout.c @@ -314,6 +314,7 @@ static const struct req_format *req_formats[] = { &RQF_MDS_PIN, &RQF_MDS_READPAGE, &RQF_MDS_WRITEPAGE, + &RQF_MDS_IS_SUBDIR, &RQF_MDS_DONE_WRITING }; @@ -655,6 +656,11 @@ const struct req_format RQF_MDS_WRITEPAGE = mdt_body_only, mdt_body_only); EXPORT_SYMBOL(RQF_MDS_WRITEPAGE); +const struct req_format RQF_MDS_IS_SUBDIR = + DEFINE_REQ_FMT0("MDS_IS_SUBDIR", + mdt_body_only, mdt_body_only); +EXPORT_SYMBOL(RQF_MDS_IS_SUBDIR); + #if !defined(__REQ_LAYOUT_USER__) int req_layout_init(void) diff --git a/lustre/ptlrpc/lproc_ptlrpc.c b/lustre/ptlrpc/lproc_ptlrpc.c index 2911944..a430307 100644 --- a/lustre/ptlrpc/lproc_ptlrpc.c +++ b/lustre/ptlrpc/lproc_ptlrpc.c @@ -75,7 +75,8 @@ struct ll_rpc_opcode { { MDS_QUOTACTL, "mds_quotactl" }, { MDS_GETXATTR, "mds_getxattr" }, { MDS_SETXATTR, "mds_setxattr" }, - { MDS_WRITEPAGE, "mds_writepage" }, + { MDS_WRITEPAGE, "mds_writepage" }, + { MDS_IS_SUBDIR, "mds_is_subdir" }, { LDLM_ENQUEUE, "ldlm_enqueue" }, { LDLM_CONVERT, "ldlm_convert" }, { LDLM_CANCEL, "ldlm_cancel" }, -- 1.8.3.1