}
static int mdt_unlock_slaves(struct mdt_thread_info *mti,
- struct mdt_object *obj, __u64 ibits,
- struct mdt_lock_handle *s0_lh,
- struct mdt_object *s0_obj,
+ struct mdt_object *obj,
struct ldlm_enqueue_info *einfo,
int decref)
{
union ldlm_policy_data *policy = &mti->mti_policy;
+ struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
int i;
- int rc;
- ENTRY;
-
- if (!S_ISDIR(obj->mot_header.loh_attr))
- RETURN(0);
- /* Unlock stripe 0 */
- if (s0_lh != NULL && lustre_handle_is_used(&s0_lh->mlh_reg_lh)) {
- LASSERT(s0_obj != NULL);
- mdt_object_unlock_put(mti, s0_obj, s0_lh, decref);
- }
+ LASSERT(S_ISDIR(obj->mot_header.loh_attr));
+ LASSERT(slave_locks);
memset(policy, 0, sizeof(*policy));
- policy->l_inodebits.bits = ibits;
-
- if (slave_locks != NULL) {
- LASSERT(s0_lh != NULL);
- for (i = 1; i < slave_locks->count; i++) {
- /* borrow s0_lh temporarily to do mdt unlock */
- mdt_lock_reg_init(s0_lh, einfo->ei_mode);
- s0_lh->mlh_rreg_lh = slave_locks->handles[i];
- mdt_object_unlock(mti, NULL, s0_lh, decref);
- slave_locks->handles[i].cookie = 0ull;
- }
+ policy->l_inodebits.bits = einfo->ei_inodebits;
+ mdt_lock_handle_init(lh);
+ mdt_lock_reg_init(lh, einfo->ei_mode);
+ for (i = 0; i < slave_locks->ha_count; i++) {
+ if (test_bit(i, (void *)slave_locks->ha_map))
+ lh->mlh_rreg_lh = slave_locks->ha_handles[i];
+ else
+ lh->mlh_reg_lh = slave_locks->ha_handles[i];
+ mdt_object_unlock(mti, NULL, lh, decref);
+ slave_locks->ha_handles[i].cookie = 0ull;
}
- rc = mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
- policy);
- RETURN(rc);
+ return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
+ policy);
}
-static int mdt_init_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
- struct lu_fid *fid)
+static inline int mdt_object_striped(struct mdt_thread_info *mti,
+ struct mdt_object *obj)
{
- struct lu_buf *buf = &mti->mti_buf;
- struct lmv_mds_md_v1 *lmv;
int rc;
- ENTRY;
if (!S_ISDIR(obj->mot_header.loh_attr))
- RETURN(0);
+ return 0;
- buf->lb_buf = mti->mti_xattr_buf;
- buf->lb_len = sizeof(mti->mti_xattr_buf);
- rc = mo_xattr_get(mti->mti_env, mdt_object_child(obj), buf,
+ rc = mo_xattr_get(mti->mti_env, mdt_object_child(obj), &LU_BUF_NULL,
XATTR_NAME_LMV);
- if (rc == -ERANGE) {
- rc = mdt_big_xattr_get(mti, obj, XATTR_NAME_LMV);
- if (rc > 0) {
- buf->lb_buf = mti->mti_big_lmm;
- buf->lb_len = mti->mti_big_lmmsize;
- }
- }
-
- if (rc == -ENODATA || rc == -ENOENT)
- RETURN(0);
-
if (rc <= 0)
- RETURN(rc);
-
- lmv = buf->lb_buf;
- if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
- RETURN(-EINVAL);
+ return rc == -ENODATA ? 0 : rc;
- fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[0]);
-
- RETURN(rc);
+ return 1;
}
/**
**/
static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
enum ldlm_mode mode, __u64 ibits,
- struct lu_fid *s0_fid,
- struct mdt_lock_handle *s0_lh,
- struct mdt_object **s0_objp,
struct ldlm_enqueue_info *einfo)
{
union ldlm_policy_data *policy = &mti->mti_policy;
- int rc;
- ENTRY;
-
- memset(einfo, 0, sizeof(*einfo));
-
- rc = mdt_init_slaves(mti, obj, s0_fid);
- if (rc <= 0)
- RETURN(rc);
LASSERT(S_ISDIR(obj->mot_header.loh_attr));
- if (!lu_fid_eq(s0_fid, mdt_object_fid(obj))) {
- /* Except migrating object, whose 0_stripe and master
- * object are the same object, 0_stripe and master
- * object are different, though they are in the same
- * MDT, to avoid adding osd_object_lock here, so we
- * will enqueue the stripe0 lock in MDT0 for now */
- *s0_objp = mdt_object_find(mti->mti_env, mti->mti_mdt, s0_fid);
- if (IS_ERR(*s0_objp))
- RETURN(PTR_ERR(*s0_objp));
-
- rc = mdt_reint_object_lock(mti, *s0_objp, s0_lh, ibits, true);
- if (rc < 0) {
- mdt_object_put(mti->mti_env, *s0_objp);
- RETURN(rc);
- }
- }
-
einfo->ei_type = LDLM_IBITS;
einfo->ei_mode = mode;
einfo->ei_cb_bl = mdt_remote_blocking_ast;
einfo->ei_cb_cp = ldlm_completion_ast;
einfo->ei_enq_slave = 1;
einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
+ einfo->ei_inodebits = ibits;
memset(policy, 0, sizeof(*policy));
policy->l_inodebits.bits = ibits;
- rc = mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
- policy);
- RETURN(rc);
+ return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
+ policy);
+}
+
+static inline int mdt_reint_striped_lock(struct mdt_thread_info *info,
+ struct mdt_object *o,
+ struct mdt_lock_handle *lh,
+ __u64 ibits,
+ struct ldlm_enqueue_info *einfo,
+ bool cos_incompat)
+{
+ int rc;
+
+ LASSERT(!mdt_object_remote(o));
+
+ memset(einfo, 0, sizeof(*einfo));
+
+ rc = mdt_reint_object_lock(info, o, lh, ibits, cos_incompat);
+ if (rc)
+ return rc;
+
+ rc = mdt_object_striped(info, o);
+ if (rc != 1) {
+ if (rc < 0)
+ mdt_object_unlock(info, o, lh, rc);
+ return rc;
+ }
+
+ rc = mdt_lock_slaves(info, o, lh->mlh_reg_mode, ibits, einfo);
+ if (rc) {
+ mdt_object_unlock(info, o, lh, rc);
+ if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
+ rc = 0;
+ }
+
+ return rc;
+}
+
+static inline void
+mdt_reint_striped_unlock(struct mdt_thread_info *info, struct mdt_object *o,
+ struct mdt_lock_handle *lh,
+ struct ldlm_enqueue_info *einfo, int decref)
+{
+ if (einfo->ei_cbdata)
+ mdt_unlock_slaves(info, o, einfo, decref);
+ mdt_object_unlock(info, o, lh, decref);
}
/*
*/
if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
struct mdt_lock_handle *lhc;
- struct mdt_lock_handle *s0_lh;
- struct mdt_object *s0_obj = NULL;
- struct ldlm_enqueue_info *einfo;
- struct lu_fid *s0_fid = &info->mti_tmp_fid1;
- bool cos_incompat = false;
-
- rc = mdt_init_slaves(info, child, s0_fid);
- if (rc > 0) {
- cos_incompat = true;
+ struct ldlm_enqueue_info *einfo = &info->mti_einfo;
+ bool cos_incompat;
+
+ rc = mdt_object_striped(info, child);
+ if (rc < 0)
+ GOTO(put_child, rc);
+
+ cos_incompat = rc;
+ if (cos_incompat) {
if (!mdt_object_remote(parent)) {
mdt_object_unlock(info, parent, lh, 1);
mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
}
}
- einfo = &info->mti_einfo;
lhc = &info->mti_lh[MDT_LH_CHILD];
mdt_lock_handle_init(lhc);
mdt_lock_reg_init(lhc, LCK_PW);
- rc = mdt_reint_object_lock(info, child, lhc,
- MDS_INODELOCK_UPDATE,
- cos_incompat);
+ rc = mdt_reint_striped_lock(info, child, lhc,
+ MDS_INODELOCK_UPDATE, einfo,
+ cos_incompat);
if (rc)
GOTO(put_child, rc);
- mdt_object_unlock(info, child, lhc, rc);
-
- s0_lh = &info->mti_lh[MDT_LH_LOCAL];
- mdt_lock_handle_init(s0_lh);
- mdt_lock_reg_init(s0_lh, LCK_PW);
- rc = mdt_lock_slaves(info, child, LCK_PW, MDS_INODELOCK_UPDATE,
- s0_fid, s0_lh, &s0_obj, einfo);
- mdt_unlock_slaves(info, child, MDS_INODELOCK_UPDATE, s0_lh,
- s0_obj, einfo, rc);
- if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && rc == -EIO)
- rc = 0;
+
+ mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
}
/* Return fid & attr to client. */
(LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
__u64 lockpart = MDS_INODELOCK_UPDATE;
struct ldlm_enqueue_info *einfo = &info->mti_einfo;
- struct lu_fid *s0_fid = &info->mti_tmp_fid1;
- struct mdt_lock_handle *s0_lh = NULL;
- struct mdt_object *s0_obj = NULL;
- bool cos_incompat = false;
+ bool cos_incompat;
int rc;
ENTRY;
- rc = mdt_init_slaves(info, mo, s0_fid);
- if (rc > 0)
- cos_incompat = true;
+ rc = mdt_object_striped(info, mo);
+ if (rc < 0)
+ RETURN(rc);
+
+ cos_incompat = rc;
lh = &info->mti_lh[MDT_LH_PARENT];
mdt_lock_reg_init(lh, LCK_PW);
if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
- rc = mdt_reint_object_lock(info, mo, lh, lockpart, cos_incompat);
+ rc = mdt_reint_striped_lock(info, mo, lh, lockpart, einfo,
+ cos_incompat);
if (rc != 0)
RETURN(rc);
- s0_lh = &info->mti_lh[MDT_LH_LOCAL];
- mdt_lock_reg_init(s0_lh, LCK_PW);
- rc = mdt_lock_slaves(info, mo, LCK_PW, lockpart, s0_fid, s0_lh, &s0_obj,
- einfo);
- if (rc != 0)
- GOTO(out_unlock, rc);
-
/* all attrs are packed into mti_attr in unpack_setattr */
mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
OBD_FAIL_MDS_REINT_SETATTR_WRITE);
mdt_dom_obj_lvb_update(info->mti_env, mo, false);
EXIT;
out_unlock:
- mdt_unlock_slaves(info, mo, lockpart, s0_lh, s0_obj, einfo, rc);
- mdt_object_unlock(info, mo, lh, rc);
+ mdt_reint_striped_unlock(info, mo, lh, einfo, rc);
return rc;
}
struct mdt_lock_handle *parent_lh;
struct mdt_lock_handle *child_lh;
struct ldlm_enqueue_info *einfo = &info->mti_einfo;
- struct lu_fid *s0_fid = &info->mti_tmp_fid2;
- struct mdt_lock_handle *s0_lh = NULL;
- struct mdt_object *s0_obj = NULL;
__u64 lock_ibits;
bool cos_incompat = false;
int no_name = 0;
if (IS_ERR(mc))
GOTO(unlock_parent, rc = PTR_ERR(mc));
- if (!cos_incompat && mdt_init_slaves(info, mc, s0_fid) > 0) {
- cos_incompat = true;
- mdt_object_put(info->mti_env, mc);
- mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
- goto relock;
+ if (!cos_incompat) {
+ rc = mdt_object_striped(info, mc);
+ if (rc < 0)
+ GOTO(unlock_parent, rc = PTR_ERR(mc));
+
+ cos_incompat = rc;
+ if (cos_incompat) {
+ mdt_object_put(info->mti_env, mc);
+ mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
+ goto relock;
+ }
}
child_lh = &info->mti_lh[MDT_LH_CHILD];
lock_ibits &= ~MDS_INODELOCK_LOOKUP;
}
- rc = mdt_reint_object_lock(info, mc, child_lh, lock_ibits,
- cos_incompat);
+ rc = mdt_reint_striped_lock(info, mc, child_lh, lock_ibits, einfo,
+ cos_incompat);
if (rc != 0)
- GOTO(unlock_child, rc);
+ GOTO(put_child, rc);
/*
* Now we can only make sure we need MA_INODE, in mdd layer, will check
ma->ma_need = MA_INODE;
ma->ma_valid = 0;
- s0_lh = &info->mti_lh[MDT_LH_LOCAL];
- mdt_lock_reg_init(s0_lh, LCK_EX);
- rc = mdt_lock_slaves(info, mc, LCK_EX, MDS_INODELOCK_UPDATE, s0_fid,
- s0_lh, &s0_obj, einfo);
- if (rc != 0)
- GOTO(unlock_child, rc);
-
mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
OBD_FAIL_MDS_REINT_UNLINK_WRITE);
/* save version when object is locked */
EXIT;
unlock_child:
- mdt_unlock_slaves(info, mc, MDS_INODELOCK_UPDATE, s0_lh, s0_obj, einfo,
- rc);
- mdt_object_unlock(info, mc, child_lh, rc);
+ mdt_reint_striped_unlock(info, mc, child_lh, einfo, rc);
put_child:
mdt_object_put(info->mti_env, mc);
unlock_parent: