From 4112a290df2763d53760ef6a96ee2453a41f1856 Mon Sep 17 00:00:00 2001 From: wangdi Date: Tue, 19 Nov 2013 06:34:21 -0800 Subject: [PATCH] LU-1187 mdt: unlink remote directory Send unlink req to the slave MDT, so both open/close and unlink will send request to the slave MDT, where the remote object resides. Then it would be able to check orphan (unlink open file) locally. Add lock_object api for enqueue remote object. Change-Id: I7483b0f023e4e3de6597da58d3d9f3e96c0d53b7 Signed-off-by: Wang Di Reviewed-on: http://review.whamcloud.com/4339 Reviewed-by: Andreas Dilger Reviewed-by: Alex Zhuravlev Tested-by: Hudson Tested-by: Maloo --- lustre/include/dt_object.h | 19 ++++ lustre/include/lustre_dlm.h | 5 + lustre/include/lustre_update.h | 2 +- lustre/include/md_object.h | 41 ++++++-- lustre/ldlm/ldlm_request.c | 22 +++++ lustre/llite/namei.c | 12 ++- lustre/lmv/lmv_obd.c | 27 +++++- lustre/lod/lod_dev.c | 2 + lustre/lod/lod_object.c | 20 ++++ lustre/mdd/mdd_dir.c | 205 ++++++++++++++++++++++++--------------- lustre/mdd/mdd_internal.h | 15 --- lustre/mdd/mdd_object.c | 61 +++++++----- lustre/mdd/mdd_permission.c | 20 ++-- lustre/mdt/mdt_handler.c | 91 ++++++++++++++--- lustre/mdt/mdt_internal.h | 10 ++ lustre/mdt/mdt_lib.c | 23 +++-- lustre/mdt/mdt_mds.c | 4 +- lustre/mdt/mdt_reint.c | 155 +++++++++++++++++++++-------- lustre/osd-ldiskfs/osd_handler.c | 13 +-- lustre/osp/osp_dev.c | 5 +- lustre/osp/osp_md_object.c | 46 ++++++++- lustre/osp/osp_object.c | 1 + lustre/tests/replay-dual.sh | 16 +-- lustre/tests/replay-single.sh | 48 ++++----- lustre/tests/test-framework.sh | 4 +- 25 files changed, 610 insertions(+), 257 deletions(-) diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index 07f9a4b..8de5cc5 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -67,6 +67,7 @@ struct dt_object; struct dt_index_features; struct niobuf_local; struct niobuf_remote; +struct ldlm_enqueue_info; typedef enum { MNTOPT_USERXATTR = 0x00000001, @@ 
-524,6 +525,12 @@ struct dt_body_operations { struct lustre_capa *capa); }; +struct dt_lock_operations { + int (*do_object_lock)(const struct lu_env *env, struct dt_object *dt, + struct lustre_handle *lh, + struct ldlm_enqueue_info *einfo, + void *policy); +}; /** * Incomplete type of index record. */ @@ -667,6 +674,7 @@ struct dt_object { const struct dt_object_operations *do_ops; const struct dt_body_operations *do_body_ops; const struct dt_index_operations *do_index_ops; + const struct dt_lock_operations *do_lock_ops; }; /* @@ -865,6 +873,17 @@ local_index_find_or_create_with_fid(const struct lu_env *env, const char *name, __u32 mode, const struct dt_index_features *ft); +static inline int dt_object_lock(const struct lu_env *env, + struct dt_object *o, struct lustre_handle *lh, + struct ldlm_enqueue_info *einfo, + void *policy) +{ + LASSERT(o); + LASSERT(o->do_lock_ops); + LASSERT(o->do_lock_ops->do_object_lock); + return o->do_lock_ops->do_object_lock(env, o, lh, einfo, policy); +} + int dt_lookup_dir(const struct lu_env *env, struct dt_object *dir, const char *name, struct lu_fid *fid); diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 689ba01..45ea747 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -1588,6 +1588,11 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req, int version, int opc, int canceloff, cfs_list_t *cancels, int count); + +struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp, int lvb_len); +int ldlm_handle_enqueue0(struct ldlm_namespace *ns, struct ptlrpc_request *req, + const struct ldlm_request *dlm_req, + const struct ldlm_callback_suite *cbs); int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, ldlm_type_t type, __u8 with_policy, ldlm_mode_t mode, __u64 *flags, void *lvb, __u32 lvb_len, diff --git a/lustre/include/lustre_update.h b/lustre/include/lustre_update.h index d85b8a1..d79cd70 100644 --- 
a/lustre/include/lustre_update.h +++ b/lustre/include/lustre_update.h @@ -31,7 +31,7 @@ #ifndef _LUSTRE_UPDATE_H #define _LUSTRE_UPDATE_H -#define UPDATE_BUFFER_SIZE 4096 +#define UPDATE_BUFFER_SIZE 8192 struct update_request { struct dt_device *ur_dt; cfs_list_t ur_list; /* attached itself to thandle */ diff --git a/lustre/include/md_object.h b/lustre/include/md_object.h index 2048cc6..d1554ea 100644 --- a/lustre/include/md_object.h +++ b/lustre/include/md_object.h @@ -200,12 +200,12 @@ struct md_op_spec { /** Create flag from client: such as MDS_OPEN_CREAT, and others. */ __u64 sp_cr_flags; - /** don't create lov objects or llog cookie - this replay */ + /** don't create lov objects or llog cookie - this replay */ unsigned int no_create:1, - /** Should mdd do lookup sanity check or not. */ - sp_cr_lookup:1; + sp_cr_lookup:1, /* do lookup sanity check or not. */ + sp_rm_entry:1; /* only remove name entry */ - /** Current lock mode for parent dir where create is performing. */ + /** Current lock mode for parent dir where create is performing. 
*/ mdl_mode_t sp_cr_mode; /** to create directory */ @@ -285,6 +285,10 @@ struct md_object_operations { int (*moo_file_unlock)(const struct lu_env *env, struct md_object *obj, struct lov_mds_md *lmm, struct lustre_handle *lockh); + int (*moo_object_lock)(const struct lu_env *env, struct md_object *obj, + struct lustre_handle *lh, + struct ldlm_enqueue_info *einfo, + void *policy); }; /** @@ -713,6 +717,16 @@ static inline int mo_file_unlock(const struct lu_env *env, struct md_object *m, return m->mo_ops->moo_file_unlock(env, m, lmm, lockh); } +static inline int mo_object_lock(const struct lu_env *env, + struct md_object *m, + struct lustre_handle *lh, + struct ldlm_enqueue_info *einfo, + void *policy) +{ + LASSERT(m->mo_ops->moo_object_lock); + return m->mo_ops->moo_object_lock(env, m, lh, einfo, policy); +} + static inline int mdo_lookup(const struct lu_env *env, struct md_object *p, const struct lu_name *lname, @@ -792,8 +806,8 @@ static inline int mdo_unlink(const struct lu_env *env, const struct lu_name *lname, struct md_attr *ma) { - LASSERT(c->mo_dir_ops->mdo_unlink); - return c->mo_dir_ops->mdo_unlink(env, p, c, lname, ma); + LASSERT(p->mo_dir_ops->mdo_unlink); + return p->mo_dir_ops->mdo_unlink(env, p, c, lname, ma); } static inline int mdo_lum_lmm_cmp(const struct lu_env *env, @@ -903,5 +917,20 @@ int llo_local_objects_setup(const struct lu_env *env, int lustre_buf2som(void *buf, int rc, struct md_som_data *msd); int lustre_buf2hsm(void *buf, int rc, struct md_hsm *mh); void lustre_hsm2buf(void *buf, struct md_hsm *mh); + +#define md_cap_t(x) (x) + +#define MD_CAP_TO_MASK(x) (1 << (x)) + +#define md_cap_raised(c, flag) (md_cap_t(c) & MD_CAP_TO_MASK(flag)) + +/* capable() is copied from linux kernel! 
*/ +static inline int md_capable(struct lu_ucred *uc, cfs_cap_t cap) +{ + if (md_cap_raised(uc->uc_cap, cap)) + return 1; + return 0; +} + /** @} md */ #endif /* _LINUX_MD_OBJECT_H */ diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 74d775d..8e1dcef 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -828,6 +828,28 @@ int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req, } EXPORT_SYMBOL(ldlm_prep_enqueue_req); +struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp, int lvb_len) +{ + struct ptlrpc_request *req; + int rc; + ENTRY; + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE); + if (req == NULL) + RETURN(ERR_PTR(-ENOMEM)); + + rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + if (rc) { + ptlrpc_request_free(req); + RETURN(ERR_PTR(rc)); + } + + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len); + ptlrpc_request_set_replen(req); + RETURN(req); +} +EXPORT_SYMBOL(ldlm_enqueue_pack); + /** * Client-side lock enqueue. 
* diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index f52a890..9073f85 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -1127,6 +1127,7 @@ static int ll_rmdir_generic(struct inode *dir, struct dentry *dparent, RETURN(PTR_ERR(op_data)); ll_get_child_fid(dir, name, &op_data->op_fid3); + op_data->op_fid2 = op_data->op_fid3; rc = md_unlink(ll_i2sbi(dir)->ll_md_exp, op_data, &request); ll_finish_md_op_data(op_data); if (rc == 0) { @@ -1270,11 +1271,12 @@ static int ll_unlink_generic(struct inode *dir, struct dentry *dparent, if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); - ll_get_child_fid(dir, name, &op_data->op_fid3); - rc = md_unlink(ll_i2sbi(dir)->ll_md_exp, op_data, &request); - ll_finish_md_op_data(op_data); - if (rc) - GOTO(out, rc); + ll_get_child_fid(dir, name, &op_data->op_fid3); + op_data->op_fid2 = op_data->op_fid3; + rc = md_unlink(ll_i2sbi(dir)->ll_md_exp, op_data, &request); + ll_finish_md_op_data(op_data); + if (rc) + GOTO(out, rc); ll_update_times(request, dir); ll_stats_ops_tally(ll_i2sbi(dir), LPROC_LL_UNLINK, 1); diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 763a6b1..fa3e183 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -1988,14 +1988,19 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data, struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; struct lmv_tgt_desc *tgt = NULL; + struct mdt_body *body; int rc; ENTRY; rc = lmv_check_connect(obd); if (rc) RETURN(rc); - - tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1); +retry: + /* Send unlink requests to the MDT where the child is located */ + if (likely(!fid_is_zero(&op_data->op_fid2))) + tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2); + else + tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1); if (IS_ERR(tgt)) RETURN(PTR_ERR(tgt)); @@ -2021,9 +2026,25 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data, if (rc != 0) RETURN(rc); + CDEBUG(D_INODE, "unlink with 
fid="DFID"/"DFID" -> mds #%d\n", + PFID(&op_data->op_fid1), PFID(&op_data->op_fid2), tgt->ltd_idx); + rc = md_unlink(tgt->ltd_exp, op_data, request); + if (rc != 0 && rc != -EREMOTE) + RETURN(rc); - RETURN(rc); + body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY); + if (body == NULL) + RETURN(-EPROTO); + /* + * Not cross-ref case, just get out of here. + */ + if (likely(!(body->valid & OBD_MD_MDS))) + RETURN(0); + + /* Clearly this is a remote object, try remote MDT */ + op_data->op_fid2 = body->fid1; + goto retry; } static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) diff --git a/lustre/lod/lod_dev.c b/lustre/lod/lod_dev.c index 5d8f5fe..13e24cf 100644 --- a/lustre/lod/lod_dev.c +++ b/lustre/lod/lod_dev.c @@ -98,6 +98,7 @@ int lod_fld_lookup(const struct lu_env *env, struct lod_device *lod, extern struct lu_object_operations lod_lu_obj_ops; extern struct lu_object_operations lod_lu_robj_ops; extern struct dt_object_operations lod_obj_ops; +extern struct dt_lock_operations lod_lock_ops; /* Slab for OSD object allocation */ cfs_mem_cache_t *lod_object_kmem; @@ -141,6 +142,7 @@ struct lu_object *lod_object_alloc(const struct lu_env *env, lu_obj = lod2lu_obj(lod_obj); dt_object_init(&lod_obj->ldo_obj, NULL, dev); lod_obj->ldo_obj.do_ops = &lod_obj_ops; + lod_obj->ldo_obj.do_lock_ops = &lod_lock_ops; if (likely(mds == lu_site2seq(dev->ld_site)->ss_node_id)) lu_obj->lo_ops = &lod_lu_obj_ops; else diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index f082c91..2f288a4 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -1131,6 +1131,26 @@ struct dt_object_operations lod_obj_ops = { .do_object_sync = lod_object_sync, }; +static int lod_object_lock(const struct lu_env *env, + struct dt_object *dt, struct lustre_handle *lh, + struct ldlm_enqueue_info *einfo, + void *policy) +{ + struct dt_object *next = dt_object_child(dt); + int rc; + ENTRY; + + /* + * forward the lock request to the child (next-layer) object + */ + rc 
= dt_object_lock(env, next, lh, einfo, policy); + + RETURN(rc); +} + +struct dt_lock_operations lod_lock_ops = { + .do_object_lock = lod_object_lock, +}; static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt, struct lu_buf *buf, loff_t *pos, struct lustre_capa *capa) diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index 773940e..c5f65e8 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -369,7 +369,34 @@ static inline int mdd_is_sticky(const struct lu_env *env, if (tmp_la->la_uid == uc->uc_fsuid) return 0; - return !mdd_capable(uc, CFS_CAP_FOWNER); + return !md_capable(uc, CFS_CAP_FOWNER); +} + +static int mdd_may_delete_entry(const struct lu_env *env, + struct mdd_object *pobj, int check_perm) +{ + ENTRY; + + LASSERT(pobj != NULL); + if (!mdd_object_exists(pobj)) + RETURN(-ENOENT); + + if (mdd_is_dead_obj(pobj)) + RETURN(-ENOENT); + + if (check_perm) { + int rc; + rc = mdd_permission_internal_locked(env, pobj, NULL, + MAY_WRITE | MAY_EXEC, + MOR_TGT_PARENT); + if (rc) + RETURN(rc); + } + + if (mdd_is_append(pobj)) + RETURN(-EPERM); + + RETURN(0); } /* @@ -383,31 +410,21 @@ int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj, int rc = 0; ENTRY; - LASSERT(cobj); + if (pobj) { + rc = mdd_may_delete_entry(env, pobj, check_perm); + if (rc != 0) + RETURN(rc); + } + + if (cobj == NULL) + RETURN(0); + if (!mdd_object_exists(cobj)) RETURN(-ENOENT); if (mdd_is_dead_obj(cobj)) RETURN(-ESTALE); - if (pobj) { - if (!mdd_object_exists(pobj)) - RETURN(-ENOENT); - - if (mdd_is_dead_obj(pobj)) - RETURN(-ENOENT); - - if (check_perm) { - rc = mdd_permission_internal_locked(env, pobj, NULL, - MAY_WRITE | MAY_EXEC, - MOR_TGT_PARENT); - if (rc) - RETURN(rc); - } - - if (mdd_is_append(pobj)) - RETURN(-EPERM); - } if (mdd_is_sticky(env, pobj, cobj)) RETURN(-EPERM); @@ -1095,6 +1112,7 @@ static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd, const struct lu_name *name, struct md_attr *ma, struct thandle 
*handle) { + struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; int rc; rc = mdo_declare_index_delete(env, p, name->ln_name, handle); @@ -1105,31 +1123,37 @@ static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd, if (rc) return rc; - rc = mdo_declare_ref_del(env, c, handle); - if (rc) - return rc; + LASSERT(ma->ma_attr.la_valid & LA_CTIME); + la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime; + la->la_valid = LA_CTIME | LA_MTIME; + rc = mdo_declare_attr_set(env, p, la, handle); + if (rc) + return rc; - rc = mdo_declare_ref_del(env, c, handle); - if (rc) - return rc; + if (c != NULL) { + rc = mdo_declare_ref_del(env, c, handle); + if (rc) + return rc; - rc = mdo_declare_attr_set(env, p, NULL, handle); - if (rc) - return rc; + rc = mdo_declare_ref_del(env, c, handle); + if (rc) + return rc; - rc = mdo_declare_attr_set(env, c, NULL, handle); - if (rc) - return rc; + rc = mdo_declare_attr_set(env, c, NULL, handle); + if (rc) + return rc; - rc = mdd_declare_finish_unlink(env, c, ma, handle); - if (rc) - return rc; + rc = mdd_declare_finish_unlink(env, c, ma, handle); + if (rc) + return rc; - rc = mdd_declare_links_del(env, c, handle); - if (rc != 0) - return rc; + rc = mdd_declare_links_del(env, c, handle); + if (rc != 0) + return rc; - rc = mdd_declare_changelog_store(env, mdd, name, handle); + /* FIXME: need changelog for remove entry */ + rc = mdd_declare_changelog_store(env, mdd, name, handle); + } return rc; } @@ -1142,74 +1166,96 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, struct lu_attr *cattr = &mdd_env_info(env)->mti_cattr; struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; struct mdd_object *mdd_pobj = md2mdd_obj(pobj); - struct mdd_object *mdd_cobj = md2mdd_obj(cobj); + struct mdd_object *mdd_cobj = NULL; struct mdd_device *mdd = mdo2mdd(pobj); struct dynlock_handle *dlh; struct thandle *handle; - int rc, is_dir; + int rc, is_dir = 0; ENTRY; - if (mdd_object_exists(mdd_cobj) <= 0) - 
RETURN(-ENOENT); + /* cobj == NULL means only delete name entry */ + if (likely(cobj != NULL)) { + mdd_cobj = md2mdd_obj(cobj); + if (mdd_object_exists(mdd_cobj) == 0) + RETURN(-ENOENT); + /* currently it is assumed that only the name entry + * of a remote directory can be deleted */ + is_dir = 1; + } - handle = mdd_trans_create(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) RETURN(PTR_ERR(handle)); - rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj, - lname, ma, handle); - if (rc) - GOTO(stop, rc); + rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj, + lname, ma, handle); + if (rc) + GOTO(stop, rc); - rc = mdd_trans_start(env, mdd, handle); - if (rc) - GOTO(stop, rc); + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(stop, rc); - dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT); - if (dlh == NULL) + dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT); + if (dlh == NULL) GOTO(stop, rc = -ENOMEM); - mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD); - /* fetch cattr */ - rc = mdd_la_get(env, mdd_cobj, cattr, mdd_object_capa(env, mdd_cobj)); - if (rc) - GOTO(cleanup, rc); + if (likely(mdd_cobj != NULL)) { + mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD); - is_dir = S_ISDIR(cattr->la_mode); + /* fetch cattr */ + rc = mdd_la_get(env, mdd_cobj, cattr, + mdd_object_capa(env, mdd_cobj)); + if (rc) + GOTO(cleanup, rc); + + is_dir = S_ISDIR(cattr->la_mode); + + } rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, cattr); - if (rc) - GOTO(cleanup, rc); + if (rc) + GOTO(cleanup, rc); rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle, mdd_object_capa(env, mdd_pobj)); if (rc) GOTO(cleanup, rc); - rc = mdo_ref_del(env, mdd_cobj, handle); - if (rc != 0) { - __mdd_index_insert_only(env, mdd_pobj, mdo2fid(mdd_cobj), - name, handle, - mdd_object_capa(env, mdd_pobj)); - GOTO(cleanup, rc); + if (likely(mdd_cobj != NULL)) { + rc = mdo_ref_del(env, mdd_cobj, handle); + if (rc != 0) { + __mdd_index_insert_only(env, mdd_pobj, + 
mdo2fid(mdd_cobj), + name, handle, + mdd_object_capa(env, mdd_pobj)); + GOTO(cleanup, rc); + } + + if (is_dir) + /* unlink dot */ + mdo_ref_del(env, mdd_cobj, handle); + + /* fetch updated nlink */ + rc = mdd_la_get(env, mdd_cobj, cattr, + mdd_object_capa(env, mdd_cobj)); + if (rc) + GOTO(cleanup, rc); } - if (is_dir) - /* unlink dot */ - mdo_ref_del(env, mdd_cobj, handle); + LASSERT(ma->ma_attr.la_valid & LA_CTIME); + la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime; - /* fetch updated nlink */ - rc = mdd_la_get(env, mdd_cobj, cattr, mdd_object_capa(env, mdd_cobj)); + la->la_valid = LA_CTIME | LA_MTIME; + rc = mdd_attr_check_set_internal(env, mdd_pobj, la, handle, 0); if (rc) GOTO(cleanup, rc); - LASSERT(ma->ma_attr.la_valid & LA_CTIME); - la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime; - - la->la_valid = LA_CTIME | LA_MTIME; - rc = mdd_attr_check_set_internal(env, mdd_pobj, la, handle, 0); - if (rc) - GOTO(cleanup, rc); + /* Enough for only unlink the entry */ + if (unlikely(mdd_cobj == NULL)) { + mdd_pdo_write_unlock(env, mdd_pobj, dlh); + GOTO(stop, rc); + } if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) { /* update ctime of an unlinked file only if it is still @@ -1651,7 +1697,6 @@ out: return rc; } - /* * Create object and insert it into namespace. */ diff --git a/lustre/mdd/mdd_internal.h b/lustre/mdd/mdd_internal.h index f2d21a4..d5e4286 100644 --- a/lustre/mdd/mdd_internal.h +++ b/lustre/mdd/mdd_internal.h @@ -433,21 +433,6 @@ struct lu_object *mdd_object_alloc(const struct lu_env *env, const struct lu_object_header *hdr, struct lu_device *d); -/* mdd_permission.c */ -#define mdd_cap_t(x) (x) - -#define MDD_CAP_TO_MASK(x) (1 << (x)) - -#define mdd_cap_raised(c, flag) (mdd_cap_t(c) & MDD_CAP_TO_MASK(flag)) - -/* capable() is copied from linux kernel! 
*/ -static inline int mdd_capable(struct lu_ucred *uc, cfs_cap_t cap) -{ - if (mdd_cap_raised(uc->uc_cap, cap)) - return 1; - return 0; -} - int mdd_acl_chmod(const struct lu_env *env, struct mdd_object *o, __u32 mode, struct thandle *handle); int __mdd_declare_acl_init(const struct lu_env *env, struct mdd_object *obj, diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index 973f893..64ea295 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -771,18 +771,18 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj, (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL); if ((uc->uc_fsuid != tmp_la->la_uid) && - !mdd_capable(uc, CFS_CAP_FOWNER)) + !md_capable(uc, CFS_CAP_FOWNER)) RETURN(-EPERM); - /* XXX: the IMMUTABLE and APPEND_ONLY flags can - * only be changed by the relevant capability. */ - if (mdd_is_immutable(obj)) - oldflags |= LUSTRE_IMMUTABLE_FL; - if (mdd_is_append(obj)) - oldflags |= LUSTRE_APPEND_FL; - if ((oldflags ^ newflags) && - !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE)) - RETURN(-EPERM); + /* XXX: the IMMUTABLE and APPEND_ONLY flags can + * only be changed by the relevant capability. 
*/ + if (mdd_is_immutable(obj)) + oldflags |= LUSTRE_IMMUTABLE_FL; + if (mdd_is_append(obj)) + oldflags |= LUSTRE_APPEND_FL; + if ((oldflags ^ newflags) && + !md_capable(uc, CFS_CAP_LINUX_IMMUTABLE)) + RETURN(-EPERM); if (!S_ISDIR(tmp_la->la_mode)) la->la_flags &= ~LUSTRE_DIRSYNC_FL; @@ -797,7 +797,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj, if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) && !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) { if ((uc->uc_fsuid != tmp_la->la_uid) && - !mdd_capable(uc, CFS_CAP_FOWNER)) { + !md_capable(uc, CFS_CAP_FOWNER)) { rc = mdd_permission_internal(env, obj, tmp_la, MAY_WRITE); if (rc) @@ -830,7 +830,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj, if (la->la_valid & LA_MODE) { if (!(flags & MDS_PERM_BYPASS) && (uc->uc_fsuid != tmp_la->la_uid) && - !mdd_capable(uc, CFS_CAP_FOWNER)) + !md_capable(uc, CFS_CAP_FOWNER)) RETURN(-EPERM); if (la->la_mode == (cfs_umode_t) -1) @@ -839,14 +839,14 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj, la->la_mode = (la->la_mode & S_IALLUGO) | (tmp_la->la_mode & ~S_IALLUGO); - /* Also check the setgid bit! */ - if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ? - la->la_gid : tmp_la->la_gid) && - !mdd_capable(uc, CFS_CAP_FSETID)) - la->la_mode &= ~S_ISGID; - } else { - la->la_mode = tmp_la->la_mode; - } + /* Also check the setgid bit! */ + if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ? + la->la_gid : tmp_la->la_gid) && + !md_capable(uc, CFS_CAP_FSETID)) + la->la_mode &= ~S_ISGID; + } else { + la->la_mode = tmp_la->la_mode; + } /* Make sure a caller can chown. 
*/ if (la->la_valid & LA_UID) { @@ -854,7 +854,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj, la->la_uid = tmp_la->la_uid; if (((uc->uc_fsuid != tmp_la->la_uid) || (la->la_uid != tmp_la->la_uid)) && - !mdd_capable(uc, CFS_CAP_CHOWN)) + !md_capable(uc, CFS_CAP_CHOWN)) RETURN(-EPERM); /* If the user or group of a non-directory has been @@ -880,7 +880,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj, if (((uc->uc_fsuid != tmp_la->la_uid) || ((la->la_gid != tmp_la->la_gid) && !lustre_in_group_p(uc, la->la_gid))) && - !mdd_capable(uc, CFS_CAP_CHOWN)) + !md_capable(uc, CFS_CAP_CHOWN)) RETURN(-EPERM); /* Likewise, if the user or group of a non-directory @@ -1176,7 +1176,7 @@ static int mdd_xattr_sanity_check(const struct lu_env *env, RETURN(rc); if ((uc->uc_fsuid != tmp_la->la_uid) && - !mdd_capable(uc, CFS_CAP_FOWNER)) + !md_capable(uc, CFS_CAP_FOWNER)) RETURN(-EPERM); RETURN(rc); @@ -1802,7 +1802,7 @@ static int mdd_open_sanity_check(const struct lu_env *env, if (uc && ((uc->uc_valid == UCRED_OLD) || (uc->uc_valid == UCRED_NEW)) && (uc->uc_fsuid != tmp_la->la_uid) && - !mdd_capable(uc, CFS_CAP_FOWNER)) + !md_capable(uc, CFS_CAP_FOWNER)) RETURN(-EPERM); } #endif @@ -2167,6 +2167,18 @@ static int mdd_object_sync(const struct lu_env *env, struct md_object *obj) return dt_object_sync(env, mdd_object_child(mdd_obj)); } +static int mdd_object_lock(const struct lu_env *env, + struct md_object *obj, + struct lustre_handle *lh, + struct ldlm_enqueue_info *einfo, + void *policy) +{ + struct mdd_object *mdd_obj = md2mdd_obj(obj); + LASSERT(mdd_object_exists(mdd_obj)); + return dt_object_lock(env, mdd_object_child(mdd_obj), lh, + einfo, policy); +} + const struct md_object_operations mdd_obj_ops = { .moo_permission = mdd_permission, .moo_attr_get = mdd_attr_get, @@ -2184,4 +2196,5 @@ const struct md_object_operations mdd_obj_ops = { .moo_capa_get = mdd_capa_get, .moo_object_sync = mdd_object_sync, .moo_path = mdd_path, + 
.moo_object_lock = mdd_object_lock, }; diff --git a/lustre/mdd/mdd_permission.c b/lustre/mdd/mdd_permission.c index 1debe32..86e779e 100644 --- a/lustre/mdd/mdd_permission.c +++ b/lustre/mdd/mdd_permission.c @@ -318,17 +318,17 @@ int __mdd_permission_internal(const struct lu_env *env, struct mdd_object *obj, RETURN(0); check_capabilities: - if (!(mask & MAY_EXEC) || - (la->la_mode & S_IXUGO) || S_ISDIR(la->la_mode)) - if (mdd_capable(uc, CFS_CAP_DAC_OVERRIDE)) - RETURN(0); + if (!(mask & MAY_EXEC) || + (la->la_mode & S_IXUGO) || S_ISDIR(la->la_mode)) + if (md_capable(uc, CFS_CAP_DAC_OVERRIDE)) + RETURN(0); - if ((mask == MAY_READ) || - (S_ISDIR(la->la_mode) && !(mask & MAY_WRITE))) - if (mdd_capable(uc, CFS_CAP_DAC_READ_SEARCH)) - RETURN(0); + if ((mask == MAY_READ) || + (S_ISDIR(la->la_mode) && !(mask & MAY_WRITE))) + if (md_capable(uc, CFS_CAP_DAC_READ_SEARCH)) + RETURN(0); - RETURN(-EACCES); + RETURN(-EACCES); } int mdd_permission(const struct lu_env *env, @@ -415,7 +415,7 @@ int mdd_permission(const struct lu_env *env, uc = lu_ucred_assert(env); if (la->la_uid != uc->uc_fsuid && - !mdd_capable(uc, CFS_CAP_FOWNER)) + !md_capable(uc, CFS_CAP_FOWNER)) rc = -EPERM; } diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index bdb13e1..15d653a 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -91,7 +91,6 @@ ldlm_mode_t mdt_dlm_lock_modes[] = { [MDL_GROUP] = LCK_GROUP }; - static struct mdt_device *mdt_dev(struct lu_device *d); static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags); static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt, @@ -142,6 +141,7 @@ void mdt_lock_reg_init(struct mdt_lock_handle *lh, ldlm_mode_t lm) { lh->mlh_pdo_hash = 0; lh->mlh_reg_mode = lm; + lh->mlh_rreg_mode = lm; lh->mlh_type = MDT_REG_LOCK; } @@ -149,6 +149,7 @@ void mdt_lock_pdo_init(struct mdt_lock_handle *lh, ldlm_mode_t lm, const char *name, int namelen) { lh->mlh_reg_mode = lm; + lh->mlh_rreg_mode = lm; 
lh->mlh_type = MDT_PDO_LOCK; if (name != NULL && (name[0] != '\0')) { @@ -1254,7 +1255,8 @@ relock: GOTO(out_child, rc = -ENOENT); } - if (!(child_bits & MDS_INODELOCK_UPDATE)) { + if (!(child_bits & MDS_INODELOCK_UPDATE) && + mdt_object_exists(child) > 0) { struct md_attr *ma = &info->mti_attr; ma->ma_valid = 0; @@ -1331,7 +1333,8 @@ relock: (unsigned long)res_id->name[1], (unsigned long)res_id->name[2], PFID(mdt_object_fid(child))); - mdt_pack_size2body(info, child); + if (mdt_object_exists(child) > 0) + mdt_pack_size2body(info, child); } if (lock) LDLM_LOCK_PUT(lock); @@ -1697,18 +1700,19 @@ static long mdt_reint_opcode(struct mdt_thread_info *info, int mdt_reint(struct mdt_thread_info *info) { - long opc; - int rc; - - static const struct req_format *reint_fmts[REINT_MAX] = { - [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR, - [REINT_CREATE] = &RQF_MDS_REINT_CREATE, - [REINT_LINK] = &RQF_MDS_REINT_LINK, - [REINT_UNLINK] = &RQF_MDS_REINT_UNLINK, - [REINT_RENAME] = &RQF_MDS_REINT_RENAME, - [REINT_OPEN] = &RQF_MDS_REINT_OPEN, - [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR - }; + long opc; + int rc; + + static const struct req_format *reint_fmts[REINT_MAX] = { + [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR, + [REINT_CREATE] = &RQF_MDS_REINT_CREATE, + [REINT_LINK] = &RQF_MDS_REINT_LINK, + [REINT_UNLINK] = &RQF_MDS_REINT_UNLINK, + [REINT_RENAME] = &RQF_MDS_REINT_RENAME, + [REINT_OPEN] = &RQF_MDS_REINT_OPEN, + [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR, + [REINT_RMENTRY] = &RQF_MDS_REINT_UNLINK + }; ENTRY; @@ -2378,6 +2382,57 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, RETURN(rc); } +int mdt_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, + void *data, int flag) +{ + struct lustre_handle lockh; + int rc; + + switch (flag) { + case LDLM_CB_BLOCKING: + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc < 0) { + CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc); + RETURN(rc); + } + break; + case 
LDLM_CB_CANCELING: + LDLM_DEBUG(lock, "Revoke remote lock\n"); + break; + default: + LBUG(); + } + RETURN(0); +} + +int mdt_remote_object_lock(struct mdt_thread_info *mti, + struct mdt_object *o, struct lustre_handle *lh, + ldlm_mode_t mode, __u64 ibits) +{ + struct ldlm_enqueue_info *einfo = &mti->mti_einfo; + ldlm_policy_data_t *policy = &mti->mti_policy; + int rc = 0; + ENTRY; + + LASSERT(mdt_object_exists(o) < 0); + + LASSERT((ibits & MDS_INODELOCK_UPDATE)); + + memset(einfo, 0, sizeof(*einfo)); + einfo->ei_type = LDLM_IBITS; + einfo->ei_mode = mode; + einfo->ei_cb_bl = mdt_md_blocking_ast; + einfo->ei_cb_cp = ldlm_completion_ast; + + memset(policy, 0, sizeof(*policy)); + policy->l_inodebits.bits = ibits; + + rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo, + policy); + RETURN(rc); +} + static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o, struct mdt_lock_handle *lh, __u64 ibits, bool nonblock, int locality) @@ -2396,7 +2451,6 @@ static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o, if (mdt_object_exists(o) < 0) { if (locality == MDT_CROSS_LOCK) { - /* cross-ref object fix */ ibits &= ~MDS_INODELOCK_UPDATE; ibits |= MDS_INODELOCK_LOOKUP; } else { @@ -2568,6 +2622,9 @@ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o, mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref); mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref); + if (lustre_handle_is_used(&lh->mlh_rreg_lh)) + ldlm_lock_decref(&lh->mlh_rreg_lh, lh->mlh_rreg_mode); + EXIT; } @@ -2882,6 +2939,8 @@ void mdt_lock_handle_init(struct mdt_lock_handle *lh) lh->mlh_reg_mode = LCK_MINMODE; lh->mlh_pdo_lh.cookie = 0ull; lh->mlh_pdo_mode = LCK_MINMODE; + lh->mlh_rreg_lh.cookie = 0ull; + lh->mlh_rreg_mode = LCK_MINMODE; } void mdt_lock_handle_fini(struct mdt_lock_handle *lh) diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index c70d8ae..339fc1a 100644 --- a/lustre/mdt/mdt_internal.h 
+++ b/lustre/mdt/mdt_internal.h @@ -238,6 +238,10 @@ struct mdt_lock_handle { struct lustre_handle mlh_pdo_lh; ldlm_mode_t mlh_pdo_mode; unsigned int mlh_pdo_hash; + + /* Remote regular lock */ + struct lustre_handle mlh_rreg_lh; + ldlm_mode_t mlh_rreg_mode; }; enum { @@ -314,6 +318,9 @@ struct tx_arg { struct lu_buf buf; loff_t pos; } write; + struct { + struct ost_body *body; + } destroy; } u; }; @@ -657,6 +664,9 @@ void mdt_object_unlock_put(struct mdt_thread_info *, void mdt_client_compatibility(struct mdt_thread_info *info); +int mdt_remote_object_lock(struct mdt_thread_info *mti, + struct mdt_object *o, struct lustre_handle *lh, + ldlm_mode_t mode, __u64 ibits); int mdt_close_unpack(struct mdt_thread_info *info); int mdt_reint_unpack(struct mdt_thread_info *info, __u32 op); int mdt_reint_rec(struct mdt_thread_info *, struct mdt_lock_handle *); diff --git a/lustre/mdt/mdt_lib.c b/lustre/mdt/mdt_lib.c index e662147..42b894a 100644 --- a/lustre/mdt/mdt_lib.c +++ b/lustre/mdt/mdt_lib.c @@ -1081,12 +1081,18 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info) else ma->ma_attr_flags &= ~MDS_VTX_BYPASS; - info->mti_spec.no_create = !!req_is_replay(mdt_info_req(info)); + info->mti_spec.no_create = !!req_is_replay(mdt_info_req(info)); rc = mdt_dlmreq_unpack(info); RETURN(rc); } +static int mdt_rmentry_unpack(struct mdt_thread_info *info) +{ + info->mti_spec.sp_rm_entry = 1; + return mdt_unlink_unpack(info); +} + static int mdt_rename_unpack(struct mdt_thread_info *info) { struct lu_ucred *uc = mdt_ucred(info); @@ -1329,13 +1335,14 @@ static int mdt_setxattr_unpack(struct mdt_thread_info *info) typedef int (*reint_unpacker)(struct mdt_thread_info *info); static reint_unpacker mdt_reint_unpackers[REINT_MAX] = { - [REINT_SETATTR] = mdt_setattr_unpack, - [REINT_CREATE] = mdt_create_unpack, - [REINT_LINK] = mdt_link_unpack, - [REINT_UNLINK] = mdt_unlink_unpack, - [REINT_RENAME] = mdt_rename_unpack, - [REINT_OPEN] = mdt_open_unpack, - [REINT_SETXATTR] = 
mdt_setxattr_unpack + [REINT_SETATTR] = mdt_setattr_unpack, + [REINT_CREATE] = mdt_create_unpack, + [REINT_LINK] = mdt_link_unpack, + [REINT_UNLINK] = mdt_unlink_unpack, + [REINT_RENAME] = mdt_rename_unpack, + [REINT_OPEN] = mdt_open_unpack, + [REINT_SETXATTR] = mdt_setxattr_unpack, + [REINT_RMENTRY] = mdt_rmentry_unpack, }; int mdt_reint_unpack(struct mdt_thread_info *info, __u32 op) diff --git a/lustre/mdt/mdt_mds.c b/lustre/mdt/mdt_mds.c index d5d4dde..fb37a2c 100644 --- a/lustre/mdt/mdt_mds.c +++ b/lustre/mdt/mdt_mds.c @@ -51,9 +51,7 @@ #include #include #include "mdt_internal.h" -#ifdef HAVE_QUOTA_SUPPORT -# include -#endif +#include #include #include #include diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index b906337..171ee3e 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -91,7 +91,6 @@ static void mdt_obj_version_get(struct mdt_thread_info *info, struct mdt_object *o, __u64 *version) { LASSERT(o); - LASSERT(mdt_object_exists(o) >= 0); if (mdt_object_exists(o) > 0 && !mdt_object_obf(o)) *version = dt_version_get(info->mti_env, mdt_obj2dt(o)); else @@ -308,6 +307,14 @@ static int mdt_md_create(struct mdt_thread_info *info) if (mdt_object_exists(child) < 0) { struct seq_server_site *ss; + struct lu_ucred *uc = mdt_ucred(info); + + if (!md_capable(uc, CFS_CAP_SYS_ADMIN)) { + CERROR("%s: Creating remote dir is only " + "permitted for administrator: rc = %d\n", + mdt2obd_dev(mdt)->obd_name, -EPERM); + GOTO(out_put_child, rc = -EPERM); + } ss = mdt_seq_site(mdt); if (ss->ss_node_id != 0 && @@ -671,46 +678,115 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, RETURN(err_serious(-ENOENT)); /* - * step 1: lock the parent. + * step 1: Found the parent. 
*/ - parent_lh = &info->mti_lh[MDT_LH_PARENT]; - mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name, - rr->rr_namelen); + mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); + if (IS_ERR(mp)) { + rc = PTR_ERR(mp); + GOTO(out, rc); + } - mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh, - MDS_INODELOCK_UPDATE); - if (IS_ERR(mp)) - GOTO(out, rc = PTR_ERR(mp)); + if (mdt_object_obf(mp)) + GOTO(put_parent, rc = -EPERM); + + parent_lh = &info->mti_lh[MDT_LH_PARENT]; + lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen); + if (mdt_object_exists(mp) < 0) { + mdt_lock_reg_init(parent_lh, LCK_EX); + rc = mdt_remote_object_lock(info, mp, &parent_lh->mlh_rreg_lh, + parent_lh->mlh_rreg_mode, + MDS_INODELOCK_UPDATE); + if (rc != ELDLM_OK) + GOTO(put_parent, rc); + + } else { + mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name, + rr->rr_namelen); + rc = mdt_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE, + MDT_LOCAL_LOCK); + if (rc) + GOTO(put_parent, rc); - if (mdt_object_obf(mp)) - GOTO(out_unlock_parent, rc = -EPERM); + rc = mdt_version_get_check_save(info, mp, 0); + if (rc) + GOTO(unlock_parent, rc); + } - rc = mdt_version_get_check_save(info, mp, 0); - if (rc) - GOTO(out_unlock_parent, rc); + /* step 2: find & lock the child */ + /* lookup child object along with version checking */ + fid_zero(child_fid); + rc = mdt_lookup_version_check(info, mp, lname, child_fid, 1); + if (rc != 0) + GOTO(unlock_parent, rc); mdt_reint_init_ma(info, ma); - /* step 2: find & lock the child */ - lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen); - /* lookup child object along with version checking */ - fid_zero(child_fid); - rc = mdt_lookup_version_check(info, mp, lname, child_fid, 1); - if (rc != 0) - GOTO(out_unlock_parent, rc); + /* We will lock the child regardless it is local or remote. No harm. 
*/ + mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid); + if (IS_ERR(mc)) + GOTO(unlock_parent, rc = PTR_ERR(mc)); - /* We will lock the child regardless it is local or remote. No harm. */ - mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid); - if (IS_ERR(mc)) - GOTO(out_unlock_parent, rc = PTR_ERR(mc)); child_lh = &info->mti_lh[MDT_LH_CHILD]; mdt_lock_reg_init(child_lh, LCK_EX); + if (mdt_object_exists(mc) < 0) { + struct mdt_body *repbody; + + if (!fid_is_zero(rr->rr_fid2)) { + CDEBUG(D_INFO, "%s: name %s can not find "DFID"\n", + mdt2obd_dev(info->mti_mdt)->obd_name, + (char *)rr->rr_name, PFID(mdt_object_fid(mc))); + GOTO(unlock_parent, rc = -ENOENT); + } + CDEBUG(D_INFO, "%s: name %s: "DFID" is another MDT\n", + mdt2obd_dev(info->mti_mdt)->obd_name, + (char *)rr->rr_name, PFID(mdt_object_fid(mc))); + + if (info->mti_spec.sp_rm_entry) { + struct lu_ucred *uc = mdt_ucred(info); + + if (!md_capable(uc, CFS_CAP_SYS_ADMIN)) { + CERROR("%s: unlink remote entry is only " + "permitted for administrator: rc = %d\n", + mdt2obd_dev(info->mti_mdt)->obd_name, + -EPERM); + GOTO(unlock_parent, rc = -EPERM); + } + + ma->ma_need = MA_INODE; + ma->ma_valid = 0; + mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA); + rc = mdo_unlink(info->mti_env, mdt_object_child(mp), + NULL, lname, ma); + mdt_object_put(info->mti_env, mc); + GOTO(unlock_parent, rc); + } + /* Revoke the LOOKUP lock of the remote object granted by + * this MDT. Since the unlink will happen on another MDT, + * it will release the LOOKUP lock right away. 
Then What + * would happen if another client try to grab the LOOKUP + * lock at the same time with unlink XXX */ + mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_LOOKUP, + MDT_CROSS_LOCK); + repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); + LASSERT(repbody != NULL); + repbody->fid1 = *mdt_object_fid(mc); + repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS); + mdt_object_unlock_put(info, mc, child_lh, rc); + GOTO(unlock_parent, rc = -EREMOTE); + } else if (info->mti_spec.sp_rm_entry) { + CERROR("%s: lfs rmdir should not be used on local dir %s\n", + mdt2obd_dev(info->mti_mdt)->obd_name, + (char *)rr->rr_name); + mdt_object_put(info->mti_env, mc); + GOTO(unlock_parent, rc = -EPERM); + } + rc = mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_FULL, MDT_CROSS_LOCK); - if (rc != 0) { - mdt_object_put(info->mti_env, mc); - GOTO(out_unlock_parent, rc); - } + if (rc != 0) { + mdt_object_put(info->mti_env, mc); + GOTO(unlock_parent, rc); + } mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, OBD_FAIL_MDS_REINT_UNLINK_WRITE); @@ -752,8 +828,10 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, EXIT; mdt_object_unlock_put(info, mc, child_lh, rc); -out_unlock_parent: - mdt_object_unlock_put(info, mp, parent_lh, rc); +unlock_parent: + mdt_object_unlock(info, mp, parent_lh, rc); +put_parent: + mdt_object_put(info->mti_env, mp); out: return rc; } @@ -1189,13 +1267,14 @@ typedef int (*mdt_reinter)(struct mdt_thread_info *info, struct mdt_lock_handle *lhc); static mdt_reinter reinters[REINT_MAX] = { - [REINT_SETATTR] = mdt_reint_setattr, - [REINT_CREATE] = mdt_reint_create, - [REINT_LINK] = mdt_reint_link, - [REINT_UNLINK] = mdt_reint_unlink, - [REINT_RENAME] = mdt_reint_rename, - [REINT_OPEN] = mdt_reint_open, - [REINT_SETXATTR] = mdt_reint_setxattr + [REINT_SETATTR] = mdt_reint_setattr, + [REINT_CREATE] = mdt_reint_create, + [REINT_LINK] = mdt_reint_link, + [REINT_UNLINK] = mdt_reint_unlink, + [REINT_RENAME] = mdt_reint_rename, + [REINT_OPEN] = 
mdt_reint_open, + [REINT_SETXATTR] = mdt_reint_setxattr, + [REINT_RMENTRY] = mdt_reint_unlink }; int mdt_reint_rec(struct mdt_thread_info *info, diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index 8ce07f5..7d4d486 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -2309,15 +2309,9 @@ static struct inode *osd_create_remote_inode(const struct lu_env *env, oh = container_of(th, struct osd_thandle, ot_super); LASSERT(oh->ot_handle->h_transaction != NULL); -#ifdef HAVE_QUOTA_SUPPORT - osd_push_ctxt(info->oti_env, save); -#endif /* FIXME: Insert index api needs to know the mode of * the remote object. Just use S_IFDIR for now */ local = ldiskfs_create_inode(oh->ot_handle, pobj->oo_inode, S_IFDIR); -#ifdef HAVE_QUOTA_SUPPORT - osd_pop_ctxt(info->oti_env, save); -#endif if (IS_ERR(local)) { CERROR("%s: create local error %d\n", osd_name(osd), (int)PTR_ERR(local)); @@ -3793,11 +3787,8 @@ static int osd_index_declare_ea_insert(const struct lu_env *env, if (rc <= 0) RETURN(rc); -#ifdef OSD_DECLARE_OP -#define OSD_OT_CREATE create -#define OSD_OT_INSERT insert -#define osd_trans_declare_op(env, oh, op, cred) OSD_DECLARE_OP(oh, op, cred) -#endif + rc = 0; + osd_trans_declare_op(env, oh, OSD_OT_CREATE, osd_dto_credits_noquota[DTO_OBJECT_CREATE]); osd_trans_declare_op(env, oh, OSD_OT_INSERT, diff --git a/lustre/osp/osp_dev.c b/lustre/osp/osp_dev.c index 9a66737..a35b392 100644 --- a/lustre/osp/osp_dev.c +++ b/lustre/osp/osp_dev.c @@ -729,11 +729,14 @@ out_precreat: if (!m->opd_connect_mdt) osp_precreate_fini(m); out_last_used: - osp_last_used_fini(env, m); + if (!m->opd_connect_mdt) + osp_last_used_fini(env, m); out_proc: ptlrpc_lprocfs_unregister_obd(obd); lprocfs_obd_cleanup(obd); obd_cleanup_client_import(obd); + if (m->opd_symlink) + lprocfs_remove(&m->opd_symlink); client_obd_cleanup(obd); out_ref: ptlrpcd_decref(); diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c index 
9df10ed..f4493d0 100644 --- a/lustre/osp/osp_md_object.c +++ b/lustre/osp/osp_md_object.c @@ -128,7 +128,7 @@ static struct update_request if (!update) return ERR_PTR(-ENOMEM); - OBD_ALLOC(update->ur_buf, UPDATE_BUFFER_SIZE); + OBD_ALLOC_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE); if (update->ur_buf == NULL) { OBD_FREE_PTR(update); return ERR_PTR(-ENOMEM); @@ -150,7 +150,7 @@ static void osp_destroy_update_req(struct update_request *update) cfs_list_del(&update->ur_list); if (update->ur_buf != NULL) - OBD_FREE(update->ur_buf, UPDATE_BUFFER_SIZE); + OBD_FREE_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE); OBD_FREE_PTR(update); return; @@ -1069,3 +1069,45 @@ struct dt_object_operations osp_md_obj_ops = { .do_index_try = osp_md_index_try, }; +static int osp_md_object_lock(const struct lu_env *env, + struct dt_object *dt, + struct lustre_handle *lh, + struct ldlm_enqueue_info *einfo, + void *policy) +{ + struct osp_thread_info *info = osp_env_info(env); + struct ldlm_res_id *res_id = &info->osi_resid; + struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev); + struct osp_device *osp = dt2osp_dev(dt_dev); + struct ptlrpc_request *req = NULL; + int rc = 0; + __u64 flags = 0; + ldlm_mode_t mode; + + fid_build_reg_res_name(lu_object_fid(&dt->do_lu), res_id); + + mode = ldlm_lock_match(osp->opd_obd->obd_namespace, + LDLM_FL_BLOCK_GRANTED, res_id, + einfo->ei_type, + (ldlm_policy_data_t *)policy, + einfo->ei_mode, lh, 0); + if (mode > 0) + return ELDLM_OK; + + req = ldlm_enqueue_pack(osp->opd_exp, 0); + if (IS_ERR(req)) + RETURN(PTR_ERR(req)); + + rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id, + (const ldlm_policy_data_t *)policy, + &flags, NULL, 0, LVB_T_NONE, lh, 0); + + ptlrpc_req_finished(req); + + return rc == ELDLM_OK ? 
0 : -EIO; +} + +struct dt_lock_operations osp_md_lock_ops = { + .do_object_lock = osp_md_object_lock, +}; + diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c index 695a31e..2426be1 100644 --- a/lustre/osp/osp_object.c +++ b/lustre/osp/osp_object.c @@ -369,6 +369,7 @@ static int osp_object_init(const struct lu_env *env, struct lu_object *o, po->opo_obj.do_ops = &osp_md_obj_ops; o->lo_header->loh_attr |= LOHA_REMOTE; + po->opo_obj.do_lock_ops = &osp_md_lock_ops; /* Do not need get attr for new object */ if (!(conf != NULL && (conf->loc_flags & LOC_F_NEW) != 0)) { rc = po->opo_obj.do_ops->do_attr_get(env, lu2dt_obj(o), diff --git a/lustre/tests/replay-dual.sh b/lustre/tests/replay-dual.sh index e859a21..c765d2e 100755 --- a/lustre/tests/replay-dual.sh +++ b/lustre/tests/replay-dual.sh @@ -674,8 +674,8 @@ test_22c () { do_node $CLIENT1 mkdir -p $MOUNT1/${tdir} - # OBD_FAIL_MDS_DROP_OBJ_UPDATE 0x188 - do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x188 + # OBD_FAIL_UPDATE_OBJ_NET 0x1500 + do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1500 do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir & CLIENT_PID=$! do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0 @@ -701,8 +701,8 @@ test_22d () { do_node $CLIENT1 mkdir -p $MOUNT1/${tdir} - # OBD_FAIL_MDS_DROP_OBJ_UPDATE 0x188 - do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x188 + # OBD_FAIL_UPDATE_OBJ_NET 0x1500 + do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1500 do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir & CLIENT_PID=$! do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0 @@ -808,8 +808,8 @@ test_23c () { do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir || error "lfs mkdir failed" - # OBD_FAIL_MDS_DROP_OBJ_UPDATE 0x188 - do_facet mds${MDTIDX} lctl set_param fail_loc=0x188 + # OBD_FAIL_UPDATE_OBJ_NET 0x1500 + do_facet mds${MDTIDX} lctl set_param fail_loc=0x1500 do_node $CLIENT1 rmdir $MOUNT1/$remote_dir & CLIENT_PID=$! 
do_facet mds${MDTIDX} lctl set_param fail_loc=0 @@ -837,8 +837,8 @@ test_23d () { do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir || error "lfs mkdir failed" - # OBD_FAIL_MDS_DROP_OBJ_UPDATE 0x188 - do_facet mds${MDTIDX} lctl set_param fail_loc=0x188 + # OBD_FAIL_UPDATE_OBJ_NET 0x1500 + do_facet mds${MDTIDX} lctl set_param fail_loc=0x1500 do_node $CLIENT1 rmdir $MOUNT1/$remote_dir & CLIENT_PID=$! do_facet mds${MDTIDX} lctl set_param fail_loc=0 diff --git a/lustre/tests/replay-single.sh b/lustre/tests/replay-single.sh index 7750d01..24653fc 100755 --- a/lustre/tests/replay-single.sh +++ b/lustre/tests/replay-single.sh @@ -2006,8 +2006,8 @@ test_80a() { local remote_dir=$DIR/$tdir/remote_dir mkdir -p $DIR/$tdir - # OBD_FAIL_MDS_DROP_OBJ_UPDATE 0x188 - do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x188 + # OBD_FAIL_UPDATE_OBJ_NET 0x1500 + do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1500 $LFS mkdir -i $MDTIDX $remote_dir & local CLIENT_PID=$! do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0 @@ -2034,8 +2034,8 @@ test_80b() { local remote_dir=$DIR/$tdir/remote_dir mkdir -p $DIR/$tdir - # OBD_FAIL_MDS_DROP_OBJ_UPDATE 0x188 - do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x188 + # OBD_FAIL_UPDATE_OBJ_NET 0x1500 + do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1500 $LFS mkdir -i $MDTIDX $remote_dir & local CLIENT_PID=$! do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0 @@ -2062,8 +2062,8 @@ test_80c() { local remote_dir=$DIR/$tdir/remote_dir mkdir -p $DIR/$tdir - # OBD_FAIL_MDS_DROP_OBJ_UPDATE 0x188 - do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x188 + # OBD_FAIL_UPDATE_OBJ_NET 0x1500 + do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1500 $LFS mkdir -i $MDTIDX $remote_dir & local CLIENT_PID=$! 
do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0 @@ -2086,8 +2086,8 @@ test_80d() { local remote_dir=$DIR/$tdir/remote_dir mkdir -p $DIR/$tdir - # OBD_FAIL_MDS_DROP_OBJ_UPDATE 0x188 - do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x188 + # OBD_FAIL_UPDATE_OBJ_NET 0x1500 + do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1500 $LFS mkdir -i $MDTIDX $remote_dir & local CLIENT_PID=$! do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0 @@ -2223,8 +2223,8 @@ test_81a() { mkdir -p $DIR/$tdir $LFS mkdir -i $MDTIDX $remote_dir || error "lfs mkdir failed" - # OBD_FAIL_MDS_DROP_OBJ_UPDATE 0x188 - do_facet mds${MDTIDX} lctl set_param fail_loc=0x188 + # OBD_FAIL_UPDATE_OBJ_NET 0x1500 + do_facet mds${MDTIDX} lctl set_param fail_loc=0x1500 rmdir $remote_dir & local CLIENT_PID=$! do_facet mds${MDTIDX} lctl set_param fail_loc=0 @@ -2253,8 +2253,8 @@ test_81b() { mkdir -p $DIR/$tdir $LFS mkdir -i $MDTIDX $remote_dir || error "lfs mkdir failed" - # OBD_FAIL_MDS_DROP_OBJ_UPDATE 0x188 - do_facet mds${MDTIDX} lctl set_param fail_loc=0x188 + # OBD_FAIL_UPDATE_OBJ_NET 0x1500 + do_facet mds${MDTIDX} lctl set_param fail_loc=0x1500 rmdir $remote_dir & local CLIENT_PID=$! do_facet mds${MDTIDX} lctl set_param fail_loc=0 @@ -2284,8 +2284,8 @@ test_81c() { mkdir -p $DIR/$tdir $LFS mkdir -i $MDTIDX $remote_dir || error "lfs mkdir failed" - # OBD_FAIL_MDS_DROP_OBJ_UPDATE 0x188 - do_facet mds${MDTIDX} lctl set_param fail_loc=0x188 + # OBD_FAIL_UPDATE_OBJ_NET 0x1500 + do_facet mds${MDTIDX} lctl set_param fail_loc=0x1500 rmdir $remote_dir & local CLIENT_PID=$! do_facet mds${MDTIDX} lctl set_param fail_loc=0 @@ -2311,8 +2311,8 @@ test_81d() { mkdir -p $DIR/$tdir $LFS mkdir -i $MDTIDX $remote_dir || error "lfs mkdir failed" - # OBD_FAIL_MDS_DROP_OBJ_UPDATE 0x188 - do_facet mds${MDTIDX} lctl set_param fail_loc=0x188 + # OBD_FAIL_UPDATE_OBJ_NET 0x1500 + do_facet mds${MDTIDX} lctl set_param fail_loc=0x1500 rmdir $remote_dir & local CLIENT_PID=$! 
do_facet mds${MDTIDX} lctl set_param fail_loc=0 @@ -2343,10 +2343,10 @@ test_81e() { $LFS mkdir -i $MDTIDX $remote_dir || error "lfs mkdir failed" # OBD_FAIL_MDS_REINT_NET_REP 0x119 - do_facet mds${MDTIDX} lctl set_param fail_loc=0x119 + do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119 rmdir $remote_dir & local CLIENT_PID=$! - do_facet mds${MDTIDX} lctl set_param fail_loc=0 + do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0 fail mds${MDTIDX} @@ -2374,10 +2374,10 @@ test_81f() { $LFS mkdir -i $MDTIDX $remote_dir || error "lfs mkdir failed" # OBD_FAIL_MDS_REINT_NET_REP 0x119 - do_facet mds${MDTIDX} lctl set_param fail_loc=0x119 + do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119 rmdir $remote_dir & local CLIENT_PID=$! - do_facet mds${MDTIDX} lctl set_param fail_loc=0 + do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0 fail mds$((MDTIDX + 1)) @@ -2405,10 +2405,10 @@ test_81g() { $LFS mkdir -i $MDTIDX $remote_dir || error "lfs mkdir failed" # OBD_FAIL_MDS_REINT_NET_REP 0x119 - do_facet mds${MDTIDX} lctl set_param fail_loc=0x119 + do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119 rmdir $remote_dir & local CLIENT_PID=$! - do_facet mds${MDTIDX} lctl set_param fail_loc=0 + do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0 fail mds${MDTIDX} fail mds$((MDTIDX + 1)) @@ -2432,10 +2432,10 @@ test_81h() { $LFS mkdir -i $MDTIDX $remote_dir || error "lfs mkdir failed" # OBD_FAIL_MDS_REINT_NET_REP 0x119 - do_facet mds${MDTIDX} lctl set_param fail_loc=0x119 + do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119 rmdir $remote_dir & local CLIENT_PID=$! 
- do_facet mds${MDTIDX} lctl set_param fail_loc=0 + do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0 fail mds${MDTIDX},mds$((MDTIDX + 1)) diff --git a/lustre/tests/test-framework.sh b/lustre/tests/test-framework.sh index 989031b..e87097b 100644 --- a/lustre/tests/test-framework.sh +++ b/lustre/tests/test-framework.sh @@ -3795,11 +3795,11 @@ drop_reint_reply() { } drop_update_reply() { -# OBD_FAIL_MDS_OBJ_UPDATE_NET +# OBD_FAIL_UPDATE_OBJ_NET local index=$1 shift 1 RC=0 - do_facet mds${index} lctl set_param fail_loc=0x188 + do_facet mds${index} lctl set_param fail_loc=0x1500 do_facet client "$@" || RC=$? do_facet mds${index} lctl set_param fail_loc=0 return $RC -- 1.8.3.1