idx, i, PFID(&fid));
idx_array[i] = idx;
/* Set the start index for next stripe allocation */
- if (!is_specific && i < stripe_count - 1)
+ if (!is_specific && i < stripe_count - 1) {
+ /*
+ * for large dir test, put all other slaves on one
+ * remote MDT, otherwise we may save too many local
+ * slave locks which will exceed RS_MAX_LOCKS.
+ */
+ if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)))
+ idx = master_index;
idx_array[i + 1] = (idx + 1) %
(lod->lod_remote_mdt_count + 1);
+ }
/* tgt_dt and fid must be ready after search avaible OSP
* in the above loop */
LASSERT(tgt_dt != NULL);
}
/**
- * Release LDLM locks on the stripes of a striped directory.
- *
- * Iterates over all the locks taken on the stripe objects and
- * cancel them.
- *
- * \param[in] env execution environment
- * \param[in] dt striped object
- * \param[in] einfo lock description
- * \param[in] policy data describing requested lock
- *
- * \retval 0 on success
- * \retval negative if failed
- */
-static int lod_object_unlock_internal(const struct lu_env *env,
- struct dt_object *dt,
- struct ldlm_enqueue_info *einfo,
- union ldlm_policy_data *policy)
-{
- struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
- int rc = 0;
- int i;
- ENTRY;
-
- if (slave_locks == NULL)
- RETURN(0);
-
- for (i = 1; i < slave_locks->count; i++) {
- if (lustre_handle_is_used(&slave_locks->handles[i]))
- ldlm_lock_decref_and_cancel(&slave_locks->handles[i],
- einfo->ei_mode);
- }
-
- RETURN(rc);
-}
-
-/**
* Implementation of dt_object_operations::do_object_unlock.
*
* Used to release LDLM lock(s).
LASSERT(!dt_object_remote(dt_object_child(dt)));
/* locks were unlocked in MDT layer */
- for (i = 1; i < slave_locks->count; i++) {
- LASSERT(!lustre_handle_is_used(&slave_locks->handles[i]));
+ for (i = 0; i < slave_locks->ha_count; i++)
+ LASSERT(!lustre_handle_is_used(&slave_locks->ha_handles[i]));
+
+ /*
+ * NB, ha_count may not equal to ldo_dir_stripe_count, because dir
+ * layout may change, e.g., shrink dir layout after migration.
+ */
+ for (i = 0; i < lo->ldo_dir_stripe_count; i++)
dt_invalidate(env, lo->ldo_stripe[i]);
- }
- slave_locks_size = sizeof(*slave_locks) + slave_locks->count *
- sizeof(slave_locks->handles[0]);
+ slave_locks_size = offsetof(typeof(*slave_locks),
+ ha_handles[slave_locks->ha_count]);
OBD_FREE(slave_locks, slave_locks_size);
einfo->ei_cbdata = NULL;
struct ldlm_enqueue_info *einfo,
union ldlm_policy_data *policy)
{
- struct lod_object *lo = lod_dt_obj(dt);
- int rc = 0;
- int i;
- int slave_locks_size;
+ struct lod_object *lo = lod_dt_obj(dt);
+ int slave_locks_size;
struct lustre_handle_array *slave_locks = NULL;
+ int i;
+ int rc;
ENTRY;
/* remote object lock */
}
if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
- GOTO(out, rc = -ENOTDIR);
+ RETURN(-ENOTDIR);
rc = lod_load_striping(env, lo);
if (rc != 0)
- GOTO(out, rc);
+ RETURN(rc);
/* No stripes */
- if (lo->ldo_dir_stripe_count <= 1) {
- /*
- * NB, ei_cbdata stores pointer to slave locks, if no locks
- * taken, make sure it's set to NULL, otherwise MDT will try to
- * unlock them.
- */
- einfo->ei_cbdata = NULL;
- GOTO(out, rc = 0);
- }
+ if (lo->ldo_dir_stripe_count <= 1)
+ RETURN(0);
- slave_locks_size = sizeof(*slave_locks) + lo->ldo_dir_stripe_count *
- sizeof(slave_locks->handles[0]);
+ slave_locks_size = offsetof(typeof(*slave_locks),
+ ha_handles[lo->ldo_dir_stripe_count]);
/* Freed in lod_object_unlock */
OBD_ALLOC(slave_locks, slave_locks_size);
- if (slave_locks == NULL)
- GOTO(out, rc = -ENOMEM);
- slave_locks->count = lo->ldo_dir_stripe_count;
+ if (!slave_locks)
+ RETURN(-ENOMEM);
+ slave_locks->ha_count = lo->ldo_dir_stripe_count;
/* striped directory lock */
- for (i = 1; i < lo->ldo_dir_stripe_count; i++) {
- struct lustre_handle lockh;
- struct ldlm_res_id *res_id;
+ for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
+ struct lustre_handle lockh;
+ struct ldlm_res_id *res_id;
res_id = &lod_env_info(env)->lti_res_id;
fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
einfo->ei_res_id = res_id;
LASSERT(lo->ldo_stripe[i] != NULL);
- if (likely(dt_object_remote(lo->ldo_stripe[i]))) {
+ if (dt_object_remote(lo->ldo_stripe[i])) {
+ set_bit(i, (void *)slave_locks->ha_map);
rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh,
einfo, policy);
} else {
struct ldlm_namespace *ns = einfo->ei_namespace;
ldlm_blocking_callback blocking = einfo->ei_cb_local_bl;
ldlm_completion_callback completion = einfo->ei_cb_cp;
- __u64 dlmflags = LDLM_FL_ATOMIC_CB;
+ __u64 dlmflags = LDLM_FL_ATOMIC_CB;
if (einfo->ei_mode == LCK_PW ||
einfo->ei_mode == LCK_EX)
dlmflags |= LDLM_FL_COS_INCOMPAT;
- /* This only happens if there are mulitple stripes
- * on the master MDT, i.e. except stripe0, there are
- * other stripes on the Master MDT as well, Only
- * happens in the test case right now. */
LASSERT(ns != NULL);
rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS,
policy, einfo->ei_mode,
NULL, 0, LVB_T_NONE,
NULL, &lockh);
}
- if (rc != 0)
- break;
- slave_locks->handles[i] = lockh;
+ if (rc) {
+ while (i--)
+ ldlm_lock_decref_and_cancel(
+ &slave_locks->ha_handles[i],
+ einfo->ei_mode);
+ OBD_FREE(slave_locks, slave_locks_size);
+ RETURN(rc);
+ }
+ slave_locks->ha_handles[i] = lockh;
}
einfo->ei_cbdata = slave_locks;
- if (rc != 0 && slave_locks != NULL) {
- lod_object_unlock_internal(env, dt, einfo, policy);
- OBD_FREE(slave_locks, slave_locks_size);
- }
- EXIT;
-out:
- if (rc != 0)
- einfo->ei_cbdata = NULL;
- RETURN(rc);
+ RETURN(0);
}
/**
}
static int mdt_unlock_slaves(struct mdt_thread_info *mti,
- struct mdt_object *obj, __u64 ibits,
- struct mdt_lock_handle *s0_lh,
- struct mdt_object *s0_obj,
+ struct mdt_object *obj,
struct ldlm_enqueue_info *einfo,
int decref)
{
union ldlm_policy_data *policy = &mti->mti_policy;
+ struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
int i;
- int rc;
- ENTRY;
-
- if (!S_ISDIR(obj->mot_header.loh_attr))
- RETURN(0);
- /* Unlock stripe 0 */
- if (s0_lh != NULL && lustre_handle_is_used(&s0_lh->mlh_reg_lh)) {
- LASSERT(s0_obj != NULL);
- mdt_object_unlock_put(mti, s0_obj, s0_lh, decref);
- }
+ LASSERT(S_ISDIR(obj->mot_header.loh_attr));
+ LASSERT(slave_locks);
memset(policy, 0, sizeof(*policy));
- policy->l_inodebits.bits = ibits;
-
- if (slave_locks != NULL) {
- LASSERT(s0_lh != NULL);
- for (i = 1; i < slave_locks->count; i++) {
- /* borrow s0_lh temporarily to do mdt unlock */
- mdt_lock_reg_init(s0_lh, einfo->ei_mode);
- s0_lh->mlh_rreg_lh = slave_locks->handles[i];
- mdt_object_unlock(mti, NULL, s0_lh, decref);
- slave_locks->handles[i].cookie = 0ull;
- }
+ policy->l_inodebits.bits = einfo->ei_inodebits;
+ mdt_lock_handle_init(lh);
+ mdt_lock_reg_init(lh, einfo->ei_mode);
+ for (i = 0; i < slave_locks->ha_count; i++) {
+ if (test_bit(i, (void *)slave_locks->ha_map))
+ lh->mlh_rreg_lh = slave_locks->ha_handles[i];
+ else
+ lh->mlh_reg_lh = slave_locks->ha_handles[i];
+ mdt_object_unlock(mti, NULL, lh, decref);
+ slave_locks->ha_handles[i].cookie = 0ull;
}
- rc = mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
- policy);
- RETURN(rc);
+ return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
+ policy);
}
-static int mdt_init_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
- struct lu_fid *fid)
+static inline int mdt_object_striped(struct mdt_thread_info *mti,
+ struct mdt_object *obj)
{
- struct lu_buf *buf = &mti->mti_buf;
- struct lmv_mds_md_v1 *lmv;
int rc;
- ENTRY;
if (!S_ISDIR(obj->mot_header.loh_attr))
- RETURN(0);
+ return 0;
- buf->lb_buf = mti->mti_xattr_buf;
- buf->lb_len = sizeof(mti->mti_xattr_buf);
- rc = mo_xattr_get(mti->mti_env, mdt_object_child(obj), buf,
+ rc = mo_xattr_get(mti->mti_env, mdt_object_child(obj), &LU_BUF_NULL,
XATTR_NAME_LMV);
- if (rc == -ERANGE) {
- rc = mdt_big_xattr_get(mti, obj, XATTR_NAME_LMV);
- if (rc > 0) {
- buf->lb_buf = mti->mti_big_lmm;
- buf->lb_len = mti->mti_big_lmmsize;
- }
- }
-
- if (rc == -ENODATA || rc == -ENOENT)
- RETURN(0);
-
if (rc <= 0)
- RETURN(rc);
-
- lmv = buf->lb_buf;
- if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
- RETURN(-EINVAL);
+ return rc == -ENODATA ? 0 : rc;
- fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[0]);
-
- RETURN(rc);
+ return 1;
}
/**
**/
static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
enum ldlm_mode mode, __u64 ibits,
- struct lu_fid *s0_fid,
- struct mdt_lock_handle *s0_lh,
- struct mdt_object **s0_objp,
struct ldlm_enqueue_info *einfo)
{
union ldlm_policy_data *policy = &mti->mti_policy;
- int rc;
- ENTRY;
-
- memset(einfo, 0, sizeof(*einfo));
-
- rc = mdt_init_slaves(mti, obj, s0_fid);
- if (rc <= 0)
- RETURN(rc);
LASSERT(S_ISDIR(obj->mot_header.loh_attr));
- if (!lu_fid_eq(s0_fid, mdt_object_fid(obj))) {
- /* Except migrating object, whose 0_stripe and master
- * object are the same object, 0_stripe and master
- * object are different, though they are in the same
- * MDT, to avoid adding osd_object_lock here, so we
- * will enqueue the stripe0 lock in MDT0 for now */
- *s0_objp = mdt_object_find(mti->mti_env, mti->mti_mdt, s0_fid);
- if (IS_ERR(*s0_objp))
- RETURN(PTR_ERR(*s0_objp));
-
- rc = mdt_reint_object_lock(mti, *s0_objp, s0_lh, ibits, true);
- if (rc < 0) {
- mdt_object_put(mti->mti_env, *s0_objp);
- RETURN(rc);
- }
- }
-
einfo->ei_type = LDLM_IBITS;
einfo->ei_mode = mode;
einfo->ei_cb_bl = mdt_remote_blocking_ast;
einfo->ei_cb_cp = ldlm_completion_ast;
einfo->ei_enq_slave = 1;
einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
+ einfo->ei_inodebits = ibits;
memset(policy, 0, sizeof(*policy));
policy->l_inodebits.bits = ibits;
- rc = mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
- policy);
- RETURN(rc);
+ return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
+ policy);
+}
+
+static inline int mdt_reint_striped_lock(struct mdt_thread_info *info,
+ struct mdt_object *o,
+ struct mdt_lock_handle *lh,
+ __u64 ibits,
+ struct ldlm_enqueue_info *einfo,
+ bool cos_incompat)
+{
+ int rc;
+
+ LASSERT(!mdt_object_remote(o));
+
+ memset(einfo, 0, sizeof(*einfo));
+
+ rc = mdt_reint_object_lock(info, o, lh, ibits, cos_incompat);
+ if (rc)
+ return rc;
+
+ rc = mdt_object_striped(info, o);
+ if (rc != 1) {
+ if (rc < 0)
+ mdt_object_unlock(info, o, lh, rc);
+ return rc;
+ }
+
+ rc = mdt_lock_slaves(info, o, lh->mlh_reg_mode, ibits, einfo);
+ if (rc) {
+ mdt_object_unlock(info, o, lh, rc);
+ if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
+ rc = 0;
+ }
+
+ return rc;
+}
+
+static inline void
+mdt_reint_striped_unlock(struct mdt_thread_info *info, struct mdt_object *o,
+ struct mdt_lock_handle *lh,
+ struct ldlm_enqueue_info *einfo, int decref)
+{
+ if (einfo->ei_cbdata)
+ mdt_unlock_slaves(info, o, einfo, decref);
+ mdt_object_unlock(info, o, lh, decref);
}
/*
*/
if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
struct mdt_lock_handle *lhc;
- struct mdt_lock_handle *s0_lh;
- struct mdt_object *s0_obj = NULL;
- struct ldlm_enqueue_info *einfo;
- struct lu_fid *s0_fid = &info->mti_tmp_fid1;
- bool cos_incompat = false;
-
- rc = mdt_init_slaves(info, child, s0_fid);
- if (rc > 0) {
- cos_incompat = true;
+ struct ldlm_enqueue_info *einfo = &info->mti_einfo;
+ bool cos_incompat;
+
+ rc = mdt_object_striped(info, child);
+ if (rc < 0)
+ GOTO(put_child, rc);
+
+ cos_incompat = rc;
+ if (cos_incompat) {
if (!mdt_object_remote(parent)) {
mdt_object_unlock(info, parent, lh, 1);
mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
}
}
- einfo = &info->mti_einfo;
lhc = &info->mti_lh[MDT_LH_CHILD];
mdt_lock_handle_init(lhc);
mdt_lock_reg_init(lhc, LCK_PW);
- rc = mdt_reint_object_lock(info, child, lhc,
- MDS_INODELOCK_UPDATE,
- cos_incompat);
+ rc = mdt_reint_striped_lock(info, child, lhc,
+ MDS_INODELOCK_UPDATE, einfo,
+ cos_incompat);
if (rc)
GOTO(put_child, rc);
- mdt_object_unlock(info, child, lhc, rc);
-
- s0_lh = &info->mti_lh[MDT_LH_LOCAL];
- mdt_lock_handle_init(s0_lh);
- mdt_lock_reg_init(s0_lh, LCK_PW);
- rc = mdt_lock_slaves(info, child, LCK_PW, MDS_INODELOCK_UPDATE,
- s0_fid, s0_lh, &s0_obj, einfo);
- mdt_unlock_slaves(info, child, MDS_INODELOCK_UPDATE, s0_lh,
- s0_obj, einfo, rc);
- if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && rc == -EIO)
- rc = 0;
+
+ mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
}
/* Return fid & attr to client. */
(LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
__u64 lockpart = MDS_INODELOCK_UPDATE;
struct ldlm_enqueue_info *einfo = &info->mti_einfo;
- struct lu_fid *s0_fid = &info->mti_tmp_fid1;
- struct mdt_lock_handle *s0_lh = NULL;
- struct mdt_object *s0_obj = NULL;
- bool cos_incompat = false;
+ bool cos_incompat;
int rc;
ENTRY;
- rc = mdt_init_slaves(info, mo, s0_fid);
- if (rc > 0)
- cos_incompat = true;
+ rc = mdt_object_striped(info, mo);
+ if (rc < 0)
+ RETURN(rc);
+
+ cos_incompat = rc;
lh = &info->mti_lh[MDT_LH_PARENT];
mdt_lock_reg_init(lh, LCK_PW);
if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
- rc = mdt_reint_object_lock(info, mo, lh, lockpart, cos_incompat);
+ rc = mdt_reint_striped_lock(info, mo, lh, lockpart, einfo,
+ cos_incompat);
if (rc != 0)
RETURN(rc);
- s0_lh = &info->mti_lh[MDT_LH_LOCAL];
- mdt_lock_reg_init(s0_lh, LCK_PW);
- rc = mdt_lock_slaves(info, mo, LCK_PW, lockpart, s0_fid, s0_lh, &s0_obj,
- einfo);
- if (rc != 0)
- GOTO(out_unlock, rc);
-
/* all attrs are packed into mti_attr in unpack_setattr */
mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
OBD_FAIL_MDS_REINT_SETATTR_WRITE);
mdt_dom_obj_lvb_update(info->mti_env, mo, false);
EXIT;
out_unlock:
- mdt_unlock_slaves(info, mo, lockpart, s0_lh, s0_obj, einfo, rc);
- mdt_object_unlock(info, mo, lh, rc);
+ mdt_reint_striped_unlock(info, mo, lh, einfo, rc);
return rc;
}
struct mdt_lock_handle *parent_lh;
struct mdt_lock_handle *child_lh;
struct ldlm_enqueue_info *einfo = &info->mti_einfo;
- struct lu_fid *s0_fid = &info->mti_tmp_fid2;
- struct mdt_lock_handle *s0_lh = NULL;
- struct mdt_object *s0_obj = NULL;
__u64 lock_ibits;
bool cos_incompat = false;
int no_name = 0;
if (IS_ERR(mc))
GOTO(unlock_parent, rc = PTR_ERR(mc));
- if (!cos_incompat && mdt_init_slaves(info, mc, s0_fid) > 0) {
- cos_incompat = true;
- mdt_object_put(info->mti_env, mc);
- mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
- goto relock;
+ if (!cos_incompat) {
+ rc = mdt_object_striped(info, mc);
+ if (rc < 0)
+ GOTO(unlock_parent, rc = PTR_ERR(mc));
+
+ cos_incompat = rc;
+ if (cos_incompat) {
+ mdt_object_put(info->mti_env, mc);
+ mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
+ goto relock;
+ }
}
child_lh = &info->mti_lh[MDT_LH_CHILD];
lock_ibits &= ~MDS_INODELOCK_LOOKUP;
}
- rc = mdt_reint_object_lock(info, mc, child_lh, lock_ibits,
- cos_incompat);
+ rc = mdt_reint_striped_lock(info, mc, child_lh, lock_ibits, einfo,
+ cos_incompat);
if (rc != 0)
- GOTO(unlock_child, rc);
+ GOTO(put_child, rc);
/*
* Now we can only make sure we need MA_INODE, in mdd layer, will check
ma->ma_need = MA_INODE;
ma->ma_valid = 0;
- s0_lh = &info->mti_lh[MDT_LH_LOCAL];
- mdt_lock_reg_init(s0_lh, LCK_EX);
- rc = mdt_lock_slaves(info, mc, LCK_EX, MDS_INODELOCK_UPDATE, s0_fid,
- s0_lh, &s0_obj, einfo);
- if (rc != 0)
- GOTO(unlock_child, rc);
-
mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
OBD_FAIL_MDS_REINT_UNLINK_WRITE);
/* save version when object is locked */
EXIT;
unlock_child:
- mdt_unlock_slaves(info, mc, MDS_INODELOCK_UPDATE, s0_lh, s0_obj, einfo,
- rc);
- mdt_object_unlock(info, mc, child_lh, rc);
+ mdt_reint_striped_unlock(info, mc, child_lh, einfo, rc);
put_child:
mdt_object_put(info->mti_env, mc);
unlock_parent: