# whether to enable quota support
#
AC_DEFUN([LC_CONFIG_SPLIT],
-[AC_MSG_CHECKING([whether to disable split support])
+[AC_MSG_CHECKING([whether to enable split support])
AC_ARG_ENABLE([split],
AC_HELP_STRING([--disable-split],
[disable split support]),
])
#
+# LC_CONFIG_PDIROPS
+#
+# whether to enable PDIROPS
+#
+AC_DEFUN([LC_CONFIG_PDIROPS],
+[
+AC_MSG_CHECKING([whether to enable PDIROPS])
+AC_ARG_ENABLE([pdirops],
+ AC_HELP_STRING([--disable-pdirops],
+ [disable PDIROPS]),
+ [],[enable_pdirops='yes'])
+AC_MSG_RESULT([$enable_pdirops])
+if test x$enable_pdirops != xno; then
+ AC_DEFINE(CONFIG_PDIROPS, 1, [enable PDIROPS])
+fi
+])
+
+#
# LC_CONFIG_LDISKFS
#
# whether to enable various ldiskfs debugs
#
AC_DEFUN([LC_CONFIG_LDISKFS],
[
-AC_MSG_CHECKING([whether to disable ldiskfs asserts])
+AC_MSG_CHECKING([whether to enable ldiskfs asserts])
AC_ARG_ENABLE([ldiskfs_asserts],
AC_HELP_STRING([--disable-ldiskfs-asserts],
[disable ldiskfs asserts]),
const char *name);
int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo,
- struct md_attr *ma);
+ struct md_attr *ma, int *split);
int cmm_try_to_split(const struct lu_env *env, struct md_object *mo);
+
#endif
#endif /* __KERNEL__ */
#ifdef HAVE_SPLIT_SUPPORT
{
struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
-
+ int rc, split;
+
+ memset(ma, 0, sizeof(*ma));
+
/*
* Check only if we need protection from split. If not - mdt
* handles other cases.
*/
- if (lm == MDL_PW &&
- cmm_expect_splitting(env, mo, ma) == CMM_EXPECT_SPLIT)
+ rc = cmm_expect_splitting(env, mo, ma, &split);
+ if (rc) {
+ CERROR("Can't check for possible split, error %d\n",
+ rc);
+ RETURN(MDL_MINMODE);
+ }
+
+ if (lm == MDL_PW && split == CMM_EXPECT_SPLIT)
RETURN(MDL_EX);
}
#endif
/* Get LMV EA */
ma->ma_need = MA_LMV;
rc = mo_attr_get(env, mp, ma);
+
/* Skip checking the slave dirs (mea_count is 0) */
if (rc == 0 && ma->ma_lmv->mea_count != 0) {
/*
- * Get stripe by name to check the name
- * belongs to master dir, otherwise
- * return the -ERESTART
+ * Get stripe by name to check the name belongs to
+ * master dir, otherwise return the -ERESTART
*/
stripe = mea_name2idx(ma->ma_lmv, name, strlen(name));
}
int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo,
- struct md_attr *ma)
+ struct md_attr *ma, int *split)
{
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
- struct lu_fid *fid = NULL;
- int rc = CMM_EXPECT_SPLIT;
+ struct lu_fid root_fid;
+ int rc;
ENTRY;
- if (cmm->cmm_tgt_count == 0)
- GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
+ /*
+ * Check the lightest things first (tgt count and root fid); in some
+ * cases this should yield better performance.
+ */
+ if (cmm->cmm_tgt_count == 0) {
+ *split = CMM_NO_SPLIT_EXPECTED;
+ RETURN(0);
+ }
- ma->ma_need = MA_INODE | MA_LMV;
- rc = mo_attr_get(env, mo, ma);
+ rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child,
+ &root_fid);
if (rc)
- GOTO(cleanup, rc = CMM_NOT_SPLITTABLE);
+ RETURN(rc);
- if (ma->ma_attr.la_size < CMM_SPLIT_SIZE)
- GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
+ if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) {
+ *split = CMM_NOT_SPLITTABLE;
+ RETURN(0);
+ }
- if (ma->ma_lmv_size)
- GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
-
- OBD_ALLOC_PTR(fid);
- rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child, fid);
+ /* MA_INODE is needed to check inode size. */
+ ma->ma_need = MA_INODE | MA_LMV;
+ rc = mo_attr_get(env, mo, ma);
if (rc)
- GOTO(cleanup, rc);
-
- rc = CMM_EXPECT_SPLIT;
-
- if (lu_fid_eq(fid, cmm2fid(md2cmm_obj(mo))))
- GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
-
- EXIT;
-cleanup:
- if (fid)
- OBD_FREE_PTR(fid);
- return rc;
+ RETURN(rc);
+
+ if (ma->ma_valid & MA_LMV) {
+ *split = CMM_NOT_SPLITTABLE;
+ RETURN(0);
+ }
+
+ if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
+ *split = CMM_NO_SPLIT_EXPECTED;
+ RETURN(0);
+ }
+
+ *split = CMM_EXPECT_SPLIT;
+ RETURN(0);
}
#define cmm_md_size(stripes) \
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
struct lu_buf *buf;
- int rc = 0;
+ int rc = 0, split;
ENTRY;
LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
memset(ma, 0, sizeof(*ma));
/* Step1: Checking whether the dir needs to be split. */
- rc = cmm_expect_splitting(env, mo, ma);
- if (rc != CMM_EXPECT_SPLIT)
+ rc = cmm_expect_splitting(env, mo, ma, &split);
+ if (rc)
+ GOTO(cleanup, rc);
+
+ if (split != CMM_EXPECT_SPLIT)
GOTO(cleanup, rc = 0);
-
+
/*
* Disable trans for splitting, since there will be so many trans in
* this one ops, confilct with current recovery design.
CDEBUG(D_INFO, "Remote mdo_is_subdir(), new src "
DFID"\n", PFID(&body->fid1));
*sfid = body->fid1;
+ rc = -EREMOTE;
}
EXIT;
out:
CLASSERT(sizeof *src ==
sizeof fid_seq(src) +
sizeof fid_oid(src) + sizeof fid_ver(src));
+ LASSERT(fid_is_igif(src) || fid_ver(src) == 0);
dst->f_seq = cpu_to_le64(fid_seq(src));
dst->f_oid = cpu_to_le32(fid_oid(src));
dst->f_ver = cpu_to_le32(fid_ver(src));
dst->f_seq = le64_to_cpu(fid_seq(src));
dst->f_oid = le32_to_cpu(fid_oid(src));
dst->f_ver = le32_to_cpu(fid_ver(src));
+ LASSERT(fid_is_igif(dst) || fid_ver(dst) == 0);
}
EXPORT_SYMBOL(fid_le_to_cpu);
CLASSERT(sizeof *src ==
sizeof fid_seq(src) +
sizeof fid_oid(src) + sizeof fid_ver(src));
+ LASSERT(fid_is_igif(src) || fid_ver(src) == 0);
dst->f_seq = cpu_to_be64(fid_seq(src));
dst->f_oid = cpu_to_be32(fid_oid(src));
dst->f_ver = cpu_to_be32(fid_ver(src));
dst->f_seq = be64_to_cpu(fid_seq(src));
dst->f_oid = be32_to_cpu(fid_oid(src));
dst->f_ver = be32_to_cpu(fid_ver(src));
+ LASSERT(fid_is_igif(dst) || fid_ver(dst) == 0);
}
EXPORT_SYMBOL(fid_be_to_cpu);
#endif
}
EXPORT_SYMBOL(range_be_to_cpu);
#endif
-
-/* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
-int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
- struct lustre_handle *lh, ldlm_mode_t mode,
- ldlm_policy_data_t *policy,
- struct ldlm_res_id *res_id)
-{
- int flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
- int rc;
-
- LASSERT(ns != NULL);
- LASSERT(lh != NULL);
- LASSERT(f != NULL);
-
- rc = ldlm_cli_enqueue_local(ns, *fid_build_res_name(f, res_id),
- LDLM_IBITS, policy, mode, &flags,
- ldlm_blocking_ast, ldlm_completion_ast,
- NULL, NULL, 0, NULL, lh);
- return rc == ELDLM_OK ? 0 : -EIO;
-}
-EXPORT_SYMBOL(fid_lock);
-
-void fid_unlock(const struct lu_fid *f,
- struct lustre_handle *lh, ldlm_mode_t mode)
-{
- {
- /* XXX: this is debug stuff, remove it later. */
- struct ldlm_lock *lock = ldlm_handle2lock(lh);
- if (!lock) {
- CERROR("Invalid lock handle "LPX64"\n",
- lh->cookie);
- LBUG();
- }
- LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
- LDLM_LOCK_PUT(lock);
- }
- ldlm_lock_decref(lh, mode);
-}
-EXPORT_SYMBOL(fid_unlock);
-
return fid_seq(fid) == 0 && fid_oid(fid) == 0;
}
+static inline int fid_is_igif(const struct lu_fid *fid)
+{
+ return fid_seq(fid) == LUSTRE_ROOT_FID_SEQ;
+}
+
#define DFID "[0x%16.16"LPF64"x/0x%8.8x:0x%8.8x]"
#define PFID(fid) \
static inline int lu_fid_eq(const struct lu_fid *f0,
const struct lu_fid *f1)
{
- /* check that there is no alignment padding */
+ /* Check that there is no alignment padding. */
CLASSERT(sizeof *f0 ==
sizeof f0->f_seq + sizeof f0->f_oid + sizeof f0->f_ver);
+ LASSERT(fid_is_igif(f0) || fid_ver(f0) == 0);
+ LASSERT(fid_is_igif(f1) || fid_ver(f1) == 0);
return memcmp(f0, f1, sizeof *f0) == 0;
}
struct ldlm_namespace;
-int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
- struct lustre_handle *lh, ldlm_mode_t mode,
- ldlm_policy_data_t *policy,
- struct ldlm_res_id *res_id);
-void fid_unlock(const struct lu_fid *f,
- struct lustre_handle *lh, ldlm_mode_t mode);
-
/*
* Build (DLM) resource name from fid.
*/
static inline struct ldlm_res_id *
-fid_build_res_name(const struct lu_fid *f,
- struct ldlm_res_id *name)
+fid_build_reg_res_name(const struct lu_fid *f,
+ struct ldlm_res_id *name)
{
memset(name, 0, sizeof *name);
name->name[0] = fid_seq(f);
name->name[1] = fid_oid(f);
name->name[2] = fid_ver(f);
+ name->name[3] = 0ull;
+ return name;
+}
+
+static inline struct ldlm_res_id *
+fid_build_pdo_res_name(const struct lu_fid *f,
+ unsigned int hash,
+ struct ldlm_res_id *name)
+{
+ fid_build_reg_res_name(f, name);
+ name->name[3] = hash;
return name;
}
/* metadata attributes */
enum ma_valid {
- MA_INODE = (1 << 0),
- MA_LOV = (1 << 1),
- MA_COOKIE = (1 << 2),
- MA_FLAGS = (1 << 3),
- MA_LMV = (1 << 4),
- MA_ACL_DEF = (1 << 5)
+ MA_INODE = (1 << 0),
+ MA_LOV = (1 << 1),
+ MA_COOKIE = (1 << 2),
+ MA_FLAGS = (1 << 3),
+ MA_LMV = (1 << 4),
+ MA_ACL_DEF = (1 << 5)
};
typedef enum {
- MDL_MINMODE = 0,
- MDL_EX = 1,
- MDL_PW = 2,
- MDL_PR = 4,
- MDL_CW = 8,
- MDL_CR = 16,
- MDL_NL = 32,
- MDL_GROUP = 64,
+ MDL_MINMODE = 0,
+ MDL_EX = 1,
+ MDL_PW = 2,
+ MDL_PR = 4,
+ MDL_CW = 8,
+ MDL_CR = 16,
+ MDL_NL = 32,
+ MDL_GROUP = 64,
MDL_MAXMODE
} mdl_mode_t;
+typedef enum {
+ MDT_NUL_LOCK = 0,
+ MDT_REG_LOCK = (1 << 0),
+ MDT_PDO_LOCK = (1 << 1)
+} mdl_type_t;
+
struct md_attr {
__u64 ma_valid;
__u64 ma_need;
LDLM_IBITS, &policy, mode, &lockh);
}
+ if (!rc) {
+ mode = LCK_PW;
+ rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
+ LDLM_FL_BLOCK_GRANTED, &res_id,
+ LDLM_IBITS, &policy, mode, &lockh);
+ }
+
if (rc) {
memcpy(&it->d.lustre.it_lock_handle, &lockh,
sizeof(lockh));
#endif
rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
- son, ma, handle);
+ son, ma, handle);
mdd_write_unlock(env, son);
if (rc)
/*
* Author: Mike Shaver <shaver@clusterfs.com>
* Author: Nikita Danilov <nikita@clusterfs.com>
* Author: Huang Hua <huanghua@clusterfs.com>
+ * Author: Yury Umanets <umka@clusterfs.com>
*
* This file is part of the Lustre file system, http://www.lustre.org
* Lustre is a trademark of Cluster File Systems, Inc.
rep->lock_policy_res1 |= flag;
}
+#ifdef CONFIG_PDIROPS
+static mdl_mode_t mdt_mdl_lock_modes[] = {
+ [0] = MDL_MINMODE,
+ [1] = MDL_EX,
+ [2] = MDL_PW,
+ [3] = MDL_PR,
+ [4] = MDL_CW,
+ [5] = MDL_CR,
+ [6] = MDL_NL,
+ [7] = MDL_GROUP
+};
+
+static ldlm_mode_t mdt_ldlm_lock_modes[] = {
+ [0] = LCK_MINMODE,
+ [1] = LCK_EX,
+ [2] = LCK_PW,
+ [3] = LCK_PR,
+ [4] = LCK_CW,
+ [5] = LCK_CR,
+ [6] = LCK_NL,
+ [7] = LCK_GROUP
+};
+
+static inline mdl_mode_t mdt_ldlm_mode2mdl_mode(ldlm_mode_t mode)
+{
+ int idx = ffs((int)mode);
+
+ LASSERT(idx >= 0);
+ LASSERT(IS_PO2(mode));
+ LASSERT(idx < ARRAY_SIZE(mdt_mdl_lock_modes));
+ return mdt_mdl_lock_modes[idx];
+}
+
+static inline ldlm_mode_t mdt_mdl_mode2ldlm_mode(mdl_mode_t mode)
+{
+ int idx = ffs((int)mode);
+
+ LASSERT(idx >= 0);
+ LASSERT(IS_PO2(mode));
+ LASSERT(idx < ARRAY_SIZE(mdt_ldlm_lock_modes));
+ return mdt_ldlm_lock_modes[idx];
+}
+#endif
+
+void mdt_lock_reg_init(struct mdt_lock_handle *lh, ldlm_mode_t lm)
+{
+ lh->mlh_pdo_hash = 0;
+ lh->mlh_reg_mode = lm;
+ lh->mlh_type = MDT_REG_LOCK;
+}
+
+void mdt_lock_pdo_init(struct mdt_lock_handle *lh, ldlm_mode_t lm,
+ const char *name, int namelen)
+{
+ lh->mlh_reg_mode = lm;
+ lh->mlh_type = MDT_PDO_LOCK;
+ lh->mlh_pdo_hash = (name != NULL && namelen > 0 ?
+ full_name_hash(name, namelen) : 0);
+}
+
+#ifdef CONFIG_PDIROPS
+static ldlm_mode_t mdt_lock_pdo_mode(struct mdt_thread_info *info,
+ struct mdt_object *o,
+ ldlm_mode_t lm)
+{
+ mdl_mode_t mode;
+
+ /*
+ * Any dir access needs a couple of locks:
+ *
+ * 1) on the part of the dir we are going to lookup/modify in;
+ *
+ * 2) on the whole dir to protect it from concurrent splitting and/or to
+ * flush the client's cache for readdir().
+ *
+ * So, for a given mode and object this routine decides what lock mode
+ * to use for lock #2:
+ *
+ * 1) if the caller is going to lookup in the dir, then we only need to
+ * protect the dir from being split - LCK_CR
+ *
+ * 2) if the caller is going to modify the dir, then we need to protect
+ * the dir from being split and to flush the cache - LCK_CW
+ *
+ * 3) if the caller is going to modify the dir and the dir seems ready
+ * for splitting, then we need to protect it from any type of access
+ * (lookup/modify/split) - LCK_EX --bzzz
+ */
+
+ LASSERT(lm != LCK_MINMODE);
+
+ if (mdt_object_exists(o) > 0) {
+ /*
+ * Ask underlaying level its opinion about possible locks.
+ */
+ mode = mdo_lock_mode(info->mti_env, mdt_object_child(o),
+ mdt_ldlm_mode2mdl_mode(lm));
+ } else {
+ /* Default locks for non-existing objects. */
+ mode = MDL_MINMODE;
+ }
+
+ if (mode != MDL_MINMODE) {
+ /* Lower layer said what lock mode it likes to be, use it. */
+ return mdt_mdl_mode2ldlm_mode(mode);
+ } else {
+ /*
+ * Lower layer does not want to specify a locking mode. We do it
+ * ourselves. No special protection is needed, just flush the
+ * client's cache on modification.
+ */
+ if (lm == LCK_EX) {
+ return LCK_EX;
+ } else if (lm == LCK_PR) {
+ return LCK_CR;
+ } else if (lm == LCK_PW) {
+ return LCK_CW;
+ } else {
+ CWARN("Not expected lock type (0x%x)\n",
+ (int)mode);
+ }
+ }
+
+ return LCK_MINMODE;
+}
+#endif
+
static int mdt_getstatus(struct mdt_thread_info *info)
{
struct mdt_device *mdt = info->mti_mdt;
* Save error code to ->mode. Later it it is used for detecting the case
* of remote subdir.
*/
- repbody->mode = rc;
+ repbody->mode = rc < 0 ? -rc : rc;
repbody->valid = OBD_MD_FLMODE;
if (rc == -EREMOTE)
struct mdt_object *child;
struct md_object *next = mdt_object_child(info->mti_object);
struct lu_fid *child_fid = &info->mti_tmp_fid1;
- int is_resent, rc;
+ int is_resent, rc, namelen = 0;
const char *name;
struct mdt_lock_handle *lhp;
struct ldlm_lock *lock;
if (name == NULL)
RETURN(err_serious(-EFAULT));
+ namelen = req_capsule_get_size(&info->mti_pill, &RMF_NAME,
+ RCL_CLIENT);
+
CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, ldlm_rep = %p\n",
PFID(mdt_object_fid(parent)), name, ldlm_rep);
rc = 0;
} else {
mdt_lock_handle_init(lhc);
- lhc->mlh_reg_mode = LCK_CR;
+ mdt_lock_reg_init(lhc, MDT_RD_LOCK);
/*
* Object's name is on another MDS, no lookup lock is
*/
child_bits &= ~MDS_INODELOCK_LOOKUP;
child_bits |= MDS_INODELOCK_UPDATE;
- rc = mdt_object_lock(info, child, lhc, child_bits);
+
+ rc = mdt_object_lock(info, child, lhc, child_bits,
+ MDT_LOCAL_LOCK);
}
if (rc == 0) {
/* Finally, we can get attr for child. */
/*step 1: lock parent */
lhp = &info->mti_lh[MDT_LH_PARENT];
- lhp->mlh_reg_mode = LCK_CR;
- rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE);
+ mdt_lock_pdo_init(lhp, MDT_RD_LOCK, name, namelen);
+ rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
+ MDT_LOCAL_LOCK);
if (rc != 0)
RETURN(rc);
LDLM_LOCK_PUT(lock);
} else {
mdt_lock_handle_init(lhc);
- lhc->mlh_reg_mode = LCK_CR;
- rc = mdt_object_cr_lock(info, child, lhc, child_bits);
+ mdt_lock_reg_init(lhc, MDT_RD_LOCK);
+
+ rc = mdt_object_lock(info, child, lhc, child_bits,
+ MDT_CROSS_LOCK);
if (rc != 0)
GOTO(out_child, rc);
}
RETURN(m);
}
-static mdl_mode_t mdt_mdl_lock_modes[] = {
- [0] = MDL_MINMODE,
- [1] = MDL_EX,
- [2] = MDL_PW,
- [3] = MDL_PR,
- [4] = MDL_CW,
- [5] = MDL_CR,
- [6] = MDL_NL,
- [7] = MDL_GROUP
-};
-
-static ldlm_mode_t mdt_ldlm_lock_modes[] = {
- [0] = LCK_MINMODE,
- [1] = LCK_EX,
- [2] = LCK_PW,
- [3] = LCK_PR,
- [4] = LCK_CW,
- [5] = LCK_CR,
- [6] = LCK_NL,
- [7] = LCK_GROUP
-};
-
-static inline mdl_mode_t mdt_ldlm_mode2mdl_mode(ldlm_mode_t mode)
-{
- int idx = ffs((int)mode) - 1;
- LASSERT(idx >= 0);
- LASSERT(IS_PO2(mode));
- LASSERT(idx < ARRAY_SIZE(mdt_mdl_lock_modes));
- return mdt_mdl_lock_modes[idx];
-}
-
-static inline ldlm_mode_t mdt_mdl_mode2ldlm_mode(mdl_mode_t mode)
-{
- int idx = ffs((int)mode) - 1;
- LASSERT(idx >= 0);
- LASSERT(IS_PO2(mode));
- LASSERT(idx < ARRAY_SIZE(mdt_ldlm_lock_modes));
- return mdt_ldlm_lock_modes[idx];
-}
-
-int mdt_lock_init_mode(struct mdt_thread_info *info, struct mdt_object *o,
- struct mdt_lock_handle *lh, ldlm_mode_t lm)
-{
- ENTRY;
-
- lh->mlh_reg_mode = lm;
-
-#ifdef CONFIG_PDIROPS
- {
- mdl_mode_t mode;
-
- /*
- * Any dir access needs couple of locks:
- *
- * 1) on part of dir we gonna take lookup/modify;
- *
- * 2) on whole dir to protect it from concurrent splitting
- * and/or to flush client's cache for readdir().
- *
- * so, for a given mode and object this routine decides what
- * lock mode to use for lock #2:
- *
- * 1) if caller's gonna lookup in dir then we need to protect
- * dir from being splitted only - LCK_CR
- *
- * 2) if caller's gonna modify dir then we need to protect dir
- * from being splitted and to flush cache - LCK_CW
- *
- * 3) if caller's gonna modify dir and that dir seems ready for
- * splitting then we need to protect it from any type of access
- * (lookup/modify/split) - LCK_EX --bzzz
- */
-
- /* Ask underlaying level its opinion about possible locks. */
- mode = mdo_lock_mode(info->mti_env, mdt_object_child(o),
- mdt_ldlm_mode2mdl_mode(lm));
- if (mode != MDL_MINMODE) {
- /* Lower layer said what lock mode it likes to be, use it. */
- lh->mlh_pdo_mode = mdt_mdl_mode2ldlm_mode(mode);
- } else {
- /*
- * Lower layer does not want to specify locking mode. We od it
- * our selves. No special protection is needed, just flush
- * client's cache on modification.
- */
- if (lm == LCK_EX) {
- lh->mlh_pdo_mode = LCK_EX;
- } else if (lm == LCK_PR) {
- lh->mlh_pdo_mode = LCK_CR;
- } else if (lm == LCK_PW) {
- lh->mlh_pdo_mode = LCK_CW;
- } else {
- CWARN("Not expected lock type (0x%x)\n", (int)lm);
- lh->mlh_pdo_mode = LCK_MINMODE;
- }
- }
- }
-#endif
-
- RETURN(0);
-}
-
int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
- struct mdt_lock_handle *lh, __u64 ibits)
+ struct mdt_lock_handle *lh, __u64 ibits, int locality)
{
+ struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
ldlm_policy_data_t *policy = &info->mti_policy;
struct ldlm_res_id *res_id = &info->mti_res_id;
- struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
int rc;
ENTRY;
LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
+ LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
+
if (mdt_object_exists(o) < 0) {
- LASSERT(!(ibits & MDS_INODELOCK_UPDATE));
- LASSERT(ibits & MDS_INODELOCK_LOOKUP);
+ if (locality == MDT_CROSS_LOCK) {
+ /* cross-ref object fix */
+ ibits &= ~MDS_INODELOCK_UPDATE;
+ ibits |= MDS_INODELOCK_LOOKUP;
+ } else {
+ LASSERT(!(ibits & MDS_INODELOCK_UPDATE));
+ LASSERT(ibits & MDS_INODELOCK_LOOKUP);
+ }
}
- memset(policy, 0, sizeof *policy);
- policy->l_inodebits.bits = ibits;
- rc = fid_lock(ns, mdt_object_fid(o), &lh->mlh_reg_lh,
- lh->mlh_reg_mode, policy, res_id);
- RETURN(rc);
-}
+ memset(policy, 0, sizeof *policy);
+ fid_build_reg_res_name(mdt_object_fid(o), res_id);
+
+#ifdef CONFIG_PDIROPS
+ /*
+ * Take the PDO lock on the whole directory and build the correct
+ * @res_id for the lock on the part of the directory.
+ */
+ if (lh->mlh_type == MDT_PDO_LOCK && lh->mlh_pdo_hash != 0) {
+ lh->mlh_pdo_mode = mdt_lock_pdo_mode(info, o, lh->mlh_reg_mode);
+ if (lh->mlh_pdo_mode != LCK_MINMODE) {
+ policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
+ rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode,
+ policy, res_id, LDLM_FL_ATOMIC_CB);
+ if (rc)
+ RETURN(rc);
+ }
-/* lock with cross-ref fixes */
-int mdt_object_cr_lock(struct mdt_thread_info *info, struct mdt_object *o,
- struct mdt_lock_handle *lh, __u64 ibits)
-{
- if (mdt_object_exists(o) < 0) {
- /* cross-ref object fix */
- ibits &= ~MDS_INODELOCK_UPDATE;
- ibits |= MDS_INODELOCK_LOOKUP;
+ fid_build_pdo_res_name(mdt_object_fid(o), lh->mlh_pdo_hash,
+ res_id);
}
- return mdt_object_lock(info, o, lh, ibits);
+#endif
+
+ policy->l_inodebits.bits = ibits;
+ rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
+ res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB);
+#ifdef CONFIG_PDIROPS
+ if (rc) {
+ if (lh->mlh_type == MDT_PDO_LOCK) {
+ mdt_fid_unlock(&lh->mlh_pdo_lh, lh->mlh_pdo_mode);
+ lh->mlh_pdo_lh.cookie = 0ull;
+ }
+ }
+#endif
+
+ RETURN(rc);
}
/*
void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
struct mdt_lock_handle *lh, int decref)
{
- struct ptlrpc_request *req = mdt_info_req(info);
- struct lustre_handle *handle = &lh->mlh_reg_lh;
- ldlm_mode_t mode = lh->mlh_reg_mode;
+ struct ptlrpc_request *req = mdt_info_req(info);
ENTRY;
- if (lustre_handle_is_used(handle)) {
- if (decref)
- fid_unlock(mdt_object_fid(o), handle, mode);
- else
- ptlrpc_save_lock(req, handle, mode);
- handle->cookie = 0;
+ /* Do not save PDO locks to request. */
+ if (lustre_handle_is_used(&lh->mlh_pdo_lh)) {
+ mdt_fid_unlock(&lh->mlh_pdo_lh,
+ lh->mlh_pdo_mode);
+ lh->mlh_pdo_lh.cookie = 0;
+ }
+
+ if (lustre_handle_is_used(&lh->mlh_reg_lh)) {
+ if (decref) {
+ mdt_fid_unlock(&lh->mlh_reg_lh,
+ lh->mlh_reg_mode);
+ } else {
+ ptlrpc_save_lock(req, &lh->mlh_reg_lh,
+ lh->mlh_reg_mode);
+ }
+ lh->mlh_reg_lh.cookie = 0;
}
EXIT;
}
if (!IS_ERR(o)) {
int rc;
- rc = mdt_object_lock(info, o, lh, ibits);
+ rc = mdt_object_lock(info, o, lh, ibits,
+ MDT_LOCAL_LOCK);
if (rc != 0) {
mdt_object_put(info->mti_env, o);
o = ERR_PTR(rc);
void mdt_lock_handle_init(struct mdt_lock_handle *lh)
{
+ lh->mlh_type = MDT_PDO_LOCK;
lh->mlh_reg_lh.cookie = 0ull;
lh->mlh_reg_mode = LCK_MINMODE;
lh->mlh_pdo_lh.cookie = 0ull;
void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
{
LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
+ LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
}
/*
int rc;
printk(KERN_INFO "Lustre: MetaData Target; info@clusterfs.com\n");
-
+
mdt_num_threads = MDT_NUM_THREADS;
lprocfs_init_vars(mdt, &lvars);
rc = class_register_type(&mdt_obd_device_ops, NULL,
};
struct mdt_lock_handle {
+ /* Lock type: regular (also used for cross-ref) or pdo lock. */
+ mdl_type_t mlh_type;
+
/* Regular lock */
struct lustre_handle mlh_reg_lh;
ldlm_mode_t mlh_reg_mode;
/* Pdirops lock */
struct lustre_handle mlh_pdo_lh;
ldlm_mode_t mlh_pdo_mode;
+ unsigned int mlh_pdo_hash;
};
enum {
MDT_LH_NR
};
+enum {
+ MDT_LOCAL_LOCK,
+ MDT_CROSS_LOCK
+};
+
struct mdt_reint_record {
mdt_reint_t rr_opcode;
const struct lu_fid *rr_fid1;
const struct lu_fid *rr_fid2;
const char *rr_name;
+ int rr_namelen;
const char *rr_tgt;
- int rr_eadatalen;
+ int rr_tgtlen;
const void *rr_eadata;
+ int rr_eadatalen;
int rr_logcookielen;
const struct llog_cookie *rr_logcookies;
__u32 rr_flags;
void mdt_clear_disposition(struct mdt_thread_info *info,
struct ldlm_reply *rep, int flag);
+void mdt_lock_pdo_init(struct mdt_lock_handle *lh,
+ ldlm_mode_t lm, const char *name,
+ int namelen);
+
+void mdt_lock_reg_init(struct mdt_lock_handle *lh,
+ ldlm_mode_t lm);
+
+int mdt_lock_setup(struct mdt_thread_info *info,
+ struct mdt_object *o,
+ struct mdt_lock_handle *lh);
+
int mdt_object_lock(struct mdt_thread_info *,
struct mdt_object *,
struct mdt_lock_handle *,
- __u64);
-
-int mdt_object_cr_lock(struct mdt_thread_info *,
- struct mdt_object *,
- struct mdt_lock_handle *,
- __u64);
+ __u64, int);
void mdt_object_unlock(struct mdt_thread_info *,
struct mdt_object *,
struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *,
const struct lu_fid *,
struct mdt_lock_handle *,
- __u64 ibits);
+ __u64);
void mdt_object_unlock_put(struct mdt_thread_info *,
struct mdt_object *,
struct mdt_lock_handle *,
return cache ? (strcmp(cache->uc_upcall, "NONE") == 0) : 1;
}
+/* Issue a dlm lock in namespace @ns; store the lock handle into @lh. */
+static inline int mdt_fid_lock(struct ldlm_namespace *ns,
+ struct lustre_handle *lh,
+ ldlm_mode_t mode,
+ ldlm_policy_data_t *policy,
+ struct ldlm_res_id *res_id,
+ int flags)
+{
+ int rc;
+
+ LASSERT(ns != NULL);
+ LASSERT(lh != NULL);
+
+ rc = ldlm_cli_enqueue_local(ns, *res_id, LDLM_IBITS, policy,
+ mode, &flags, ldlm_blocking_ast,
+ ldlm_completion_ast, NULL, NULL,
+ 0, NULL, lh);
+ return rc == ELDLM_OK ? 0 : -EIO;
+}
+
+static inline void mdt_fid_unlock(struct lustre_handle *lh,
+ ldlm_mode_t mode)
+{
+ ldlm_lock_decref(lh, mode);
+}
+
/*
* Capability
*/
ci->mc_fid[offset] = fid;
ci->mc_capa[offset] = capa;
}
+
+#ifdef CONFIG_PDIROPS
+#define MDT_RD_LOCK LCK_PR
+#define MDT_WR_LOCK LCK_PW
+#define MDT_EX_LOCK LCK_EX
+#else
+#define MDT_RD_LOCK LCK_CR
+#define MDT_WR_LOCK LCK_EX
+#define MDT_EX_LOCK LCK_EX
+#endif
+
#endif /* __KERNEL__ */
#endif /* _MDT_H */
mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA);
rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
+ rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
+
#ifdef CONFIG_FS_POSIX_ACL
if (sp->sp_cr_flags & MDS_CREATE_RMT_ACL) {
if (S_ISDIR(attr->la_mode))
rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
if (rr->rr_name == NULL)
RETURN(-EFAULT);
+ rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
RETURN(0);
}
rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
if (rr->rr_name == NULL)
RETURN(-EFAULT);
+ rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
RETURN(0);
}
rr->rr_tgt = req_capsule_client_get(pill, &RMF_SYMTGT);
if (rr->rr_name == NULL || rr->rr_tgt == NULL)
RETURN(-EFAULT);
+ rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
+ rr->rr_tgtlen = req_capsule_get_size(pill, &RMF_SYMTGT, RCL_CLIENT);
RETURN(0);
}
rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
if (rr->rr_name == NULL)
RETURN(-EFAULT);
+ rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) {
struct md_create_spec *sp = &info->mti_spec;
* In the later case, mdt_reint_setattr will do it. */
if (cancel && (info->mti_rr.rr_fid1 != NULL)) {
struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_CHILD];
- lh->mlh_reg_mode = LCK_EX;
- rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_UPDATE);
+ mdt_lock_reg_init(lh, MDT_EX_LOCK);
+ rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_UPDATE,
+ MDT_LOCAL_LOCK);
if (rc == 0)
mdt_object_unlock(info, o, lh, 1);
}
}
lh = &info->mti_lh[MDT_LH_PARENT];
- if (!(create_flags & MDS_OPEN_CREAT))
- lh->mlh_reg_mode = LCK_CR;
- else
- lh->mlh_reg_mode = LCK_EX;
+ if (!(create_flags & MDS_OPEN_CREAT)) {
+ mdt_lock_pdo_init(lh, MDT_RD_LOCK, rr->rr_name,
+ rr->rr_namelen);
+ } else {
+ mdt_lock_pdo_init(lh, MDT_WR_LOCK, rr->rr_name,
+ rr->rr_namelen);
+ }
parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
MDS_INODELOCK_UPDATE);
if (IS_ERR(parent))
rc = 0;
} else {
mdt_lock_handle_init(lhc);
- lhc->mlh_reg_mode = LCK_CR;
+ mdt_lock_reg_init(lhc, MDT_RD_LOCK);
rc = mdt_object_lock(info, child, lhc,
- MDS_INODELOCK_LOOKUP);
+ MDS_INODELOCK_LOOKUP,
+ MDT_LOCAL_LOCK);
}
repbody->fid1 = *mdt_object_fid(child);
repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
static int mdt_md_create(struct mdt_thread_info *info)
{
- struct mdt_device *mdt = info->mti_mdt;
- struct mdt_object *parent;
- struct mdt_object *child;
- struct mdt_lock_handle *lh;
- struct mdt_body *repbody;
- struct md_attr *ma = &info->mti_attr;
+ struct mdt_device *mdt = info->mti_mdt;
+ struct mdt_object *parent;
+ struct mdt_object *child;
+ struct mdt_lock_handle *lh;
+ struct mdt_body *repbody;
+ struct md_attr *ma = &info->mti_attr;
struct mdt_reint_record *rr = &info->mti_rr;
int rc;
ENTRY;
repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
lh = &info->mti_lh[MDT_LH_PARENT];
- lh->mlh_reg_mode = LCK_EX;
+ mdt_lock_pdo_init(lh, MDT_WR_LOCK, rr->rr_name, rr->rr_namelen);
parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
MDS_INODELOCK_UPDATE);
RETURN(0);
lh = &info->mti_lh[MDT_LH_PARENT];
- lh->mlh_reg_mode = LCK_EX;
+ mdt_lock_pdo_init(lh, MDT_WR_LOCK, NULL, 0);
if (!(flags & MRF_SETATTR_LOCKED)) {
__u64 lockpart = MDS_INODELOCK_UPDATE;
if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
lockpart |= MDS_INODELOCK_LOOKUP;
- rc = mdt_object_lock(info, mo, lh, lockpart);
+ rc = mdt_object_lock(info, mo, lh, lockpart, MDT_LOCAL_LOCK);
if (rc != 0)
GOTO(out, rc);
}
case S_IFBLK:
case S_IFIFO:
case S_IFSOCK:{
- /* special file should stay on the same node as parent */
- LASSERT(strlen(info->mti_rr.rr_name) > 0);
+ /* Special file should stay on the same node as parent. */
+ LASSERT(info->mti_rr.rr_namelen > 0);
rc = mdt_md_create(info);
break;
}
/* step 1: lock the parent */
parent_lh = &info->mti_lh[MDT_LH_PARENT];
- parent_lh->mlh_reg_mode = LCK_EX;
+ mdt_lock_pdo_init(parent_lh, MDT_WR_LOCK, rr->rr_name,
+ rr->rr_namelen);
+
mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
MDS_INODELOCK_UPDATE);
if (IS_ERR(mp))
if (IS_ERR(mc))
GOTO(out_unlock_parent, rc = PTR_ERR(mc));
child_lh = &info->mti_lh[MDT_LH_CHILD];
- child_lh->mlh_reg_mode = LCK_EX;
- rc = mdt_object_cr_lock(info, mc, child_lh, MDS_INODELOCK_FULL);
+ mdt_lock_reg_init(child_lh, MDT_EX_LOCK);
+ rc = mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_FULL,
+ MDT_CROSS_LOCK);
if (rc != 0)
GOTO(out_put_child, rc);
if (rr->rr_name[0] == 0) {
/* MDT holding name ask us to add ref. */
lhs = &info->mti_lh[MDT_LH_CHILD];
- lhs->mlh_reg_mode = LCK_EX;
+ mdt_lock_reg_init(lhs, MDT_EX_LOCK);
ms = mdt_object_find_lock(info, rr->rr_fid1, lhs,
MDS_INODELOCK_UPDATE);
if (IS_ERR(ms))
/* step 1: find & lock the target parent dir */
lhp = &info->mti_lh[MDT_LH_PARENT];
- lhp->mlh_reg_mode = LCK_EX;
+ mdt_lock_pdo_init(lhp, MDT_WR_LOCK, rr->rr_name,
+ rr->rr_namelen);
mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
MDS_INODELOCK_UPDATE);
if (IS_ERR(mp))
/* step 2: find & lock the source */
lhs = &info->mti_lh[MDT_LH_CHILD];
- lhs->mlh_reg_mode = LCK_EX;
+ mdt_lock_reg_init(lhs, MDT_EX_LOCK);
ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
if (IS_ERR(ms))
GOTO(out_unlock_parent, rc = PTR_ERR(ms));
- rc = mdt_object_cr_lock(info, ms, lhs, MDS_INODELOCK_UPDATE);
+ rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE,
+ MDT_CROSS_LOCK);
if (rc != 0)
GOTO(out_unlock_source, rc);
/* step 1: lookup & lock the tgt dir */
lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
- lh_tgtdir->mlh_reg_mode = LCK_EX;
+ mdt_lock_pdo_init(lh_tgtdir, MDT_WR_LOCK, rr->rr_tgt,
+ rr->rr_tgtlen);
mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
MDS_INODELOCK_UPDATE);
if (IS_ERR(mtgtdir))
if (rc != 0 && rc != -ENOENT) {
GOTO(out_unlock_tgtdir, rc);
} else if (rc == 0) {
- /* in case of replay that name can be already inserted,
- * check that and do nothing if so */
+ /*
+ * In case of replay the name may already be inserted; check
+ * for that and do nothing if so.
+ */
if (lu_fid_eq(tgt_fid, rr->rr_fid2))
GOTO(out_unlock_tgtdir, rc);
lh_tgt = &info->mti_lh[MDT_LH_CHILD];
- lh_tgt->mlh_reg_mode = LCK_EX;
+ mdt_lock_reg_init(lh_tgt, MDT_EX_LOCK);
mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt,
MDS_INODELOCK_LOOKUP);
if (rc == 0 && mtgt)
mdt_handle_last_unlink(info, mtgt, ma);
- EXIT;
- if (mtgt) {
+ if (mtgt != NULL)
mdt_object_unlock_put(info, mtgt, lh_tgt, rc);
- }
+ EXIT;
out_unlock_tgtdir:
mdt_object_unlock_put(info, mtgtdir, lh_tgtdir, rc);
out:
ENTRY;
ls = info->mti_mdt->mdt_md_dev.md_lu_dev.ld_site;
- fid_build_res_name(&LUSTRE_BFL_FID, &res_id);
+ fid_build_reg_res_name(&LUSTRE_BFL_FID, &res_id);
if (ls->ls_control_exp == NULL) {
/*
}
/*
- * This is is_subdir() variant, it is CMD is cmm forwards it to correct
+ * This is an is_subdir() variant; in CMD mode cmm forwards it to the correct
* target. Source should not be ancestor of target dir. May be other rename
* checks can be moved here later.
*/
-static int mdt_rename_check(struct mdt_thread_info *info, struct lu_fid *fid)
+static int mdt_rename_sanity(struct mdt_thread_info *info, struct lu_fid *fid)
{
struct mdt_reint_record *rr = &info->mti_rr;
struct lu_fid dst_fid = *rr->rr_fid2;
lh_newp = &info->mti_lh[MDT_LH_NEW];
- /* step 1: lock the source dir */
+ /* step 1: lock the source dir. */
lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
- lh_srcdirp->mlh_reg_mode = LCK_EX;
+ mdt_lock_pdo_init(lh_srcdirp, MDT_WR_LOCK, rr->rr_name,
+ rr->rr_namelen);
msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
MDS_INODELOCK_UPDATE);
if (IS_ERR(msrcdir))
GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
- /*step 2: find & lock the target dir*/
+ /* step 2: find & lock the target dir. */
lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
- lh_tgtdirp->mlh_reg_mode = LCK_EX;
+ mdt_lock_pdo_init(lh_tgtdirp, MDT_WR_LOCK, rr->rr_tgt,
+ rr->rr_tgtlen);
if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
mdt_object_get(info->mti_env, msrcdir);
mtgtdir = msrcdir;
if (rc == 0)
GOTO(out_unlock_target, rc = -ESTALE);
else if (rc > 0) {
- /* we lock the target dir iff it is local */
+ /* we lock the target dir if it is local */
rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
- MDS_INODELOCK_UPDATE);
+ MDS_INODELOCK_UPDATE,
+ MDT_LOCAL_LOCK);
if (rc != 0)
GOTO(out_unlock_target, rc);
}
}
- /*step 3: find & lock the old object*/
+ /* step 3: find & lock the old object. */
rc = mdo_lookup(info->mti_env, mdt_object_child(msrcdir),
rr->rr_name, old_fid);
if (rc != 0)
GOTO(out_unlock_target, rc = -EINVAL);
lh_oldp = &info->mti_lh[MDT_LH_OLD];
- lh_oldp->mlh_reg_mode = LCK_EX;
+ mdt_lock_reg_init(lh_oldp, MDT_EX_LOCK);
mold = mdt_object_find_lock(info, old_fid, lh_oldp,
MDS_INODELOCK_LOOKUP);
if (IS_ERR(mold))
GOTO(out_unlock_target, rc = PTR_ERR(mold));
- /*step 4: find & lock the new object*/
+ /* step 4: find & lock the new object. */
/* new target object may not exist now */
rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
rr->rr_tgt, new_fid);
if (rc == 0) {
- /* the new_fid should have been filled at this moment*/
+ /* the new_fid should have been filled at this moment */
if (lu_fid_eq(old_fid, new_fid))
GOTO(out_unlock_old, rc);
lu_fid_eq(new_fid, rr->rr_fid2))
GOTO(out_unlock_old, rc = -EINVAL);
- lh_newp->mlh_reg_mode = LCK_EX;
+ mdt_lock_reg_init(lh_newp, MDT_EX_LOCK);
mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
if (IS_ERR(mnew))
GOTO(out_unlock_old, rc = PTR_ERR(mnew));
- rc = mdt_object_cr_lock(info, mnew, lh_newp,
- MDS_INODELOCK_FULL);
+ rc = mdt_object_lock(info, mnew, lh_newp,
+ MDS_INODELOCK_FULL, MDT_CROSS_LOCK);
if (rc != 0) {
mdt_object_put(info->mti_env, mnew);
GOTO(out_unlock_old, rc);
mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
+
/* Check if @dst is subdir of @src. */
- rc = mdt_rename_check(info, old_fid);
+ rc = mdt_rename_sanity(info, old_fid);
if (rc)
GOTO(out_unlock_new, rc);
lockpart |= MDS_INODELOCK_LOOKUP;
lh = &info->mti_lh[MDT_LH_PARENT];
- lh->mlh_reg_mode = LCK_EX;
- rc = mdt_object_lock(info, obj, lh, lockpart);
+ mdt_lock_pdo_init(lh, MDT_WR_LOCK, NULL, 0);
+ rc = mdt_object_lock(info, obj, lh, lockpart, MDT_LOCAL_LOCK);
if (rc != 0)
GOTO(out, rc);
* This is the only place where object fid is assigned. It's constant
* after this point.
*/
+ LASSERT(fid_is_igif(f) || fid_ver(f) == 0);
top->lo_header->loh_fid = *f;
layers = &top->lo_header->loh_layers;
do {
#include "osd_oi.h"
#include "osd_igif.h"
-int lu_fid_is_igif(const struct lu_fid *fid)
-{
- return fid_seq(fid) == LUSTRE_ROOT_FID_SEQ;
-}
-
void lu_igif_to_id(const struct lu_fid *fid, struct osd_inode_id *id)
{
- LASSERT(lu_fid_is_igif(fid));
+ LASSERT(fid_is_igif(fid));
id->oii_ino = lu_igif_ino(fid);
id->oii_gen = lu_igif_gen(fid);
}
__u32 lu_igif_ino(const struct lu_fid *fid)
{
- LASSERT(lu_fid_is_igif(fid));
+ LASSERT(fid_is_igif(fid));
return fid_oid(fid);
}
__u32 lu_igif_gen(const struct lu_fid *fid)
{
- LASSERT(lu_fid_is_igif(fid));
+ LASSERT(fid_is_igif(fid));
return fid_ver(fid);
}
fid->f_seq = LUSTRE_ROOT_FID_SEQ;
fid->f_oid = ino;
fid->f_ver = gen;
- LASSERT(lu_fid_is_igif(fid));
+ LASSERT(fid_is_igif(fid));
}
struct lu_fid;
struct osd_inode_id;
-int lu_fid_is_igif(const struct lu_fid *fid);
void lu_igif_to_id(const struct lu_fid *fid, struct osd_inode_id *id);
__u32 lu_igif_ino(const struct lu_fid *fid);
__u32 lu_igif_gen(const struct lu_fid *fid);
#include "osd_oi.h"
/* osd_lookup(), struct osd_thread_info */
#include "osd_internal.h"
-/* lu_fid_is_igif() */
#include "osd_igif.h"
#include "dt_object.h"
{
int rc;
- if (lu_fid_is_igif(fid)) {
+ if (fid_is_igif(fid)) {
lu_igif_to_id(fid, id);
rc = 0;
} else {
struct dt_device *dev;
struct osd_inode_id *id;
- if (lu_fid_is_igif(fid))
+ if (fid_is_igif(fid))
return 0;
idx = oi->oi_dir;
struct dt_object *idx;
struct dt_device *dev;
- if (lu_fid_is_igif(fid))
+ if (fid_is_igif(fid))
return 0;
idx = oi->oi_dir;