* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2016, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
}
static int mdt_unlock_slaves(struct mdt_thread_info *mti,
- struct mdt_object *obj, __u64 ibits,
- struct mdt_lock_handle *s0_lh,
- struct mdt_object *s0_obj,
+ struct mdt_object *obj,
struct ldlm_enqueue_info *einfo,
int decref)
{
union ldlm_policy_data *policy = &mti->mti_policy;
+ struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
int i;
- int rc;
- ENTRY;
-
- if (!S_ISDIR(obj->mot_header.loh_attr))
- RETURN(0);
- /* Unlock stripe 0 */
- if (s0_lh != NULL && lustre_handle_is_used(&s0_lh->mlh_reg_lh)) {
- LASSERT(s0_obj != NULL);
- mdt_object_unlock_put(mti, s0_obj, s0_lh, decref);
- }
+ LASSERT(S_ISDIR(obj->mot_header.loh_attr));
+ LASSERT(slave_locks);
memset(policy, 0, sizeof(*policy));
- policy->l_inodebits.bits = ibits;
-
- if (slave_locks != NULL) {
- LASSERT(s0_lh != NULL);
- for (i = 1; i < slave_locks->count; i++) {
- /* borrow s0_lh temporarily to do mdt unlock */
- mdt_lock_reg_init(s0_lh, einfo->ei_mode);
- s0_lh->mlh_rreg_lh = slave_locks->handles[i];
- mdt_object_unlock(mti, NULL, s0_lh, decref);
- slave_locks->handles[i].cookie = 0ull;
- }
+ policy->l_inodebits.bits = einfo->ei_inodebits;
+ mdt_lock_handle_init(lh);
+ mdt_lock_reg_init(lh, einfo->ei_mode);
+ for (i = 0; i < slave_locks->ha_count; i++) {
+ if (test_bit(i, (void *)slave_locks->ha_map))
+ lh->mlh_rreg_lh = slave_locks->ha_handles[i];
+ else
+ lh->mlh_reg_lh = slave_locks->ha_handles[i];
+ mdt_object_unlock(mti, NULL, lh, decref);
+ slave_locks->ha_handles[i].cookie = 0ull;
}
- rc = mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
- policy);
- RETURN(rc);
+ return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
+ policy);
}
-static int mdt_init_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
- struct lu_fid *fid)
+static inline int mdt_object_striped(struct mdt_thread_info *mti,
+ struct mdt_object *obj)
{
- struct lu_buf *buf = &mti->mti_buf;
- struct lmv_mds_md_v1 *lmv;
int rc;
- ENTRY;
if (!S_ISDIR(obj->mot_header.loh_attr))
- RETURN(0);
+ return 0;
- buf->lb_buf = mti->mti_xattr_buf;
- buf->lb_len = sizeof(mti->mti_xattr_buf);
- rc = mo_xattr_get(mti->mti_env, mdt_object_child(obj), buf,
+ rc = mo_xattr_get(mti->mti_env, mdt_object_child(obj), &LU_BUF_NULL,
XATTR_NAME_LMV);
- if (rc == -ERANGE) {
- rc = mdt_big_xattr_get(mti, obj, XATTR_NAME_LMV);
- if (rc > 0) {
- buf->lb_buf = mti->mti_big_lmm;
- buf->lb_len = mti->mti_big_lmmsize;
- }
- }
-
- if (rc == -ENODATA || rc == -ENOENT)
- RETURN(0);
-
if (rc <= 0)
- RETURN(rc);
-
- lmv = buf->lb_buf;
- if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
- RETURN(-EINVAL);
+ return rc == -ENODATA ? 0 : rc;
- fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[0]);
-
- RETURN(rc);
+ return 1;
}
/**
**/
static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
enum ldlm_mode mode, __u64 ibits,
- struct lu_fid *s0_fid,
- struct mdt_lock_handle *s0_lh,
- struct mdt_object **s0_objp,
struct ldlm_enqueue_info *einfo)
{
union ldlm_policy_data *policy = &mti->mti_policy;
- int rc;
- ENTRY;
-
- memset(einfo, 0, sizeof(*einfo));
-
- rc = mdt_init_slaves(mti, obj, s0_fid);
- if (rc <= 0)
- RETURN(rc);
LASSERT(S_ISDIR(obj->mot_header.loh_attr));
- if (!lu_fid_eq(s0_fid, mdt_object_fid(obj))) {
- /* Except migrating object, whose 0_stripe and master
- * object are the same object, 0_stripe and master
- * object are different, though they are in the same
- * MDT, to avoid adding osd_object_lock here, so we
- * will enqueue the stripe0 lock in MDT0 for now */
- *s0_objp = mdt_object_find(mti->mti_env, mti->mti_mdt, s0_fid);
- if (IS_ERR(*s0_objp))
- RETURN(PTR_ERR(*s0_objp));
-
- rc = mdt_reint_object_lock(mti, *s0_objp, s0_lh, ibits, true);
- if (rc < 0) {
- mdt_object_put(mti->mti_env, *s0_objp);
- RETURN(rc);
- }
- }
-
einfo->ei_type = LDLM_IBITS;
einfo->ei_mode = mode;
einfo->ei_cb_bl = mdt_remote_blocking_ast;
einfo->ei_cb_cp = ldlm_completion_ast;
einfo->ei_enq_slave = 1;
einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
+ einfo->ei_inodebits = ibits;
memset(policy, 0, sizeof(*policy));
policy->l_inodebits.bits = ibits;
- rc = mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
- policy);
- RETURN(rc);
+ return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
+ policy);
+}
+
+static inline int mdt_reint_striped_lock(struct mdt_thread_info *info,
+ struct mdt_object *o,
+ struct mdt_lock_handle *lh,
+ __u64 ibits,
+ struct ldlm_enqueue_info *einfo,
+ bool cos_incompat)
+{
+ int rc;
+
+ LASSERT(!mdt_object_remote(o));
+
+ memset(einfo, 0, sizeof(*einfo));
+
+ rc = mdt_reint_object_lock(info, o, lh, ibits, cos_incompat);
+ if (rc)
+ return rc;
+
+ rc = mdt_object_striped(info, o);
+ if (rc != 1) {
+ if (rc < 0)
+ mdt_object_unlock(info, o, lh, rc);
+ return rc;
+ }
+
+ rc = mdt_lock_slaves(info, o, lh->mlh_reg_mode, ibits, einfo);
+ if (rc) {
+ mdt_object_unlock(info, o, lh, rc);
+ if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
+ rc = 0;
+ }
+
+ return rc;
+}
+
+static inline void
+mdt_reint_striped_unlock(struct mdt_thread_info *info, struct mdt_object *o,
+ struct mdt_lock_handle *lh,
+ struct ldlm_enqueue_info *einfo, int decref)
+{
+ if (einfo->ei_cbdata)
+ mdt_unlock_slaves(info, o, einfo, decref);
+ mdt_object_unlock(info, o, lh, decref);
}
/*
* 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
* check.
*/
-static int mdt_md_create(struct mdt_thread_info *info)
+static int mdt_create(struct mdt_thread_info *info)
{
struct mdt_device *mdt = info->mti_mdt;
struct mdt_object *parent;
*/
if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
struct mdt_lock_handle *lhc;
- struct mdt_lock_handle *s0_lh;
- struct mdt_object *s0_obj = NULL;
- struct ldlm_enqueue_info *einfo;
- struct lu_fid *s0_fid = &info->mti_tmp_fid1;
- bool cos_incompat = false;
-
- rc = mdt_init_slaves(info, child, s0_fid);
- if (rc > 0) {
- cos_incompat = true;
+ struct ldlm_enqueue_info *einfo = &info->mti_einfo;
+ bool cos_incompat;
+
+ rc = mdt_object_striped(info, child);
+ if (rc < 0)
+ GOTO(put_child, rc);
+
+ cos_incompat = rc;
+ if (cos_incompat) {
if (!mdt_object_remote(parent)) {
mdt_object_unlock(info, parent, lh, 1);
mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
}
}
- einfo = &info->mti_einfo;
lhc = &info->mti_lh[MDT_LH_CHILD];
mdt_lock_handle_init(lhc);
mdt_lock_reg_init(lhc, LCK_PW);
- rc = mdt_reint_object_lock(info, child, lhc,
- MDS_INODELOCK_UPDATE,
- cos_incompat);
+ rc = mdt_reint_striped_lock(info, child, lhc,
+ MDS_INODELOCK_UPDATE, einfo,
+ cos_incompat);
if (rc)
GOTO(put_child, rc);
- mdt_object_unlock(info, child, lhc, rc);
-
- s0_lh = &info->mti_lh[MDT_LH_LOCAL];
- mdt_lock_handle_init(s0_lh);
- mdt_lock_reg_init(s0_lh, LCK_PW);
- rc = mdt_lock_slaves(info, child, LCK_PW, MDS_INODELOCK_UPDATE,
- s0_fid, s0_lh, &s0_obj, einfo);
- mdt_unlock_slaves(info, child, MDS_INODELOCK_UPDATE, s0_lh,
- s0_obj, einfo, rc);
- if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && rc == -EIO)
- rc = 0;
+
+ mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
}
/* Return fid & attr to client. */
(LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
__u64 lockpart = MDS_INODELOCK_UPDATE;
struct ldlm_enqueue_info *einfo = &info->mti_einfo;
- struct lu_fid *s0_fid = &info->mti_tmp_fid1;
- struct mdt_lock_handle *s0_lh = NULL;
- struct mdt_object *s0_obj = NULL;
- bool cos_incompat = false;
+ bool cos_incompat;
int rc;
ENTRY;
- rc = mdt_init_slaves(info, mo, s0_fid);
- if (rc > 0)
- cos_incompat = true;
+ rc = mdt_object_striped(info, mo);
+ if (rc < 0)
+ RETURN(rc);
+
+ cos_incompat = rc;
- lh = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_reg_init(lh, LCK_PW);
+ lh = &info->mti_lh[MDT_LH_PARENT];
+ mdt_lock_reg_init(lh, LCK_PW);
/* Even though the new MDT will grant PERM lock to the old
* client, but the old client will almost ignore that during
if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
- rc = mdt_reint_object_lock(info, mo, lh, lockpart, cos_incompat);
+ rc = mdt_reint_striped_lock(info, mo, lh, lockpart, einfo,
+ cos_incompat);
if (rc != 0)
RETURN(rc);
- s0_lh = &info->mti_lh[MDT_LH_LOCAL];
- mdt_lock_reg_init(s0_lh, LCK_PW);
- rc = mdt_lock_slaves(info, mo, LCK_PW, lockpart, s0_fid, s0_lh, &s0_obj,
- einfo);
- if (rc != 0)
- GOTO(out_unlock, rc);
-
- /* all attrs are packed into mti_attr in unpack_setattr */
- mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
- OBD_FAIL_MDS_REINT_SETATTR_WRITE);
-
- /* This is only for set ctime when rename's source is on remote MDS. */
- if (unlikely(ma->ma_attr.la_valid == LA_CTIME))
- ma->ma_attr_flags |= MDS_VTX_BYPASS;
+ /* all attrs are packed into mti_attr in unpack_setattr */
+ mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
+ OBD_FAIL_MDS_REINT_SETATTR_WRITE);
- /* VBR: update version if attr changed are important for recovery */
- if (do_vbr) {
- /* update on-disk version of changed object */
+ /* VBR: update version if attr changed are important for recovery */
+ if (do_vbr) {
+ /* update on-disk version of changed object */
tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
- rc = mdt_version_get_check_save(info, mo, 0);
- if (rc)
- GOTO(out_unlock, rc);
- }
+ rc = mdt_version_get_check_save(info, mo, 0);
+ if (rc)
+ GOTO(out_unlock, rc);
+ }
/* Ensure constant striping during chown(). See LU-2789. */
if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
mutex_lock(&mo->mot_lov_mutex);
- /* all attrs are packed into mti_attr in unpack_setattr */
- rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
+ /* all attrs are packed into mti_attr in unpack_setattr */
+ rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
mutex_unlock(&mo->mot_lov_mutex);
- if (rc != 0)
- GOTO(out_unlock, rc);
-
- EXIT;
+ if (rc != 0)
+ GOTO(out_unlock, rc);
+ mdt_dom_obj_lvb_update(info->mti_env, mo, false);
+ EXIT;
out_unlock:
- mdt_unlock_slaves(info, mo, lockpart, s0_lh, s0_obj, einfo, rc);
- mdt_object_unlock(info, mo, lh, rc);
- return rc;
+ mdt_reint_striped_unlock(info, mo, lh, einfo, rc);
+ return rc;
}
/**
/* If an up2date copy exists in the backend, add dirty flag */
if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
&& !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
- struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_CHILD];
-
ma->ma_hsm.mh_flags |= HS_DIRTY;
- mdt_lock_reg_init(lh, LCK_PW);
- rc = mdt_object_lock(info, mo, lh, MDS_INODELOCK_XATTR);
- if (rc != 0)
- RETURN(rc);
-
rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
if (rc)
CERROR("file attribute change error for "DFID": %d\n",
PFID(mdt_object_fid(mo)), rc);
- mdt_object_unlock(info, mo, lh, rc);
}
RETURN(rc);
if (mdt_object_remote(mo))
GOTO(out_put, rc = -EREMOTE);
- if ((ma->ma_attr.la_valid & LA_SIZE) ||
- (rr->rr_flags & MRF_OPEN_TRUNC)) {
+ if (ma->ma_attr.la_valid & LA_SIZE || rr->rr_flags & MRF_OPEN_TRUNC) {
/* Check write access for the O_TRUNC case */
if (mdt_write_read(mo) < 0)
GOTO(out_put, rc = -ETXTBSY);
+
+ /* LU-10286: compatibility check for FLR.
+ * Please check the comment in mdt_finish_open() for details */
+ if (!exp_connect_flr(info->mti_exp)) {
+ rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
+ if (rc < 0 && rc != -ENODATA)
+ GOTO(out_put, rc);
+
+ if (rc > 0 && mdt_lmm_is_flr(info->mti_big_lmm))
+ GOTO(out_put, rc = -EOPNOTSUPP);
+ }
+
+ /* For truncate, the file size sent from the client
+ * can be trusted, but the block count cannot,
+ * which makes the block count in the LSOM attribute
+ * inconsistent with the real block count.
+ */
+ rc = mdt_lsom_update(info, mo, true);
+ if (rc)
+ GOTO(out_put, rc);
}
if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
- EXIT;
+ EXIT;
out_put:
- mdt_object_put(info->mti_env, mo);
+ mdt_object_put(info->mti_env, mo);
out:
- if (rc == 0)
+ if (rc == 0)
mdt_counter_incr(req, LPROC_MDT_SETATTR);
mdt_client_compatibility(info);
RETURN(err_serious(-EOPNOTSUPP));
}
- rc = mdt_md_create(info);
+ rc = mdt_create(info);
RETURN(rc);
}
struct mdt_lock_handle *parent_lh;
struct mdt_lock_handle *child_lh;
struct ldlm_enqueue_info *einfo = &info->mti_einfo;
- struct lu_fid *s0_fid = &info->mti_tmp_fid2;
- struct mdt_lock_handle *s0_lh = NULL;
- struct mdt_object *s0_obj = NULL;
__u64 lock_ibits;
- bool cos_incompat = false;
+ bool cos_incompat = false, discard = false;
int no_name = 0;
int rc;
+
ENTRY;
DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
if (IS_ERR(mc))
GOTO(unlock_parent, rc = PTR_ERR(mc));
- if (!cos_incompat && mdt_init_slaves(info, mc, s0_fid) > 0) {
- cos_incompat = true;
- mdt_object_put(info->mti_env, mc);
- mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
- goto relock;
+ if (!cos_incompat) {
+ /* mc passed the IS_ERR() check above, so it is a valid
+ * object holding a reference: on failure report the error
+ * returned by mdt_object_striped() itself (PTR_ERR(mc) would
+ * be a meaningless pointer value) and leave through
+ * put_child so that the reference on mc is dropped. */
+ rc = mdt_object_striped(info, mc);
+ if (rc < 0)
+ GOTO(put_child, rc);
+
+ cos_incompat = rc;
+ if (cos_incompat) {
+ /* striped child: release the child and the parent
+ * lock, then retry from relock with cos_incompat
+ * set */
+ mdt_object_put(info->mti_env, mc);
+ mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
+ goto relock;
+ }
}
child_lh = &info->mti_lh[MDT_LH_CHILD];
rc = mdt_remote_object_lock(info, mp, mdt_object_fid(mc),
&child_lh->mlh_rreg_lh,
child_lh->mlh_rreg_mode,
- MDS_INODELOCK_LOOKUP, false, false);
+ MDS_INODELOCK_LOOKUP, false);
if (rc != ELDLM_OK)
GOTO(put_child, rc);
lock_ibits &= ~MDS_INODELOCK_LOOKUP;
}
- rc = mdt_reint_object_lock(info, mc, child_lh, lock_ibits,
- cos_incompat);
+ rc = mdt_reint_striped_lock(info, mc, child_lh, lock_ibits, einfo,
+ cos_incompat);
if (rc != 0)
- GOTO(unlock_child, rc);
+ GOTO(put_child, rc);
/*
* Now we can only make sure we need MA_INODE, in mdd layer, will check
ma->ma_need = MA_INODE;
ma->ma_valid = 0;
- s0_lh = &info->mti_lh[MDT_LH_LOCAL];
- mdt_lock_reg_init(s0_lh, LCK_EX);
- rc = mdt_lock_slaves(info, mc, LCK_EX, MDS_INODELOCK_UPDATE, s0_fid,
- s0_lh, &s0_obj, einfo);
- if (rc != 0)
- GOTO(unlock_child, rc);
-
mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
OBD_FAIL_MDS_REINT_UNLINK_WRITE);
/* save version when object is locked */
mdt_object_child(mc), &rr->rr_name, ma, no_name);
mutex_unlock(&mc->mot_lov_mutex);
+ if (rc != 0)
+ GOTO(unlock_child, rc);
- if (rc == 0 && !lu_object_is_dying(&mc->mot_header))
+ if (!lu_object_is_dying(&mc->mot_header)) {
rc = mdt_attr_get_complex(info, mc, ma);
- if (rc == 0)
- mdt_handle_last_unlink(info, mc, ma);
+ if (rc)
+ GOTO(out_stat, rc);
+ } else {
+ discard = true;
+ }
+ mdt_handle_last_unlink(info, mc, ma);
- if (ma->ma_valid & MA_INODE) {
- switch (ma->ma_attr.la_mode & S_IFMT) {
- case S_IFDIR:
+out_stat:
+ if (ma->ma_valid & MA_INODE) {
+ switch (ma->ma_attr.la_mode & S_IFMT) {
+ case S_IFDIR:
mdt_counter_incr(req, LPROC_MDT_RMDIR);
- break;
- case S_IFREG:
- case S_IFLNK:
- case S_IFCHR:
- case S_IFBLK:
- case S_IFIFO:
- case S_IFSOCK:
+ break;
+ case S_IFREG:
+ case S_IFLNK:
+ case S_IFCHR:
+ case S_IFBLK:
+ case S_IFIFO:
+ case S_IFSOCK:
mdt_counter_incr(req, LPROC_MDT_UNLINK);
- break;
- default:
- LASSERTF(0, "bad file type %o unlinking\n",
- ma->ma_attr.la_mode);
- }
- }
+ break;
+ default:
+ LASSERTF(0, "bad file type %o unlinking\n",
+ ma->ma_attr.la_mode);
+ }
+ }
- EXIT;
+ EXIT;
unlock_child:
- mdt_unlock_slaves(info, mc, MDS_INODELOCK_UPDATE, s0_lh, s0_obj, einfo,
- rc);
- mdt_object_unlock(info, mc, child_lh, rc);
+ mdt_reint_striped_unlock(info, mc, child_lh, einfo, rc);
put_child:
+ if (discard)
+ mdt_dom_check_and_discard(info, mc);
mdt_object_put(info->mti_env, mc);
unlock_parent:
mdt_object_unlock(info, mp, parent_lh, rc);
rc = mdt_remote_object_lock(info, obj,
&LUSTRE_BFL_FID, lh,
LCK_EX,
- MDS_INODELOCK_UPDATE, false, false);
+ MDS_INODELOCK_UPDATE, false);
mdt_object_put(info->mti_env, obj);
} else {
struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
EXIT;
}
-/*
- * This is is_subdir() variant, it is CMD if cmm forwards it to correct
- * target. Source should not be ancestor of target dir. May be other rename
- * checks can be moved here later.
- */
-static int mdt_is_subdir(struct mdt_thread_info *info,
- struct mdt_object *dir,
- const struct lu_fid *fid)
-{
- struct lu_fid dir_fid = dir->mot_header.loh_fid;
- int rc = 0;
- ENTRY;
-
- /* If the source and target are in the same directory, they can not
- * be parent/child relationship, so subdir check is not needed */
- if (lu_fid_eq(&dir_fid, fid))
- return 0;
-
- if (!mdt_object_exists(dir))
- RETURN(-ENOENT);
-
- rc = mdo_is_subdir(info->mti_env, mdt_object_child(dir),
- fid, &dir_fid);
- if (rc < 0) {
- CERROR("%s: failed subdir check in "DFID" for "DFID
- ": rc = %d\n", mdt_obd_name(info->mti_mdt),
- PFID(&dir_fid), PFID(fid), rc);
- /* Return EINVAL only if a parent is the @fid */
- if (rc == -EINVAL)
- rc = -EIO;
- } else {
- /* check the found fid */
- if (lu_fid_eq(&dir_fid, fid))
- rc = -EINVAL;
- }
-
- RETURN(rc);
-}
-
/* Update object linkEA */
struct mdt_lock_list {
struct mdt_object *mll_obj;
struct mdt_lock_list *mll;
struct lu_name name;
struct lu_fid fid;
+ __u64 ibits;
linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
&name, &fid);
* cannot be gotten because of conflicting locks, then drop all
* current locks, send an AST to the client, and start again. */
mdt_lock_pdo_init(&mll->mll_lh, LCK_PW, &name);
- rc = mdt_reint_object_lock_try(info, mdt_pobj, &mll->mll_lh,
- MDS_INODELOCK_UPDATE, true);
- if (rc == 0) {
- mdt_unlock_list(info, lock_list, rc);
+ ibits = 0;
+ rc = mdt_object_lock_try(info, mdt_pobj, &mll->mll_lh, &ibits,
+ MDS_INODELOCK_UPDATE, true);
+ if (!(ibits & MDS_INODELOCK_UPDATE)) {
+ mdt_unlock_list(info, lock_list, 0);
CDEBUG(D_INFO, "%s: busy lock on "DFID" %s retry %d\n",
mdt_obd_name(mdt), PFID(&fid), name.ln_name,
GOTO(out, rc = -EBUSY);
}
+ mdt_lock_pdo_init(&mll->mll_lh, LCK_PW, &name);
rc = mdt_object_lock(info, mdt_pobj, &mll->mll_lh,
MDS_INODELOCK_UPDATE);
if (rc != 0) {
if (IS_ERR(mold))
GOTO(out_unlock_parent, rc = PTR_ERR(mold));
+ if (!mdt_object_exists(mold)) {
+ LU_OBJECT_DEBUG(D_INODE, info->mti_env,
+ &mold->mot_obj,
+ "object does not exist");
+ GOTO(out_put_child, rc = -ENOENT);
+ }
+
if (mdt_object_remote(mold)) {
CDEBUG(D_OTHER, "%s: source "DFID" is on the remote MDT\n",
mdt_obd_name(info->mti_mdt), PFID(old_fid));
rc = mdt_remote_object_lock(info, msrcdir, mdt_object_fid(mold),
&lh_childp->mlh_rreg_lh,
lh_childp->mlh_rreg_mode,
- MDS_INODELOCK_LOOKUP, false, false);
+ MDS_INODELOCK_LOOKUP, false);
if (rc != ELDLM_OK)
GOTO(out_unlock_list, rc);
mdt_object_fid(mnew),
&lh_tgtp->mlh_rreg_lh,
lh_tgtp->mlh_rreg_mode,
- MDS_INODELOCK_UPDATE, false, false);
+ MDS_INODELOCK_UPDATE, false);
if (rc != 0) {
lh_tgtp = NULL;
GOTO(out_put_new, rc);
}
/*
+ * Determine the lock order of sobj and tobj.
+ *
+ * There are two situations in which tobj must be locked before sobj:
+ * 1. sobj is a child of tobj
+ * 2. sobj and tobj are stripes of the same directory, and the stripe index
+ *    of sobj is larger than that of tobj
+ *
+ * The check uses info->mti_attr, info->mti_tmp_fid1, info->mti_tmp_fid2 and
+ * info->mti_xattr_buf as scratch space, so their previous contents may be
+ * overwritten.
+ *
+ * \param[in] info thread info, also provides the scratch buffers
+ * \param[in] sobj first object (the rename path passes the source directory)
+ * \param[in] tobj second object (the rename path passes the target directory)
+ *
+ * \retval 1 lock tobj before sobj
+ * \retval 0 lock sobj before tobj
+ * \retval -ev negative errno upon error; besides errors from the helpers
+ *             this is -ENODATA when sobj is a stripe but tobj has no LMV,
+ *             and -EINVAL when tobj's LMV is not a stripe LMV or both
+ *             objects carry the same stripe index
+ */
+static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
+ struct mdt_object *sobj,
+ struct mdt_object *tobj)
+{
+ struct md_attr *ma = &info->mti_attr;
+ struct lu_fid *spfid = &info->mti_tmp_fid1;
+ struct lu_fid *tpfid = &info->mti_tmp_fid2;
+ struct lmv_mds_md_v1 *lmv;
+ __u32 sindex;
+ __u32 tindex;
+ int rc;
+
+ /* sobj and tobj are the same object, so the order does not matter */
+ if (sobj == tobj)
+ return 0;
+
+ if (fid_is_root(mdt_object_fid(sobj)))
+ return 0; /* sobj is the root: lock it first */
+
+ if (fid_is_root(mdt_object_fid(tobj)))
+ return 1; /* tobj is the root: lock it first */
+
+ /* check whether sobj is a child of tobj; a result of 1 means it is */
+ rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
+ mdt_object_fid(tobj));
+ if (rc < 0)
+ return rc;
+
+ if (rc == 1)
+ return 1;
+
+ /* check whether sobj and tobj are children of the same parent by
+ * comparing their parent FIDs */
+ rc = mdt_attr_get_pfid(info, sobj, spfid);
+ if (rc)
+ return rc;
+
+ rc = mdt_attr_get_pfid(info, tobj, tpfid);
+ if (rc)
+ return rc;
+
+ if (!lu_fid_eq(spfid, tpfid))
+ return 0; /* different parents: cannot be sibling stripes */
+
+ /* check whether sobj and tobj are sibling stripes: fetch the LMV of
+ * each one into mti_xattr_buf and look at its magic */
+ ma->ma_need = MA_LMV;
+ ma->ma_valid = 0;
+ ma->ma_lmv = (union lmv_mds_md *)info->mti_xattr_buf;
+ ma->ma_lmv_size = sizeof(info->mti_xattr_buf);
+ rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
+ if (rc)
+ return rc;
+
+ if (!(ma->ma_valid & MA_LMV))
+ return 0; /* sobj has no LMV: not a stripe */
+
+ lmv = &ma->ma_lmv->lmv_md_v1;
+ if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
+ return 0;
+ sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
+
+ ma->ma_valid = 0;
+ rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
+ if (rc)
+ return rc;
+
+ if (!(ma->ma_valid & MA_LMV))
+ return -ENODATA; /* sobj is a stripe but tobj has no LMV */
+
+ lmv = &ma->ma_lmv->lmv_md_v1;
+ if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
+ return -EINVAL;
+ tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
+
+ /* check stripe index of sobj and tobj: two distinct sibling stripes
+ * are not expected to share an index, and the smaller one is locked
+ * first */
+ if (sindex == tindex)
+ return -EINVAL;
+
+ return sindex < tindex ? 0 : 1;
+}
+
+/*
* VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
* 2 - srcdir child; 3 - tgtdir child.
* Update on disk version of srcdir child.
struct lu_fid *new_fid = &info->mti_tmp_fid2;
__u64 lock_ibits;
bool reverse = false;
- bool cos_incompat;
+ bool cos_incompat, discard = false;
int rc;
ENTRY;
mtgtdir = msrcdir;
mdt_object_get(info->mti_env, mtgtdir);
} else {
- /* Check if the @msrcdir is not a child of the @mtgtdir,
- * otherwise a reverse locking must take place. */
- rc = mdt_is_subdir(info, msrcdir, rr->rr_fid2);
- if (rc == -EINVAL)
- reverse = true;
- else if (rc)
- GOTO(out_put_srcdir, rc);
-
mtgtdir = mdt_object_find_check(info, rr->rr_fid2, 1);
if (IS_ERR(mtgtdir))
GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
}
+ rc = mdt_rename_determine_lock_order(info, msrcdir, mtgtdir);
+ if (rc < 0)
+ GOTO(out_put_tgtdir, rc);
+
+ reverse = rc;
+
/* source needs to be looked up after locking source parent, otherwise
* this rename may race with unlink source, and cause rename hang, see
* sanityn.sh 55b, so check parents first, if later we found source is
if (IS_ERR(mold))
GOTO(out_unlock_parents, rc = PTR_ERR(mold));
+ if (!mdt_object_exists(mold)) {
+ LU_OBJECT_DEBUG(D_INODE, info->mti_env,
+ &mold->mot_obj,
+ "object does not exist");
+ GOTO(out_put_old, rc = -ENOENT);
+ }
+
/* Check if @mtgtdir is subdir of @mold, before locking child
* to avoid reverse locking. */
if (mtgtdir != msrcdir) {
- rc = mdt_is_subdir(info, mtgtdir, old_fid);
- if (rc)
+ rc = mdo_is_subdir(info->mti_env, mdt_object_child(mtgtdir),
+ old_fid);
+ if (rc) {
+ if (rc == 1)
+ rc = -EINVAL;
GOTO(out_put_old, rc);
+ }
}
tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
if (IS_ERR(mnew))
GOTO(out_put_old, rc = PTR_ERR(mnew));
+ if (!mdt_object_exists(mnew)) {
+ LU_OBJECT_DEBUG(D_INODE, info->mti_env,
+ &mnew->mot_obj,
+ "object does not exist");
+ GOTO(out_put_new, rc = -ENOENT);
+ }
+
if (mdt_object_remote(mnew)) {
struct mdt_body *repbody;
&lh_oldp->mlh_rreg_lh,
lh_oldp->mlh_rreg_mode,
MDS_INODELOCK_LOOKUP,
- false, false);
+ false);
if (rc != ELDLM_OK)
GOTO(out_put_new, rc);
/* Check if @msrcdir is subdir of @mnew, before locking child
* to avoid reverse locking. */
if (mtgtdir != msrcdir) {
- rc = mdt_is_subdir(info, msrcdir, new_fid);
- if (rc)
+ rc = mdo_is_subdir(info->mti_env,
+ mdt_object_child(msrcdir), new_fid);
+ if (rc) {
+ if (rc == 1)
+ rc = -EINVAL;
GOTO(out_unlock_old, rc);
+ }
}
/* We used to acquire MDS_INODELOCK_FULL here but we
&lh_oldp->mlh_rreg_lh,
lh_oldp->mlh_rreg_mode,
MDS_INODELOCK_LOOKUP,
- false, false);
+ false);
if (rc != ELDLM_OK)
GOTO(out_put_old, rc);
/* handle last link of tgt object */
if (rc == 0) {
mdt_counter_incr(req, LPROC_MDT_RENAME);
- if (mnew)
+ if (mnew) {
mdt_handle_last_unlink(info, mnew, ma);
+ discard = true;
+ }
mdt_rename_counter_tally(info, info->mti_mdt, req,
msrcdir, mtgtdir);
out_unlock_old:
mdt_object_unlock(info, mold, lh_oldp, rc);
out_put_new:
- if (mnew != NULL)
+ if (mnew != NULL) {
+ if (discard)
+ mdt_dom_check_and_discard(info, mnew);
mdt_object_put(info->mti_env, mnew);
+ }
out_put_old:
mdt_object_put(info->mti_env, mold);
out_unlock_parents:
return mdt_reint_rename_or_migrate(info, lhc, false);
}
+static int mdt_reint_resync(struct mdt_thread_info *info,
+ struct mdt_lock_handle *lhc)
+{
+ struct mdt_reint_record *rr = &info->mti_rr;
+ struct ptlrpc_request *req = mdt_info_req(info);
+ struct md_attr *ma = &info->mti_attr;
+ struct mdt_object *mo;
+ struct ldlm_lock *lease;
+ struct mdt_body *repbody;
+ struct md_layout_change layout = { 0 };
+ bool lease_broken;
+ int rc, rc2;
+ ENTRY;
+
+ DEBUG_REQ(D_INODE, req, DFID": FLR file resync\n", PFID(rr->rr_fid1));
+
+ if (info->mti_dlm_req)
+ ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
+
+ mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
+ if (IS_ERR(mo))
+ GOTO(out, rc = PTR_ERR(mo));
+
+ if (!mdt_object_exists(mo))
+ GOTO(out_obj, rc = -ENOENT);
+
+ if (!S_ISREG(lu_object_attr(&mo->mot_obj)))
+ GOTO(out_obj, rc = -EINVAL);
+
+ if (mdt_object_remote(mo))
+ GOTO(out_obj, rc = -EREMOTE);
+
+ lease = ldlm_handle2lock(rr->rr_handle);
+ if (lease == NULL)
+ GOTO(out_obj, rc = -ESTALE);
+
+ /* It is necessary to grab mot_open_sem and check whether the lease
+ * lock has been lost. Otherwise a concurrent writer could come in and
+ * generate dirty data in its memory cache, and the writeback would fail
+ * after the layout version is increased by the MDS_REINT_RESYNC RPC.
+ * The semaphore is only tried, never waited for: if it cannot be
+ * taken the request fails with -EBUSY. */
+ if (!down_write_trylock(&mo->mot_open_sem))
+ GOTO(out_put_lease, rc = -EBUSY);
+
+ lock_res_and_lock(lease);
+ lease_broken = ldlm_is_cancel(lease);
+ unlock_res_and_lock(lease);
+ if (lease_broken)
+ GOTO(out_unlock, rc = -EBUSY);
+
+ /* The lease has not been cancelled, which presumably means that the
+ * file has not been opened by anyone else since we took the lease. */
+ layout.mlc_opc = MD_LAYOUT_RESYNC;
+ rc = mdt_layout_change(info, mo, &layout);
+ if (rc)
+ GOTO(out_unlock, rc);
+
+ ma->ma_need = MA_INODE;
+ ma->ma_valid = 0;
+ rc = mdt_attr_get_complex(info, mo, ma);
+ if (rc != 0)
+ GOTO(out_unlock, rc);
+
+ repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+ mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
+
+ EXIT;
+out_unlock:
+ up_write(&mo->mot_open_sem);
+out_put_lease:
+ LDLM_LOCK_PUT(lease);
+out_obj:
+ mdt_object_put(info->mti_env, mo);
+out:
+ mdt_client_compatibility(info);
+ rc2 = mdt_fix_reply(info);
+ if (rc == 0)
+ rc = rc2; /* report a reply fix-up failure only if nothing failed before */
+ return rc;
+}
+
struct mdt_reinter {
int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
enum lprocfs_extra_opc mr_extra_opc;
.mr_handler = &mdt_reint_migrate,
.mr_extra_opc = MDS_REINT_RENAME,
},
+ [REINT_RESYNC] = {
+ .mr_handler = &mdt_reint_resync,
+ .mr_extra_opc = MDS_REINT_RESYNC,
+ },
};
int mdt_reint_rec(struct mdt_thread_info *info,