From d1358cd5f723f390ede7111872ffea153e0ac502 Mon Sep 17 00:00:00 2001 From: yury Date: Sun, 22 Oct 2006 18:43:45 +0000 Subject: [PATCH] - added last chunk of ldlm part of pdirops, enabled by default, use --disable-pdirops key to disable it; - fixes in mdt_is_subdir(); - added a few asserts checking f_ver invariants. --- lustre/autoconf/lustre-core.m4 | 22 ++- lustre/cmm/cmm_internal.h | 3 +- lustre/cmm/cmm_object.c | 15 +- lustre/cmm/cmm_split.c | 79 +++++---- lustre/cmm/mdc_object.c | 1 + lustre/fid/fid_lib.c | 44 +---- lustre/include/lustre/lustre_idl.h | 9 +- lustre/include/lustre_fid.h | 22 ++- lustre/include/md_object.h | 34 ++-- lustre/mdc/mdc_locks.c | 7 + lustre/mdd/mdd_dir.c | 2 +- lustre/mdt/mdt_handler.c | 352 ++++++++++++++++++++++--------------- lustre/mdt/mdt_internal.h | 70 +++++++- lustre/mdt/mdt_lib.c | 7 + lustre/mdt/mdt_open.c | 21 ++- lustre/mdt/mdt_reint.c | 95 +++++----- lustre/mdt/mdt_xattr.c | 4 +- lustre/obdclass/lu_object.c | 1 + lustre/osd/osd_igif.c | 13 +- lustre/osd/osd_igif.h | 1 - lustre/osd/osd_oi.c | 7 +- 21 files changed, 490 insertions(+), 319 deletions(-) diff --git a/lustre/autoconf/lustre-core.m4 b/lustre/autoconf/lustre-core.m4 index 88a2f5a..70618f4 100644 --- a/lustre/autoconf/lustre-core.m4 +++ b/lustre/autoconf/lustre-core.m4 @@ -703,7 +703,7 @@ fi # whether to enable quota support # AC_DEFUN([LC_CONFIG_SPLIT], -[AC_MSG_CHECKING([whether to disable split support]) +[AC_MSG_CHECKING([whether to enable split support]) AC_ARG_ENABLE([split], AC_HELP_STRING([--disable-split], [disable split support]), @@ -715,13 +715,31 @@ fi ]) # +# LC_CONFIG_PDIROPS +# +# whether to enable PDIROPS +# +AC_DEFUN([LC_CONFIG_PDIROPS], +[ +AC_MSG_CHECKING([whether to enable PDIROPS]) +AC_ARG_ENABLE([pdirops], + AC_HELP_STRING([--disable-pdirops], + [disable PDIROPS]), + [],[enable_pdirops='yes']) +AC_MSG_RESULT([$enable_pdirops]) +if test x$enable_pdirops != xno; then + AC_DEFINE(CONFIG_PDIROPS, 1, [enable PDIROPS]) +fi +]) + +# # LC_CONFIG_LDISKFS 
# # whether to enable various ldiskfs debugs # AC_DEFUN([LC_CONFIG_LDISKFS], [ -AC_MSG_CHECKING([whether to disable ldiskfs asserts]) +AC_MSG_CHECKING([whether to enable ldiskfs asserts]) AC_ARG_ENABLE([ldiskfs_asserts], AC_HELP_STRING([--disable-ldiskfs-asserts], [disable ldiskfs asserts]), diff --git a/lustre/cmm/cmm_internal.h b/lustre/cmm/cmm_internal.h index b142cc6..def7915 100644 --- a/lustre/cmm/cmm_internal.h +++ b/lustre/cmm/cmm_internal.h @@ -151,9 +151,10 @@ int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp, const char *name); int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo, - struct md_attr *ma); + struct md_attr *ma, int *split); int cmm_try_to_split(const struct lu_env *env, struct md_object *mo); + #endif #endif /* __KERNEL__ */ diff --git a/lustre/cmm/cmm_object.c b/lustre/cmm/cmm_object.c index 19f2aca..ad9818d 100644 --- a/lustre/cmm/cmm_object.c +++ b/lustre/cmm/cmm_object.c @@ -379,13 +379,22 @@ static mdl_mode_t cml_lock_mode(const struct lu_env *env, #ifdef HAVE_SPLIT_SUPPORT { struct md_attr *ma = &cmm_env_info(env)->cmi_ma; - + int rc, split; + + memset(ma, 0, sizeof(*ma)); + /* * Check only if we need protection from split. If not - mdt * handles other cases. 
*/ - if (lm == MDL_PW && - cmm_expect_splitting(env, mo, ma) == CMM_EXPECT_SPLIT) + rc = cmm_expect_splitting(env, mo, ma, &split); + if (rc) { + CERROR("Can't check for possible split, error %d\n", + rc); + RETURN(MDL_MINMODE); + } + + if (lm == MDL_PW && split == CMM_EXPECT_SPLIT) RETURN(MDL_EX); } #endif diff --git a/lustre/cmm/cmm_split.c b/lustre/cmm/cmm_split.c index d5e032c..a7ad53f 100644 --- a/lustre/cmm/cmm_split.c +++ b/lustre/cmm/cmm_split.c @@ -79,12 +79,12 @@ int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp, /* Get LMV EA */ ma->ma_need = MA_LMV; rc = mo_attr_get(env, mp, ma); + /* Skip checking the slave dirs (mea_count is 0) */ if (rc == 0 && ma->ma_lmv->mea_count != 0) { /* - * Get stripe by name to check the name - * belongs to master dir, otherwise - * return the -ERESTART + * Get stripe by name to check the name belongs to + * master dir, otherwise return the -ERESTART */ stripe = mea_name2idx(ma->ma_lmv, name, strlen(name)); @@ -98,42 +98,50 @@ int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp, } int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo, - struct md_attr *ma) + struct md_attr *ma, int *split) { struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); - struct lu_fid *fid = NULL; - int rc = CMM_EXPECT_SPLIT; + struct lu_fid root_fid; + int rc; ENTRY; - if (cmm->cmm_tgt_count == 0) - GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED); + /* + * Check the lightest things first: tgt count and root fid. For some + * cases this style should yield better performance. 
+ */ + if (cmm->cmm_tgt_count == 0) { + *split = CMM_NO_SPLIT_EXPECTED; + RETURN(0); + } - ma->ma_need = MA_INODE | MA_LMV; - rc = mo_attr_get(env, mo, ma); + rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child, + &root_fid); if (rc) - GOTO(cleanup, rc = CMM_NOT_SPLITTABLE); + RETURN(rc); - if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) - GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED); + if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) { + *split = CMM_NOT_SPLITTABLE; + RETURN(0); + } - if (ma->ma_lmv_size) - GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED); - - OBD_ALLOC_PTR(fid); - rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child, fid); + /* MA_INODE is needed to check inode size. */ + ma->ma_need = MA_INODE | MA_LMV; + rc = mo_attr_get(env, mo, ma); if (rc) - GOTO(cleanup, rc); - - rc = CMM_EXPECT_SPLIT; - - if (lu_fid_eq(fid, cmm2fid(md2cmm_obj(mo)))) - GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED); - - EXIT; -cleanup: - if (fid) - OBD_FREE_PTR(fid); - return rc; + RETURN(rc); + + if (ma->ma_valid & MA_LMV) { + *split = CMM_NOT_SPLITTABLE; + RETURN(0); + } + + if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) { + *split = CMM_NO_SPLIT_EXPECTED; + RETURN(0); + } + + *split = CMM_EXPECT_SPLIT; + RETURN(0); } #define cmm_md_size(stripes) \ @@ -496,17 +504,20 @@ int cmm_try_to_split(const struct lu_env *env, struct md_object *mo) struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); struct md_attr *ma = &cmm_env_info(env)->cmi_ma; struct lu_buf *buf; - int rc = 0; + int rc = 0, split; ENTRY; LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu))); memset(ma, 0, sizeof(*ma)); /* Step1: Checking whether the dir needs to be split. */ - rc = cmm_expect_splitting(env, mo, ma); - if (rc != CMM_EXPECT_SPLIT) + rc = cmm_expect_splitting(env, mo, ma, &split); + if (rc) + GOTO(cleanup, rc); + + if (split != CMM_EXPECT_SPLIT) GOTO(cleanup, rc = 0); - + /* * Disable trans for splitting, since there will be so many trans in * this one ops, confilct with current recovery design. 
diff --git a/lustre/cmm/mdc_object.c b/lustre/cmm/mdc_object.c index 7d3c706..80ab880 100644 --- a/lustre/cmm/mdc_object.c +++ b/lustre/cmm/mdc_object.c @@ -487,6 +487,7 @@ static int mdc_is_subdir(const struct lu_env *env, struct md_object *mo, CDEBUG(D_INFO, "Remote mdo_is_subdir(), new src " DFID"\n", PFID(&body->fid1)); *sfid = body->fid1; + rc = -EREMOTE; } EXIT; out: diff --git a/lustre/fid/fid_lib.c b/lustre/fid/fid_lib.c index ac81fd7..ac2d921 100644 --- a/lustre/fid/fid_lib.c +++ b/lustre/fid/fid_lib.c @@ -73,6 +73,7 @@ void fid_cpu_to_le(struct lu_fid *dst, const struct lu_fid *src) CLASSERT(sizeof *src == sizeof fid_seq(src) + sizeof fid_oid(src) + sizeof fid_ver(src)); + LASSERT(fid_is_igif(src) || fid_ver(src) == 0); dst->f_seq = cpu_to_le64(fid_seq(src)); dst->f_oid = cpu_to_le32(fid_oid(src)); dst->f_ver = cpu_to_le32(fid_ver(src)); @@ -88,6 +89,7 @@ void fid_le_to_cpu(struct lu_fid *dst, const struct lu_fid *src) dst->f_seq = le64_to_cpu(fid_seq(src)); dst->f_oid = le32_to_cpu(fid_oid(src)); dst->f_ver = le32_to_cpu(fid_ver(src)); + LASSERT(fid_is_igif(dst) || fid_ver(dst) == 0); } EXPORT_SYMBOL(fid_le_to_cpu); @@ -98,6 +100,7 @@ void fid_cpu_to_be(struct lu_fid *dst, const struct lu_fid *src) CLASSERT(sizeof *src == sizeof fid_seq(src) + sizeof fid_oid(src) + sizeof fid_ver(src)); + LASSERT(fid_is_igif(src) || fid_ver(src) == 0); dst->f_seq = cpu_to_be64(fid_seq(src)); dst->f_oid = cpu_to_be32(fid_oid(src)); dst->f_ver = cpu_to_be32(fid_ver(src)); @@ -113,6 +116,7 @@ void fid_be_to_cpu(struct lu_fid *dst, const struct lu_fid *src) dst->f_seq = be64_to_cpu(fid_seq(src)); dst->f_oid = be32_to_cpu(fid_oid(src)); dst->f_ver = be32_to_cpu(fid_ver(src)); + LASSERT(fid_is_igif(dst) || fid_ver(dst) == 0); } EXPORT_SYMBOL(fid_be_to_cpu); #endif @@ -162,43 +166,3 @@ void range_be_to_cpu(struct lu_range *dst, const struct lu_range *src) } EXPORT_SYMBOL(range_be_to_cpu); #endif - -/* issues dlm lock on passed @ns, @f stores it lock handle into @lh. 
*/ -int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f, - struct lustre_handle *lh, ldlm_mode_t mode, - ldlm_policy_data_t *policy, - struct ldlm_res_id *res_id) -{ - int flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB; - int rc; - - LASSERT(ns != NULL); - LASSERT(lh != NULL); - LASSERT(f != NULL); - - rc = ldlm_cli_enqueue_local(ns, *fid_build_res_name(f, res_id), - LDLM_IBITS, policy, mode, &flags, - ldlm_blocking_ast, ldlm_completion_ast, - NULL, NULL, 0, NULL, lh); - return rc == ELDLM_OK ? 0 : -EIO; -} -EXPORT_SYMBOL(fid_lock); - -void fid_unlock(const struct lu_fid *f, - struct lustre_handle *lh, ldlm_mode_t mode) -{ - { - /* XXX: this is debug stuff, remove it later. */ - struct ldlm_lock *lock = ldlm_handle2lock(lh); - if (!lock) { - CERROR("Invalid lock handle "LPX64"\n", - lh->cookie); - LBUG(); - } - LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name)); - LDLM_LOCK_PUT(lock); - } - ldlm_lock_decref(lh, mode); -} -EXPORT_SYMBOL(fid_unlock); - diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index ddcbd80..923e085 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -246,6 +246,11 @@ static inline int fid_is_zero(const struct lu_fid *fid) return fid_seq(fid) == 0 && fid_oid(fid) == 0; } +static inline int fid_is_igif(const struct lu_fid *fid) +{ + return fid_seq(fid) == LUSTRE_ROOT_FID_SEQ; +} + #define DFID "[0x%16.16"LPF64"x/0x%8.8x:0x%8.8x]" #define PFID(fid) \ @@ -259,9 +264,11 @@ extern void lustre_swab_lu_range(struct lu_range *range); static inline int lu_fid_eq(const struct lu_fid *f0, const struct lu_fid *f1) { - /* check that there is no alignment padding */ + /* Check that there is no alignment padding. 
*/ CLASSERT(sizeof *f0 == sizeof f0->f_seq + sizeof f0->f_oid + sizeof f0->f_ver); + LASSERT(fid_is_igif(f0) || fid_ver(f0) == 0); + LASSERT(fid_is_igif(f1) || fid_ver(f1) == 0); return memcmp(f0, f1, sizeof *f0) == 0; } diff --git a/lustre/include/lustre_fid.h b/lustre/include/lustre_fid.h index febb8b9..ec666f8 100644 --- a/lustre/include/lustre_fid.h +++ b/lustre/include/lustre_fid.h @@ -200,24 +200,28 @@ void fid_be_to_cpu(struct lu_fid *dst, const struct lu_fid *src); struct ldlm_namespace; -int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f, - struct lustre_handle *lh, ldlm_mode_t mode, - ldlm_policy_data_t *policy, - struct ldlm_res_id *res_id); -void fid_unlock(const struct lu_fid *f, - struct lustre_handle *lh, ldlm_mode_t mode); - /* * Build (DLM) resource name from fid. */ static inline struct ldlm_res_id * -fid_build_res_name(const struct lu_fid *f, - struct ldlm_res_id *name) +fid_build_reg_res_name(const struct lu_fid *f, + struct ldlm_res_id *name) { memset(name, 0, sizeof *name); name->name[0] = fid_seq(f); name->name[1] = fid_oid(f); name->name[2] = fid_ver(f); + name->name[3] = 0ull; + return name; +} + +static inline struct ldlm_res_id * +fid_build_pdo_res_name(const struct lu_fid *f, + unsigned int hash, + struct ldlm_res_id *name) +{ + fid_build_reg_res_name(f, name); + name->name[3] = hash; return name; } diff --git a/lustre/include/md_object.h b/lustre/include/md_object.h index d907a9b..183096f 100644 --- a/lustre/include/md_object.h +++ b/lustre/include/md_object.h @@ -93,26 +93,32 @@ struct md_capainfo *md_capainfo(const struct lu_env *env); /* metadata attributes */ enum ma_valid { - MA_INODE = (1 << 0), - MA_LOV = (1 << 1), - MA_COOKIE = (1 << 2), - MA_FLAGS = (1 << 3), - MA_LMV = (1 << 4), - MA_ACL_DEF = (1 << 5) + MA_INODE = (1 << 0), + MA_LOV = (1 << 1), + MA_COOKIE = (1 << 2), + MA_FLAGS = (1 << 3), + MA_LMV = (1 << 4), + MA_ACL_DEF = (1 << 5) }; typedef enum { - MDL_MINMODE = 0, - MDL_EX = 1, - MDL_PW = 2, - MDL_PR = 4, - 
MDL_CW = 8, - MDL_CR = 16, - MDL_NL = 32, - MDL_GROUP = 64, + MDL_MINMODE = 0, + MDL_EX = 1, + MDL_PW = 2, + MDL_PR = 4, + MDL_CW = 8, + MDL_CR = 16, + MDL_NL = 32, + MDL_GROUP = 64, MDL_MAXMODE } mdl_mode_t; +typedef enum { + MDT_NUL_LOCK = 0, + MDT_REG_LOCK = (1 << 0), + MDT_PDO_LOCK = (1 << 1) +} mdl_type_t; + struct md_attr { __u64 ma_valid; __u64 ma_need; diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 08e92c5..31346d0 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -692,6 +692,13 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data, LDLM_IBITS, &policy, mode, &lockh); } + if (!rc) { + mode = LCK_PW; + rc = ldlm_lock_match(exp->exp_obd->obd_namespace, + LDLM_FL_BLOCK_GRANTED, &res_id, + LDLM_IBITS, &policy, mode, &lockh); + } + if (rc) { memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh)); diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index c5c37ce..5486ebd 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -1119,7 +1119,7 @@ static int mdd_create(const struct lu_env *env, #endif rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), - son, ma, handle); + son, ma, handle); mdd_write_unlock(env, son); if (rc) /* diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 5c67bd9..4e6a2e7 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -11,6 +11,7 @@ * Author: Mike Shaver * Author: Nikita Danilov * Author: Huang Hua + * Author: Yury Umanets * * This file is part of the Lustre file system, http://www.lustre.org * Lustre is a trademark of Cluster File Systems, Inc. 
@@ -154,6 +155,133 @@ void mdt_set_disposition(struct mdt_thread_info *info, rep->lock_policy_res1 |= flag; } +#ifdef CONFIG_PDIROPS +static mdl_mode_t mdt_mdl_lock_modes[] = { + [0] = MDL_MINMODE, + [1] = MDL_EX, + [2] = MDL_PW, + [3] = MDL_PR, + [4] = MDL_CW, + [5] = MDL_CR, + [6] = MDL_NL, + [7] = MDL_GROUP +}; + +static ldlm_mode_t mdt_ldlm_lock_modes[] = { + [0] = LCK_MINMODE, + [1] = LCK_EX, + [2] = LCK_PW, + [3] = LCK_PR, + [4] = LCK_CW, + [5] = LCK_CR, + [6] = LCK_NL, + [7] = LCK_GROUP +}; + +static inline mdl_mode_t mdt_ldlm_mode2mdl_mode(ldlm_mode_t mode) +{ + int idx = ffs((int)mode); + + LASSERT(idx >= 0); + LASSERT(IS_PO2(mode)); + LASSERT(idx < ARRAY_SIZE(mdt_mdl_lock_modes)); + return mdt_mdl_lock_modes[idx]; +} + +static inline ldlm_mode_t mdt_mdl_mode2ldlm_mode(mdl_mode_t mode) +{ + int idx = ffs((int)mode); + + LASSERT(idx >= 0); + LASSERT(IS_PO2(mode)); + LASSERT(idx < ARRAY_SIZE(mdt_ldlm_lock_modes)); + return mdt_ldlm_lock_modes[idx]; +} +#endif + +void mdt_lock_reg_init(struct mdt_lock_handle *lh, ldlm_mode_t lm) +{ + lh->mlh_pdo_hash = 0; + lh->mlh_reg_mode = lm; + lh->mlh_type = MDT_REG_LOCK; +} + +void mdt_lock_pdo_init(struct mdt_lock_handle *lh, ldlm_mode_t lm, + const char *name, int namelen) +{ + lh->mlh_reg_mode = lm; + lh->mlh_type = MDT_PDO_LOCK; + lh->mlh_pdo_hash = (name != NULL && namelen > 0 ? + full_name_hash(name, namelen) : 0); +} + +#ifdef CONFIG_PDIROPS +static ldlm_mode_t mdt_lock_pdo_mode(struct mdt_thread_info *info, + struct mdt_object *o, + ldlm_mode_t lm) +{ + mdl_mode_t mode; + + /* + * Any dir access needs couple of locks: + * + * 1) on part of dir we gonna take lookup/modify; + * + * 2) on whole dir to protect it from concurrent splitting and/or to + * flush client's cache for readdir(). 
+ * + * so, for a given mode and object this routine decides what lock mode + * to use for lock #2: + * + * 1) if caller's gonna lookup in dir then we need to protect dir from + * being split only - LCK_CR + * + * 2) if caller's gonna modify dir then we need to protect dir from + * being split and to flush cache - LCK_CW + * + * 3) if caller's gonna modify dir and that dir seems ready for + * splitting then we need to protect it from any type of access + * (lookup/modify/split) - LCK_EX --bzzz + */ + + LASSERT(lm != LCK_MINMODE); + + if (mdt_object_exists(o) > 0) { + /* + * Ask underlying level its opinion about possible locks. + */ + mode = mdo_lock_mode(info->mti_env, mdt_object_child(o), + mdt_ldlm_mode2mdl_mode(lm)); + } else { + /* Default locks for non-existing objects. */ + mode = MDL_MINMODE; + } + + if (mode != MDL_MINMODE) { + /* Lower layer said what lock mode it likes to be, use it. */ + return mdt_mdl_mode2ldlm_mode(mode); + } else { + /* + * Lower layer does not want to specify locking mode. We do it + * ourselves. No special protection is needed, just flush + * client's cache on modification. + */ + if (lm == LCK_EX) { + return LCK_EX; + } else if (lm == LCK_PR) { + return LCK_CR; + } else if (lm == LCK_PW) { + return LCK_CW; + } else { + CWARN("Not expected lock type (0x%x)\n", + (int)mode); + } + } + + return LCK_MINMODE; +} +#endif + static int mdt_getstatus(struct mdt_thread_info *info) { struct mdt_device *mdt = info->mti_mdt; @@ -553,7 +681,7 @@ static int mdt_is_subdir(struct mdt_thread_info *info) * Save error code to ->mode. Later it it is used for detecting the case * of remote subdir. */ - repbody->mode = rc; + repbody->mode = rc < 0 ? 
-rc : rc; repbody->valid = OBD_MD_FLMODE; if (rc == -EREMOTE) @@ -609,7 +737,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, struct mdt_object *child; struct md_object *next = mdt_object_child(info->mti_object); struct lu_fid *child_fid = &info->mti_tmp_fid1; - int is_resent, rc; + int is_resent, rc, namelen = 0; const char *name; struct mdt_lock_handle *lhp; struct ldlm_lock *lock; @@ -624,6 +752,9 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, if (name == NULL) RETURN(err_serious(-EFAULT)); + namelen = req_capsule_get_size(&info->mti_pill, &RMF_NAME, + RCL_CLIENT); + CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, ldlm_rep = %p\n", PFID(mdt_object_fid(parent)), name, ldlm_rep); @@ -666,7 +797,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, rc = 0; } else { mdt_lock_handle_init(lhc); - lhc->mlh_reg_mode = LCK_CR; + mdt_lock_reg_init(lhc, MDT_RD_LOCK); /* * Object's name is on another MDS, no lookup lock is @@ -674,7 +805,9 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, */ child_bits &= ~MDS_INODELOCK_LOOKUP; child_bits |= MDS_INODELOCK_UPDATE; - rc = mdt_object_lock(info, child, lhc, child_bits); + + rc = mdt_object_lock(info, child, lhc, child_bits, + MDT_LOCAL_LOCK); } if (rc == 0) { /* Finally, we can get attr for child. 
*/ @@ -689,8 +822,9 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, /*step 1: lock parent */ lhp = &info->mti_lh[MDT_LH_PARENT]; - lhp->mlh_reg_mode = LCK_CR; - rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE); + mdt_lock_pdo_init(lhp, MDT_RD_LOCK, name, namelen); + rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE, + MDT_LOCAL_LOCK); if (rc != 0) RETURN(rc); @@ -722,8 +856,10 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, LDLM_LOCK_PUT(lock); } else { mdt_lock_handle_init(lhc); - lhc->mlh_reg_mode = LCK_CR; - rc = mdt_object_cr_lock(info, child, lhc, child_bits); + mdt_lock_reg_init(lhc, MDT_RD_LOCK); + + rc = mdt_object_lock(info, child, lhc, child_bits, + MDT_CROSS_LOCK); if (rc != 0) GOTO(out_child, rc); } @@ -1411,141 +1547,66 @@ struct mdt_object *mdt_object_find(const struct lu_env *env, RETURN(m); } -static mdl_mode_t mdt_mdl_lock_modes[] = { - [0] = MDL_MINMODE, - [1] = MDL_EX, - [2] = MDL_PW, - [3] = MDL_PR, - [4] = MDL_CW, - [5] = MDL_CR, - [6] = MDL_NL, - [7] = MDL_GROUP -}; - -static ldlm_mode_t mdt_ldlm_lock_modes[] = { - [0] = LCK_MINMODE, - [1] = LCK_EX, - [2] = LCK_PW, - [3] = LCK_PR, - [4] = LCK_CW, - [5] = LCK_CR, - [6] = LCK_NL, - [7] = LCK_GROUP -}; - -static inline mdl_mode_t mdt_ldlm_mode2mdl_mode(ldlm_mode_t mode) -{ - int idx = ffs((int)mode) - 1; - LASSERT(idx >= 0); - LASSERT(IS_PO2(mode)); - LASSERT(idx < ARRAY_SIZE(mdt_mdl_lock_modes)); - return mdt_mdl_lock_modes[idx]; -} - -static inline ldlm_mode_t mdt_mdl_mode2ldlm_mode(mdl_mode_t mode) -{ - int idx = ffs((int)mode) - 1; - LASSERT(idx >= 0); - LASSERT(IS_PO2(mode)); - LASSERT(idx < ARRAY_SIZE(mdt_ldlm_lock_modes)); - return mdt_ldlm_lock_modes[idx]; -} - -int mdt_lock_init_mode(struct mdt_thread_info *info, struct mdt_object *o, - struct mdt_lock_handle *lh, ldlm_mode_t lm) -{ - ENTRY; - - lh->mlh_reg_mode = lm; - -#ifdef CONFIG_PDIROPS - { - mdl_mode_t mode; - - /* - * Any dir access needs couple of locks: - * - * 1) on 
part of dir we gonna take lookup/modify; - * - * 2) on whole dir to protect it from concurrent splitting - * and/or to flush client's cache for readdir(). - * - * so, for a given mode and object this routine decides what - * lock mode to use for lock #2: - * - * 1) if caller's gonna lookup in dir then we need to protect - * dir from being splitted only - LCK_CR - * - * 2) if caller's gonna modify dir then we need to protect dir - * from being splitted and to flush cache - LCK_CW - * - * 3) if caller's gonna modify dir and that dir seems ready for - * splitting then we need to protect it from any type of access - * (lookup/modify/split) - LCK_EX --bzzz - */ - - /* Ask underlaying level its opinion about possible locks. */ - mode = mdo_lock_mode(info->mti_env, mdt_object_child(o), - mdt_ldlm_mode2mdl_mode(lm)); - if (mode != MDL_MINMODE) { - /* Lower layer said what lock mode it likes to be, use it. */ - lh->mlh_pdo_mode = mdt_mdl_mode2ldlm_mode(mode); - } else { - /* - * Lower layer does not want to specify locking mode. We od it - * our selves. No special protection is needed, just flush - * client's cache on modification. 
- */ - if (lm == LCK_EX) { - lh->mlh_pdo_mode = LCK_EX; - } else if (lm == LCK_PR) { - lh->mlh_pdo_mode = LCK_CR; - } else if (lm == LCK_PW) { - lh->mlh_pdo_mode = LCK_CW; - } else { - CWARN("Not expected lock type (0x%x)\n", (int)lm); - lh->mlh_pdo_mode = LCK_MINMODE; - } - } - } -#endif - - RETURN(0); -} - int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o, - struct mdt_lock_handle *lh, __u64 ibits) + struct mdt_lock_handle *lh, __u64 ibits, int locality) { + struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; ldlm_policy_data_t *policy = &info->mti_policy; struct ldlm_res_id *res_id = &info->mti_res_id; - struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; int rc; ENTRY; LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh)); + LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh)); LASSERT(lh->mlh_reg_mode != LCK_MINMODE); + if (mdt_object_exists(o) < 0) { - LASSERT(!(ibits & MDS_INODELOCK_UPDATE)); - LASSERT(ibits & MDS_INODELOCK_LOOKUP); + if (locality == MDT_CROSS_LOCK) { + /* cross-ref object fix */ + ibits &= ~MDS_INODELOCK_UPDATE; + ibits |= MDS_INODELOCK_LOOKUP; + } else { + LASSERT(!(ibits & MDS_INODELOCK_UPDATE)); + LASSERT(ibits & MDS_INODELOCK_LOOKUP); + } } - memset(policy, 0, sizeof *policy); - policy->l_inodebits.bits = ibits; - rc = fid_lock(ns, mdt_object_fid(o), &lh->mlh_reg_lh, - lh->mlh_reg_mode, policy, res_id); - RETURN(rc); -} + memset(policy, 0, sizeof *policy); + fid_build_reg_res_name(mdt_object_fid(o), res_id); + +#ifdef CONFIG_PDIROPS + /* + * Take PDO lock on whole directory and build correct @res_id for lock + * on part of directory. 
+ */ + if (lh->mlh_type == MDT_PDO_LOCK && lh->mlh_pdo_hash != 0) { + lh->mlh_pdo_mode = mdt_lock_pdo_mode(info, o, lh->mlh_reg_mode); + if (lh->mlh_pdo_mode != LCK_MINMODE) { + policy->l_inodebits.bits = MDS_INODELOCK_UPDATE; + rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, + policy, res_id, LDLM_FL_ATOMIC_CB); + if (rc) + RETURN(rc); + } -/* lock with cross-ref fixes */ -int mdt_object_cr_lock(struct mdt_thread_info *info, struct mdt_object *o, - struct mdt_lock_handle *lh, __u64 ibits) -{ - if (mdt_object_exists(o) < 0) { - /* cross-ref object fix */ - ibits &= ~MDS_INODELOCK_UPDATE; - ibits |= MDS_INODELOCK_LOOKUP; + fid_build_pdo_res_name(mdt_object_fid(o), lh->mlh_pdo_hash, + res_id); } - return mdt_object_lock(info, o, lh, ibits); +#endif + + policy->l_inodebits.bits = ibits; + rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy, + res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB); +#ifdef CONFIG_PDIROPS + if (rc) { + if (lh->mlh_type == MDT_PDO_LOCK) { + mdt_fid_unlock(&lh->mlh_pdo_lh, lh->mlh_pdo_mode); + lh->mlh_pdo_lh.cookie = 0ull; + } + } +#endif + + RETURN(rc); } /* @@ -1556,17 +1617,25 @@ int mdt_object_cr_lock(struct mdt_thread_info *info, struct mdt_object *o, void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o, struct mdt_lock_handle *lh, int decref) { - struct ptlrpc_request *req = mdt_info_req(info); - struct lustre_handle *handle = &lh->mlh_reg_lh; - ldlm_mode_t mode = lh->mlh_reg_mode; + struct ptlrpc_request *req = mdt_info_req(info); ENTRY; - if (lustre_handle_is_used(handle)) { - if (decref) - fid_unlock(mdt_object_fid(o), handle, mode); - else - ptlrpc_save_lock(req, handle, mode); - handle->cookie = 0; + /* Do not save PDO locks to request. 
*/ + if (lustre_handle_is_used(&lh->mlh_pdo_lh)) { + mdt_fid_unlock(&lh->mlh_pdo_lh, + lh->mlh_pdo_mode); + lh->mlh_pdo_lh.cookie = 0; + } + + if (lustre_handle_is_used(&lh->mlh_reg_lh)) { + if (decref) { + mdt_fid_unlock(&lh->mlh_reg_lh, + lh->mlh_reg_mode); + } else { + ptlrpc_save_lock(req, &lh->mlh_reg_lh, + lh->mlh_reg_mode); + } + lh->mlh_reg_lh.cookie = 0; } EXIT; } @@ -1582,7 +1651,8 @@ struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info, if (!IS_ERR(o)) { int rc; - rc = mdt_object_lock(info, o, lh, ibits); + rc = mdt_object_lock(info, o, lh, ibits, + MDT_LOCAL_LOCK); if (rc != 0) { mdt_object_put(info->mti_env, o); o = ERR_PTR(rc); @@ -1851,6 +1921,7 @@ static int mdt_req_handle(struct mdt_thread_info *info, void mdt_lock_handle_init(struct mdt_lock_handle *lh) { + lh->mlh_type = MDT_PDO_LOCK; lh->mlh_reg_lh.cookie = 0ull; lh->mlh_reg_mode = LCK_MINMODE; lh->mlh_pdo_lh.cookie = 0ull; @@ -1860,6 +1931,7 @@ void mdt_lock_handle_init(struct mdt_lock_handle *lh) void mdt_lock_handle_fini(struct mdt_lock_handle *lh) { LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh)); + LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh)); } /* @@ -4212,7 +4284,7 @@ static int __init mdt_mod_init(void) int rc; printk(KERN_INFO "Lustre: MetaData Target; info@clusterfs.com\n"); - + mdt_num_threads = MDT_NUM_THREADS; lprocfs_init_vars(mdt, &lvars); rc = class_register_type(&mdt_obd_device_ops, NULL, diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index 495b886..fcec96c 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -205,6 +205,9 @@ struct mdt_object { }; struct mdt_lock_handle { + /* Lock type, reg for cross-ref use or pdo lock. 
*/ + mdl_type_t mlh_type; + /* Regular lock */ struct lustre_handle mlh_reg_lh; ldlm_mode_t mlh_reg_mode; @@ -212,6 +215,7 @@ struct mdt_lock_handle { /* Pdirops lock */ struct lustre_handle mlh_pdo_lh; ldlm_mode_t mlh_pdo_mode; + unsigned int mlh_pdo_hash; }; enum { @@ -223,14 +227,21 @@ enum { MDT_LH_NR }; +enum { + MDT_LOCAL_LOCK, + MDT_CROSS_LOCK +}; + struct mdt_reint_record { mdt_reint_t rr_opcode; const struct lu_fid *rr_fid1; const struct lu_fid *rr_fid2; const char *rr_name; + int rr_namelen; const char *rr_tgt; - int rr_eadatalen; + int rr_tgtlen; const void *rr_eadata; + int rr_eadatalen; int rr_logcookielen; const struct llog_cookie *rr_logcookies; __u32 rr_flags; @@ -429,15 +440,21 @@ void mdt_set_disposition(struct mdt_thread_info *info, void mdt_clear_disposition(struct mdt_thread_info *info, struct ldlm_reply *rep, int flag); +void mdt_lock_pdo_init(struct mdt_lock_handle *lh, + ldlm_mode_t lm, const char *name, + int namelen); + +void mdt_lock_reg_init(struct mdt_lock_handle *lh, + ldlm_mode_t lm); + +int mdt_lock_setup(struct mdt_thread_info *info, + struct mdt_object *o, + struct mdt_lock_handle *lh); + int mdt_object_lock(struct mdt_thread_info *, struct mdt_object *, struct mdt_lock_handle *, - __u64); - -int mdt_object_cr_lock(struct mdt_thread_info *, - struct mdt_object *, - struct mdt_lock_handle *, - __u64); + __u64, int); void mdt_object_unlock(struct mdt_thread_info *, struct mdt_object *, @@ -450,7 +467,7 @@ struct mdt_object *mdt_object_find(const struct lu_env *, struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *, const struct lu_fid *, struct mdt_lock_handle *, - __u64 ibits); + __u64); void mdt_object_unlock_put(struct mdt_thread_info *, struct mdt_object *, struct mdt_lock_handle *, @@ -640,6 +657,32 @@ static inline int is_identity_get_disabled(struct upcall_cache *cache) return cache ? (strcmp(cache->uc_upcall, "NONE") == 0) : 1; } +/* Issues dlm lock on passed @ns, @f stores it lock handle into @lh. 
*/ +static inline int mdt_fid_lock(struct ldlm_namespace *ns, + struct lustre_handle *lh, + ldlm_mode_t mode, + ldlm_policy_data_t *policy, + struct ldlm_res_id *res_id, + int flags) +{ + int rc; + + LASSERT(ns != NULL); + LASSERT(lh != NULL); + + rc = ldlm_cli_enqueue_local(ns, *res_id, LDLM_IBITS, policy, + mode, &flags, ldlm_blocking_ast, + ldlm_completion_ast, NULL, NULL, + 0, NULL, lh); + return rc == ELDLM_OK ? 0 : -EIO; +} + +static inline void mdt_fid_unlock(struct lustre_handle *lh, + ldlm_mode_t mode) +{ + ldlm_lock_decref(lh, mode); +} + /* * Capability */ @@ -663,5 +706,16 @@ static inline void mdt_set_capainfo(struct mdt_thread_info *info, int offset, ci->mc_fid[offset] = fid; ci->mc_capa[offset] = capa; } + +#ifdef CONFIG_PDIROPS +#define MDT_RD_LOCK LCK_PR +#define MDT_WR_LOCK LCK_PW +#define MDT_EX_LOCK LCK_EX +#else +#define MDT_RD_LOCK LCK_CR +#define MDT_WR_LOCK LCK_EX +#define MDT_EX_LOCK LCK_EX +#endif + #endif /* __KERNEL__ */ #endif /* _MDT_H */ diff --git a/lustre/mdt/mdt_lib.c b/lustre/mdt/mdt_lib.c index b907ba5..2ec8416 100644 --- a/lustre/mdt/mdt_lib.c +++ b/lustre/mdt/mdt_lib.c @@ -741,6 +741,8 @@ static int mdt_create_unpack(struct mdt_thread_info *info) mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA); rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); + rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT); + #ifdef CONFIG_FS_POSIX_ACL if (sp->sp_cr_flags & MDS_CREATE_RMT_ACL) { if (S_ISDIR(attr->la_mode)) @@ -822,6 +824,7 @@ static int mdt_link_unpack(struct mdt_thread_info *info) rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); if (rr->rr_name == NULL) RETURN(-EFAULT); + rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT); RETURN(0); } @@ -861,6 +864,7 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info) rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); if (rr->rr_name == NULL) RETURN(-EFAULT); + rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT); RETURN(0); } 
@@ -905,6 +909,8 @@ static int mdt_rename_unpack(struct mdt_thread_info *info) rr->rr_tgt = req_capsule_client_get(pill, &RMF_SYMTGT); if (rr->rr_name == NULL || rr->rr_tgt == NULL) RETURN(-EFAULT); + rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT); + rr->rr_tgtlen = req_capsule_get_size(pill, &RMF_SYMTGT, RCL_CLIENT); RETURN(0); } @@ -955,6 +961,7 @@ static int mdt_open_unpack(struct mdt_thread_info *info) rr->rr_name = req_capsule_client_get(pill, &RMF_NAME); if (rr->rr_name == NULL) RETURN(-EFAULT); + rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT); if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) { struct md_create_spec *sp = &info->mti_spec; diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index 186ffeb..f6772b7 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -149,8 +149,9 @@ int mdt_epoch_open(struct mdt_thread_info *info, struct mdt_object *o) * In the later case, mdt_reint_setattr will do it. */ if (cancel && (info->mti_rr.rr_fid1 != NULL)) { struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_CHILD]; - lh->mlh_reg_mode = LCK_EX; - rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_UPDATE); + mdt_lock_reg_init(lh, MDT_EX_LOCK); + rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_UPDATE, + MDT_LOCAL_LOCK); if (rc == 0) mdt_object_unlock(info, o, lh, 1); } @@ -782,10 +783,13 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) } lh = &info->mti_lh[MDT_LH_PARENT]; - if (!(create_flags & MDS_OPEN_CREAT)) - lh->mlh_reg_mode = LCK_CR; - else - lh->mlh_reg_mode = LCK_EX; + if (!(create_flags & MDS_OPEN_CREAT)) { + mdt_lock_pdo_init(lh, MDT_RD_LOCK, rr->rr_name, + rr->rr_namelen); + } else { + mdt_lock_pdo_init(lh, MDT_WR_LOCK, rr->rr_name, + rr->rr_namelen); + } parent = mdt_object_find_lock(info, rr->rr_fid1, lh, MDS_INODELOCK_UPDATE); if (IS_ERR(parent)) @@ -886,10 +890,11 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) rc = 0; } 
else { mdt_lock_handle_init(lhc); - lhc->mlh_reg_mode = LCK_CR; + mdt_lock_reg_init(lhc, MDT_RD_LOCK); rc = mdt_object_lock(info, child, lhc, - MDS_INODELOCK_LOOKUP); + MDS_INODELOCK_LOOKUP, + MDT_LOCAL_LOCK); } repbody->fid1 = *mdt_object_fid(child); repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS); diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index abe6158..3fb783d 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -38,12 +38,12 @@ static int mdt_md_create(struct mdt_thread_info *info) { - struct mdt_device *mdt = info->mti_mdt; - struct mdt_object *parent; - struct mdt_object *child; - struct mdt_lock_handle *lh; - struct mdt_body *repbody; - struct md_attr *ma = &info->mti_attr; + struct mdt_device *mdt = info->mti_mdt; + struct mdt_object *parent; + struct mdt_object *child; + struct mdt_lock_handle *lh; + struct mdt_body *repbody; + struct md_attr *ma = &info->mti_attr; struct mdt_reint_record *rr = &info->mti_rr; int rc; ENTRY; @@ -54,7 +54,7 @@ static int mdt_md_create(struct mdt_thread_info *info) repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); lh = &info->mti_lh[MDT_LH_PARENT]; - lh->mlh_reg_mode = LCK_EX; + mdt_lock_pdo_init(lh, MDT_WR_LOCK, rr->rr_name, rr->rr_namelen); parent = mdt_object_find_lock(info, rr->rr_fid1, lh, MDS_INODELOCK_UPDATE); @@ -160,14 +160,14 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags) RETURN(0); lh = &info->mti_lh[MDT_LH_PARENT]; - lh->mlh_reg_mode = LCK_EX; + mdt_lock_pdo_init(lh, MDT_WR_LOCK, NULL, 0); if (!(flags & MRF_SETATTR_LOCKED)) { __u64 lockpart = MDS_INODELOCK_UPDATE; if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID)) lockpart |= MDS_INODELOCK_LOOKUP; - rc = mdt_object_lock(info, mo, lh, lockpart); + rc = mdt_object_lock(info, mo, lh, lockpart, MDT_LOCAL_LOCK); if (rc != 0) GOTO(out, rc); } @@ -334,8 +334,8 @@ static int mdt_reint_create(struct mdt_thread_info *info, case S_IFBLK: case S_IFIFO: case S_IFSOCK:{ - /* special file should 
stay on the same node as parent */ - LASSERT(strlen(info->mti_rr.rr_name) > 0); + /* Special file should stay on the same node as parent. */ + LASSERT(info->mti_rr.rr_namelen > 0); rc = mdt_md_create(info); break; } @@ -367,7 +367,9 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, /* step 1: lock the parent */ parent_lh = &info->mti_lh[MDT_LH_PARENT]; - parent_lh->mlh_reg_mode = LCK_EX; + mdt_lock_pdo_init(parent_lh, MDT_WR_LOCK, rr->rr_name, + rr->rr_namelen); + mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh, MDS_INODELOCK_UPDATE); if (IS_ERR(mp)) @@ -424,8 +426,9 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, if (IS_ERR(mc)) GOTO(out_unlock_parent, rc = PTR_ERR(mc)); child_lh = &info->mti_lh[MDT_LH_CHILD]; - child_lh->mlh_reg_mode = LCK_EX; - rc = mdt_object_cr_lock(info, mc, child_lh, MDS_INODELOCK_FULL); + mdt_lock_reg_init(child_lh, MDT_EX_LOCK); + rc = mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_FULL, + MDT_CROSS_LOCK); if (rc != 0) GOTO(out_put_child, rc); @@ -480,7 +483,7 @@ static int mdt_reint_link(struct mdt_thread_info *info, if (rr->rr_name[0] == 0) { /* MDT holding name ask us to add ref. 
*/ lhs = &info->mti_lh[MDT_LH_CHILD]; - lhs->mlh_reg_mode = LCK_EX; + mdt_lock_reg_init(lhs, MDT_EX_LOCK); ms = mdt_object_find_lock(info, rr->rr_fid1, lhs, MDS_INODELOCK_UPDATE); if (IS_ERR(ms)) @@ -494,7 +497,8 @@ static int mdt_reint_link(struct mdt_thread_info *info, /* step 1: find & lock the target parent dir */ lhp = &info->mti_lh[MDT_LH_PARENT]; - lhp->mlh_reg_mode = LCK_EX; + mdt_lock_pdo_init(lhp, MDT_WR_LOCK, rr->rr_name, + rr->rr_namelen); mp = mdt_object_find_lock(info, rr->rr_fid2, lhp, MDS_INODELOCK_UPDATE); if (IS_ERR(mp)) @@ -502,12 +506,13 @@ static int mdt_reint_link(struct mdt_thread_info *info, /* step 2: find & lock the source */ lhs = &info->mti_lh[MDT_LH_CHILD]; - lhs->mlh_reg_mode = LCK_EX; + mdt_lock_reg_init(lhs, MDT_EX_LOCK); ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); if (IS_ERR(ms)) GOTO(out_unlock_parent, rc = PTR_ERR(ms)); - rc = mdt_object_cr_lock(info, ms, lhs, MDS_INODELOCK_UPDATE); + rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE, + MDT_CROSS_LOCK); if (rc != 0) GOTO(out_unlock_source, rc); @@ -545,7 +550,8 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info) /* step 1: lookup & lock the tgt dir */ lh_tgtdir = &info->mti_lh[MDT_LH_PARENT]; - lh_tgtdir->mlh_reg_mode = LCK_EX; + mdt_lock_pdo_init(lh_tgtdir, MDT_WR_LOCK, rr->rr_tgt, + rr->rr_tgtlen); mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir, MDS_INODELOCK_UPDATE); if (IS_ERR(mtgtdir)) @@ -558,13 +564,15 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info) if (rc != 0 && rc != -ENOENT) { GOTO(out_unlock_tgtdir, rc); } else if (rc == 0) { - /* in case of replay that name can be already inserted, - * check that and do nothing if so */ + /* + * In case of replay that name can be already inserted, check + * that and do nothing if so. 
+ */ if (lu_fid_eq(tgt_fid, rr->rr_fid2)) GOTO(out_unlock_tgtdir, rc); lh_tgt = &info->mti_lh[MDT_LH_CHILD]; - lh_tgt->mlh_reg_mode = LCK_EX; + mdt_lock_reg_init(lh_tgt, MDT_EX_LOCK); mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt, MDS_INODELOCK_LOOKUP); @@ -584,10 +592,9 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info) if (rc == 0 && mtgt) mdt_handle_last_unlink(info, mtgt, ma); - EXIT; - if (mtgt) { + if (mtgt != NULL) mdt_object_unlock_put(info, mtgt, lh_tgt, rc); - } + EXIT; out_unlock_tgtdir: mdt_object_unlock_put(info, mtgtdir, lh_tgtdir, rc); out: @@ -607,7 +614,7 @@ static int mdt_rename_lock(struct mdt_thread_info *info, ENTRY; ls = info->mti_mdt->mdt_md_dev.md_lu_dev.ld_site; - fid_build_res_name(&LUSTRE_BFL_FID, &res_id); + fid_build_reg_res_name(&LUSTRE_BFL_FID, &res_id); if (ls->ls_control_exp == NULL) { /* @@ -640,11 +647,11 @@ static void mdt_rename_unlock(struct lustre_handle *lh) } /* - * This is is_subdir() variant, it is CMD is cmm forwards it to correct + * This is is_subdir() variant, it is CMD if cmm forwards it to correct * target. Source should not be ancestor of target dir. May be other rename * checks can be moved here later. */ -static int mdt_rename_check(struct mdt_thread_info *info, struct lu_fid *fid) +static int mdt_rename_sanity(struct mdt_thread_info *info, struct lu_fid *fid) { struct mdt_reint_record *rr = &info->mti_rr; struct lu_fid dst_fid = *rr->rr_fid2; @@ -711,17 +718,19 @@ static int mdt_reint_rename(struct mdt_thread_info *info, lh_newp = &info->mti_lh[MDT_LH_NEW]; - /* step 1: lock the source dir */ + /* step 1: lock the source dir. 
*/ lh_srcdirp = &info->mti_lh[MDT_LH_PARENT]; - lh_srcdirp->mlh_reg_mode = LCK_EX; + mdt_lock_pdo_init(lh_srcdirp, MDT_WR_LOCK, rr->rr_name, + rr->rr_namelen); msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp, MDS_INODELOCK_UPDATE); if (IS_ERR(msrcdir)) GOTO(out_rename_lock, rc = PTR_ERR(msrcdir)); - /*step 2: find & lock the target dir*/ + /* step 2: find & lock the target dir. */ lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD]; - lh_tgtdirp->mlh_reg_mode = LCK_EX; + mdt_lock_pdo_init(lh_tgtdirp, MDT_WR_LOCK, rr->rr_tgt, + rr->rr_tgtlen); if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) { mdt_object_get(info->mti_env, msrcdir); mtgtdir = msrcdir; @@ -735,15 +744,16 @@ static int mdt_reint_rename(struct mdt_thread_info *info, if (rc == 0) GOTO(out_unlock_target, rc = -ESTALE); else if (rc > 0) { - /* we lock the target dir iff it is local */ + /* we lock the target dir if it is local */ rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp, - MDS_INODELOCK_UPDATE); + MDS_INODELOCK_UPDATE, + MDT_LOCAL_LOCK); if (rc != 0) GOTO(out_unlock_target, rc); } } - /*step 3: find & lock the old object*/ + /* step 3: find & lock the old object. */ rc = mdo_lookup(info->mti_env, mdt_object_child(msrcdir), rr->rr_name, old_fid); if (rc != 0) @@ -753,18 +763,18 @@ static int mdt_reint_rename(struct mdt_thread_info *info, GOTO(out_unlock_target, rc = -EINVAL); lh_oldp = &info->mti_lh[MDT_LH_OLD]; - lh_oldp->mlh_reg_mode = LCK_EX; + mdt_lock_reg_init(lh_oldp, MDT_EX_LOCK); mold = mdt_object_find_lock(info, old_fid, lh_oldp, MDS_INODELOCK_LOOKUP); if (IS_ERR(mold)) GOTO(out_unlock_target, rc = PTR_ERR(mold)); - /*step 4: find & lock the new object*/ + /* step 4: find & lock the new object. 
*/ /* new target object may not exist now */ rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir), rr->rr_tgt, new_fid); if (rc == 0) { - /* the new_fid should have been filled at this moment*/ + /* the new_fid should have been filled at this moment */ if (lu_fid_eq(old_fid, new_fid)) GOTO(out_unlock_old, rc); @@ -772,13 +782,13 @@ static int mdt_reint_rename(struct mdt_thread_info *info, lu_fid_eq(new_fid, rr->rr_fid2)) GOTO(out_unlock_old, rc = -EINVAL); - lh_newp->mlh_reg_mode = LCK_EX; + mdt_lock_reg_init(lh_newp, MDT_EX_LOCK); mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid); if (IS_ERR(mnew)) GOTO(out_unlock_old, rc = PTR_ERR(mnew)); - rc = mdt_object_cr_lock(info, mnew, lh_newp, - MDS_INODELOCK_FULL); + rc = mdt_object_lock(info, mnew, lh_newp, + MDS_INODELOCK_FULL, MDT_CROSS_LOCK); if (rc != 0) { mdt_object_put(info->mti_env, mnew); GOTO(out_unlock_old, rc); @@ -806,8 +816,9 @@ static int mdt_reint_rename(struct mdt_thread_info *info, mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA); mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA); + /* Check if @dst is subdir of @src. 
*/ - rc = mdt_rename_check(info, old_fid); + rc = mdt_rename_sanity(info, old_fid); if (rc) GOTO(out_unlock_new, rc); diff --git a/lustre/mdt/mdt_xattr.c b/lustre/mdt/mdt_xattr.c index 81c579e..8e899cb 100644 --- a/lustre/mdt/mdt_xattr.c +++ b/lustre/mdt/mdt_xattr.c @@ -318,8 +318,8 @@ int mdt_setxattr(struct mdt_thread_info *info) lockpart |= MDS_INODELOCK_LOOKUP; lh = &info->mti_lh[MDT_LH_PARENT]; - lh->mlh_reg_mode = LCK_EX; - rc = mdt_object_lock(info, obj, lh, lockpart); + mdt_lock_pdo_init(lh, MDT_WR_LOCK, NULL, 0); + rc = mdt_object_lock(info, obj, lh, lockpart, MDT_LOCAL_LOCK); if (rc != 0) GOTO(out, rc); diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c index d06c38d..eff3744 100644 --- a/lustre/obdclass/lu_object.c +++ b/lustre/obdclass/lu_object.c @@ -130,6 +130,7 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env, * This is the only place where object fid is assigned. It's constant * after this point. */ + LASSERT(fid_is_igif(f) || fid_ver(f) == 0); top->lo_header->loh_fid = *f; layers = &top->lo_header->loh_layers; do { diff --git a/lustre/osd/osd_igif.c b/lustre/osd/osd_igif.c index b15f626..9bf9870 100644 --- a/lustre/osd/osd_igif.c +++ b/lustre/osd/osd_igif.c @@ -42,27 +42,22 @@ #include "osd_oi.h" #include "osd_igif.h" -int lu_fid_is_igif(const struct lu_fid *fid) -{ - return fid_seq(fid) == LUSTRE_ROOT_FID_SEQ; -} - void lu_igif_to_id(const struct lu_fid *fid, struct osd_inode_id *id) { - LASSERT(lu_fid_is_igif(fid)); + LASSERT(fid_is_igif(fid)); id->oii_ino = lu_igif_ino(fid); id->oii_gen = lu_igif_gen(fid); } __u32 lu_igif_ino(const struct lu_fid *fid) { - LASSERT(lu_fid_is_igif(fid)); + LASSERT(fid_is_igif(fid)); return fid_oid(fid); } __u32 lu_igif_gen(const struct lu_fid *fid) { - LASSERT(lu_fid_is_igif(fid)); + LASSERT(fid_is_igif(fid)); return fid_ver(fid); } @@ -71,5 +66,5 @@ void lu_igif_build(struct lu_fid *fid, __u32 ino, __u32 gen) fid->f_seq = LUSTRE_ROOT_FID_SEQ; fid->f_oid = ino; fid->f_ver = 
gen; - LASSERT(lu_fid_is_igif(fid)); + LASSERT(fid_is_igif(fid)); } diff --git a/lustre/osd/osd_igif.h b/lustre/osd/osd_igif.h index d40e630..04439d1 100644 --- a/lustre/osd/osd_igif.h +++ b/lustre/osd/osd_igif.h @@ -34,7 +34,6 @@ struct lu_fid; struct osd_inode_id; -int lu_fid_is_igif(const struct lu_fid *fid); void lu_igif_to_id(const struct lu_fid *fid, struct osd_inode_id *id); __u32 lu_igif_ino(const struct lu_fid *fid); __u32 lu_igif_gen(const struct lu_fid *fid); diff --git a/lustre/osd/osd_oi.c b/lustre/osd/osd_oi.c index 9fda0f8..0494c3b 100644 --- a/lustre/osd/osd_oi.c +++ b/lustre/osd/osd_oi.c @@ -56,7 +56,6 @@ #include "osd_oi.h" /* osd_lookup(), struct osd_thread_info */ #include "osd_internal.h" -/* lu_fid_is_igif() */ #include "osd_igif.h" #include "dt_object.h" @@ -158,7 +157,7 @@ int osd_oi_lookup(struct osd_thread_info *info, struct osd_oi *oi, { int rc; - if (lu_fid_is_igif(fid)) { + if (fid_is_igif(fid)) { lu_igif_to_id(fid, id); rc = 0; } else { @@ -182,7 +181,7 @@ int osd_oi_insert(struct osd_thread_info *info, struct osd_oi *oi, struct dt_device *dev; struct osd_inode_id *id; - if (lu_fid_is_igif(fid)) + if (fid_is_igif(fid)) return 0; idx = oi->oi_dir; @@ -205,7 +204,7 @@ int osd_oi_delete(struct osd_thread_info *info, struct dt_object *idx; struct dt_device *dev; - if (lu_fid_is_igif(fid)) + if (fid_is_igif(fid)) return 0; idx = oi->oi_dir; -- 1.8.3.1