* Operations implemented for each directory object.
*/
struct md_dir_operations {
- int (*mdo_is_subdir) (const struct lu_env *env, struct md_object *obj,
- const struct lu_fid *fid, struct lu_fid *sfid);
+ int (*mdo_is_subdir)(const struct lu_env *env, struct md_object *obj,
+ const struct lu_fid *fid);
- int (*mdo_lookup)(const struct lu_env *env, struct md_object *obj,
- const struct lu_name *lname, struct lu_fid *fid,
- struct md_op_spec *spec);
+ int (*mdo_lookup)(const struct lu_env *env, struct md_object *obj,
+ const struct lu_name *lname, struct lu_fid *fid,
+ struct md_op_spec *spec);
- mdl_mode_t (*mdo_lock_mode)(const struct lu_env *env,
- struct md_object *obj,
- mdl_mode_t mode);
+ mdl_mode_t (*mdo_lock_mode)(const struct lu_env *env,
+ struct md_object *obj,
+ mdl_mode_t mode);
- int (*mdo_create)(const struct lu_env *env, struct md_object *pobj,
- const struct lu_name *lname, struct md_object *child,
- struct md_op_spec *spec,
- struct md_attr *ma);
+ int (*mdo_create)(const struct lu_env *env, struct md_object *pobj,
+ const struct lu_name *lname, struct md_object *child,
+ struct md_op_spec *spec,
+ struct md_attr *ma);
- /** This method is used for creating data object for this meta object*/
- int (*mdo_create_data)(const struct lu_env *env, struct md_object *p,
- struct md_object *o,
- const struct md_op_spec *spec,
- struct md_attr *ma);
+ /** This method is used for creating data object for this meta object*/
+ int (*mdo_create_data)(const struct lu_env *env, struct md_object *p,
+ struct md_object *o,
+ const struct md_op_spec *spec,
+ struct md_attr *ma);
- int (*mdo_rename)(const struct lu_env *env, struct md_object *spobj,
- struct md_object *tpobj, const struct lu_fid *lf,
- const struct lu_name *lsname, struct md_object *tobj,
- const struct lu_name *ltname, struct md_attr *ma);
+ int (*mdo_rename)(const struct lu_env *env, struct md_object *spobj,
+ struct md_object *tpobj, const struct lu_fid *lf,
+ const struct lu_name *lsname, struct md_object *tobj,
+ const struct lu_name *ltname, struct md_attr *ma);
- int (*mdo_link)(const struct lu_env *env, struct md_object *tgt_obj,
- struct md_object *src_obj, const struct lu_name *lname,
- struct md_attr *ma);
+ int (*mdo_link)(const struct lu_env *env, struct md_object *tgt_obj,
+ struct md_object *src_obj, const struct lu_name *lname,
+ struct md_attr *ma);
int (*mdo_unlink)(const struct lu_env *env, struct md_object *pobj,
struct md_object *cobj, const struct lu_name *lname,
}
static inline int mdo_is_subdir(const struct lu_env *env,
- struct md_object *mo,
- const struct lu_fid *fid,
- struct lu_fid *sfid)
+ struct md_object *mo,
+ const struct lu_fid *fid)
{
- LASSERT(mo->mo_dir_ops->mdo_is_subdir);
- return mo->mo_dir_ops->mdo_is_subdir(env, mo, fid, sfid);
+ LASSERT(mo->mo_dir_ops->mdo_is_subdir);
+ return mo->mo_dir_ops->mdo_is_subdir(env, mo, fid);
}
static inline int mdo_link(const struct lu_env *env,
}
/*
- * return 1: if lf is the fid of the ancestor of p1;
+ * return 1: if \a tfid is the fid of the ancestor of \a mo;
* return 0: if not;
- *
- * return -EREMOTE: if remote object is found, in this
- * case fid of remote object is saved to @pf;
- *
* otherwise: values < 0, errors.
*/
static int mdd_is_parent(const struct lu_env *env,
struct mdd_device *mdd,
- struct mdd_object *p1,
+ struct mdd_object *mo,
const struct lu_attr *attr,
- const struct lu_fid *lf,
- struct lu_fid *pf)
+ const struct lu_fid *tfid)
{
- struct mdd_object *parent = NULL;
- struct lu_fid *pfid;
- int rc;
- ENTRY;
+ struct mdd_object *mp;
+ struct lu_fid *pfid;
+ int rc;
- LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
- pfid = &mdd_env_info(env)->mti_fid;
+ LASSERT(!lu_fid_eq(mdo2fid(mo), tfid));
+ pfid = &mdd_env_info(env)->mti_fid;
- /* Check for root first. */
- if (mdd_is_root(mdd, mdo2fid(p1)))
- RETURN(0);
+ if (mdd_is_root(mdd, mdo2fid(mo)))
+ return 0;
+
+ if (mdd_is_root(mdd, tfid))
+ return 1;
- for(;;) {
- /* this is done recursively */
- rc = mdd_parent_fid(env, p1, attr, pfid);
+ rc = mdd_parent_fid(env, mo, attr, pfid);
+ if (rc)
+ return rc;
+
+ while (1) {
+ if (lu_fid_eq(pfid, tfid))
+ return 1;
+
+ if (mdd_is_root(mdd, pfid))
+ return 0;
+
+ mp = mdd_object_find(env, mdd, pfid);
+ if (IS_ERR(mp))
+ return PTR_ERR(mp);
+
+ if (!mdd_object_exists(mp)) {
+ mdd_object_put(env, mp);
+ return -ENOENT;
+ }
+
+ rc = mdd_parent_fid(env, mp, attr, pfid);
+ mdd_object_put(env, mp);
if (rc)
- GOTO(out, rc);
- if (mdd_is_root(mdd, pfid))
- GOTO(out, rc = 0);
- if (lu_fid_eq(pfid, &mdd->mdd_local_root_fid))
- GOTO(out, rc = 0);
- if (lu_fid_eq(pfid, lf))
- GOTO(out, rc = 1);
- if (parent != NULL)
- mdd_object_put(env, parent);
-
- parent = mdd_object_find(env, mdd, pfid);
- if (IS_ERR(parent))
- GOTO(out, rc = PTR_ERR(parent));
-
- if (!mdd_object_exists(parent))
- GOTO(out, rc = -EINVAL);
-
- p1 = parent;
- }
- EXIT;
-out:
- if (parent && !IS_ERR(parent))
- mdd_object_put(env, parent);
- return rc;
+ return rc;
+ }
+
+ return 0;
}
/*
*
* returns 1: if fid is ancestor of @mo;
* returns 0: if fid is not an ancestor of @mo;
- *
- * returns EREMOTE if remote object is found, fid of remote object is saved to
- * @fid;
- *
* returns < 0: if error
*/
int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
- const struct lu_fid *fid, struct lu_fid *sfid)
+ const struct lu_fid *fid)
{
struct mdd_device *mdd = mdo2mdd(mo);
struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
int rc;
ENTRY;
+ if (!mdd_object_exists(md2mdd_obj(mo)))
+ RETURN(-ENOENT);
+
if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
- RETURN(0);
+ RETURN(-ENOTDIR);
rc = mdd_la_get(env, md2mdd_obj(mo), attr);
if (rc != 0)
RETURN(rc);
- rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid, sfid);
- if (rc == 0) {
- /* found root */
- fid_zero(sfid);
- } else if (rc == 1) {
- /* found @fid is parent */
- *sfid = *fid;
- rc = 0;
- }
+ rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid);
RETURN(rc);
}
} else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
rc = MDD_RN_TGTSRC;
} else {
- rc = mdd_is_parent(env, mdd, src_pobj, pattr, mdo2fid(tgt_pobj),
- NULL);
+ rc = mdd_is_parent(env, mdd, src_pobj, pattr,
+ mdo2fid(tgt_pobj));
if (rc == -EREMOTE)
rc = 0;
int mdd_write_locked(const struct lu_env *env, struct mdd_object *obj);
/* mdd_dir.c */
-int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
- const struct lu_fid *fid, struct lu_fid *sfid);
int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
const struct lu_attr *pattr, struct mdd_object *cobj,
bool check_perm);
return rc;
}
-static int mdt_attr_get_pfid(struct mdt_thread_info *info,
- struct mdt_object *o, struct lu_fid *pfid)
+int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
+ struct lu_fid *pfid)
{
struct lu_buf *buf = &info->mti_buf;
struct link_ea_header *leh;
const char *name);
int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
struct md_attr *ma, const char *name);
+int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
+ struct lu_fid *pfid);
int mdt_write_get(struct mdt_object *o);
void mdt_write_put(struct mdt_object *o);
int mdt_write_read(struct mdt_object *o);
EXIT;
}
-/*
- * This is is_subdir() variant, it is CMD if cmm forwards it to correct
- * target. Source should not be ancestor of target dir. May be other rename
- * checks can be moved here later.
- */
-static int mdt_is_subdir(struct mdt_thread_info *info,
- struct mdt_object *dir,
- const struct lu_fid *fid)
-{
- struct lu_fid dir_fid = dir->mot_header.loh_fid;
- int rc = 0;
- ENTRY;
-
- /* If the source and target are in the same directory, they can not
- * be parent/child relationship, so subdir check is not needed */
- if (lu_fid_eq(&dir_fid, fid))
- return 0;
-
- if (!mdt_object_exists(dir))
- RETURN(-ENOENT);
-
- rc = mdo_is_subdir(info->mti_env, mdt_object_child(dir),
- fid, &dir_fid);
- if (rc < 0) {
- CERROR("%s: failed subdir check in "DFID" for "DFID
- ": rc = %d\n", mdt_obd_name(info->mti_mdt),
- PFID(&dir_fid), PFID(fid), rc);
- /* Return EINVAL only if a parent is the @fid */
- if (rc == -EINVAL)
- rc = -EIO;
- } else {
- /* check the found fid */
- if (lu_fid_eq(&dir_fid, fid))
- rc = -EINVAL;
- }
-
- RETURN(rc);
-}
-
/* Update object linkEA */
struct mdt_lock_list {
struct mdt_object *mll_obj;
}
/*
+ * determine lock order of sobj and tobj
+ *
+ * there are two situations we need to lock tobj before sobj:
+ * 1. sobj is child of tobj
+ * 2. sobj and tobj are stripes of a directory, and stripe index of sobj is
+ * larger than that of tobj
+ *
+ * \retval 1 lock tobj before sobj
+ * \retval 0 lock sobj before tobj
+ * \retval -ev negative errno upon error
+ */
+static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
+ struct mdt_object *sobj,
+ struct mdt_object *tobj)
+{
+ struct md_attr *ma = &info->mti_attr;
+ struct lu_fid *spfid = &info->mti_tmp_fid1;
+ struct lu_fid *tpfid = &info->mti_tmp_fid2;
+ struct lmv_mds_md_v1 *lmv;
+ __u32 sindex;
+ __u32 tindex;
+ int rc;
+
+ /* sobj and tobj are the same */
+ if (sobj == tobj)
+ return 0;
+
+ if (fid_is_root(mdt_object_fid(sobj)))
+ return 0;
+
+ if (fid_is_root(mdt_object_fid(tobj)))
+ return 1;
+
+ /* check whether sobj is child of tobj */
+ rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
+ mdt_object_fid(tobj));
+ if (rc < 0)
+ return rc;
+
+ if (rc == 1)
+ return 1;
+
+ /* check whether sobj and tobj are children of the same parent */
+ rc = mdt_attr_get_pfid(info, sobj, spfid);
+ if (rc)
+ return rc;
+
+ rc = mdt_attr_get_pfid(info, tobj, tpfid);
+ if (rc)
+ return rc;
+
+ if (!lu_fid_eq(spfid, tpfid))
+ return 0;
+
+ /* check whether sobj and tobj are sibling stripes */
+ ma->ma_need = MA_LMV;
+ ma->ma_valid = 0;
+ ma->ma_lmv = (union lmv_mds_md *)info->mti_xattr_buf;
+ ma->ma_lmv_size = sizeof(info->mti_xattr_buf);
+ rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
+ if (rc)
+ return rc;
+
+ if (!(ma->ma_valid & MA_LMV))
+ return 0;
+
+ lmv = &ma->ma_lmv->lmv_md_v1;
+ if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
+ return 0;
+ sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
+
+ ma->ma_valid = 0;
+ rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
+ if (rc)
+ return rc;
+
+ if (!(ma->ma_valid & MA_LMV))
+ return -ENODATA;
+
+ lmv = &ma->ma_lmv->lmv_md_v1;
+ if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
+ return -EINVAL;
+ tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
+
+ /* check stripe index of sobj and tobj */
+ if (sindex == tindex)
+ return -EINVAL;
+
+ return sindex < tindex ? 0 : 1;
+}
+
+/*
* VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
* 2 - srcdir child; 3 - tgtdir child.
* Update on disk version of srcdir child.
mtgtdir = msrcdir;
mdt_object_get(info->mti_env, mtgtdir);
} else {
- /* Check if the @msrcdir is not a child of the @mtgtdir,
- * otherwise a reverse locking must take place. */
- rc = mdt_is_subdir(info, msrcdir, rr->rr_fid2);
- if (rc == -EINVAL)
- reverse = true;
- else if (rc)
- GOTO(out_put_srcdir, rc);
-
mtgtdir = mdt_object_find_check(info, rr->rr_fid2, 1);
if (IS_ERR(mtgtdir))
GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
}
+ rc = mdt_rename_determine_lock_order(info, msrcdir, mtgtdir);
+ if (rc < 0)
+ GOTO(out_put_tgtdir, rc);
+
+ reverse = rc;
+
/* source needs to be looked up after locking source parent, otherwise
* this rename may race with unlink source, and cause rename hang, see
* sanityn.sh 55b, so check parents first, if later we found source is
/* Check if @mtgtdir is subdir of @mold, before locking child
* to avoid reverse locking. */
if (mtgtdir != msrcdir) {
- rc = mdt_is_subdir(info, mtgtdir, old_fid);
- if (rc)
+ rc = mdo_is_subdir(info->mti_env, mdt_object_child(mtgtdir),
+ old_fid);
+ if (rc) {
+ if (rc == 1)
+ rc = -EINVAL;
GOTO(out_put_old, rc);
+ }
}
tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
/* Check if @msrcdir is subdir of @mnew, before locking child
* to avoid reverse locking. */
if (mtgtdir != msrcdir) {
- rc = mdt_is_subdir(info, msrcdir, new_fid);
- if (rc)
+ rc = mdo_is_subdir(info->mti_env,
+ mdt_object_child(msrcdir), new_fid);
+ if (rc) {
+ if (rc == 1)
+ rc = -EINVAL;
GOTO(out_unlock_old, rc);
+ }
}
/* We used to acquire MDS_INODELOCK_FULL here but we
}
run_test 80b "Accessing directory during migration"
-test_81() {
+test_81a() {
[ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
rm -rf $DIR1/$tdir
return 0
}
-run_test 81 "rename and stat under striped directory"
+run_test 81a "rename and stat under striped directory"
+
+test_81b() {
+ [ $MDSCOUNT -lt 2 ] &&
+ skip "We need at least 2 MDTs for this test"
+
+ local total
+ local setattr_pid
+
+ total=1000
+
+ $LFS mkdir -c $MDSCOUNT $DIR1/$tdir || error "$LFS mkdir"
+ createmany -o $DIR1/$tdir/$tfile. $total || error "createmany"
+
+ (
+ while true; do
+ touch $DIR1/$tdir
+ done
+ ) &
+ setattr_pid=$!
+
+ for i in $(seq $total); do
+ mrename $DIR2/$tdir/$tfile.$i $DIR2/$tdir/$tfile-new.$i \
+ > /dev/null
+ done
+
+ kill -9 $setattr_pid
+}
+run_test 81b "rename under striped directory doesn't deadlock"
test_82() {
[[ $(lustre_version_code $SINGLEMDS) -gt $(version_code 2.6.91) ]] ||