mutex_unlock(&cdt->cdt_restore_lock);
/* get the layout lock */
- mdt_lock_reg_init(&lh, LCK_EX);
obj = mdt_object_find_lock(mti, &crh->crh_fid, &lh,
- MDS_INODELOCK_LAYOUT);
+ MDS_INODELOCK_LAYOUT, LCK_EX);
if (IS_ERR(obj)) {
mutex_lock(&cdt->cdt_restore_lock);
GOTO(out_ldel, rc = PTR_ERR(obj));
/* we already have layout lock on obj so take only
* on dfid */
dlh = &mti->mti_lh[MDT_LH_OLD];
- mdt_lock_reg_init(dlh, LCK_EX);
- dobj = mdt_object_find_lock(mti, dfid, dlh, MDS_INODELOCK_LAYOUT);
+ dobj = mdt_object_find_lock(mti, dfid, dlh, MDS_INODELOCK_LAYOUT,
+ LCK_EX);
if (IS_ERR(dobj))
GOTO(out, rc = PTR_ERR(dobj));
if (!IS_ERR_OR_NULL(obj)) {
/* flush UPDATE lock so attributes are upadated */
lh = &mti->mti_lh[MDT_LH_OLD];
- mdt_lock_reg_init(lh, LCK_EX);
- mdt_object_lock(mti, obj, lh, MDS_INODELOCK_UPDATE);
+ mdt_object_lock(mti, obj, lh, MDS_INODELOCK_UPDATE,
+ LCK_EX, false);
mdt_object_unlock(mti, obj, lh, 1);
}
}
rep->lock_policy_res1 |= op_flag;
}
+/* assert lock is unlocked before reuse */
+static inline void mdt_lock_handle_assert(struct mdt_lock_handle *lh)
+{
+ LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
+ LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
+ LASSERT(!lustre_handle_is_used(&lh->mlh_rreg_lh));
+}
+
void mdt_lock_reg_init(struct mdt_lock_handle *lh, enum ldlm_mode lm)
{
+ mdt_lock_handle_assert(lh);
lh->mlh_pdo_hash = 0;
lh->mlh_reg_mode = lm;
lh->mlh_rreg_mode = lm;
void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
const struct lu_name *lname)
{
+ mdt_lock_handle_assert(lh);
lh->mlh_reg_mode = lock_mode;
lh->mlh_pdo_mode = LCK_MINMODE;
lh->mlh_rreg_mode = lock_mode;
if (layout->mlc_opc == MD_LAYOUT_WRITE)
lockpart |= MDS_INODELOCK_UPDATE;
- mdt_lock_handle_init(lhc);
- mdt_lock_reg_init(lhc, LCK_EX);
- rc = mdt_reint_object_lock(info, obj, lhc, lockpart, false);
+ rc = mdt_object_lock(info, obj, lhc, lockpart, LCK_EX, false);
if (rc)
RETURN(rc);
}
GOTO(put, rc = -EPROTO);
lh1 = &info->mti_lh[MDT_LH_NEW];
- mdt_lock_reg_init(lh1, LCK_EX);
lh2 = &info->mti_lh[MDT_LH_OLD];
- mdt_lock_reg_init(lh2, LCK_EX);
-
rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT |
- MDS_INODELOCK_XATTR);
+ MDS_INODELOCK_XATTR, LCK_EX, false);
if (rc < 0)
GOTO(put, rc);
rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
- MDS_INODELOCK_XATTR);
+ MDS_INODELOCK_XATTR, LCK_EX, false);
if (rc < 0)
GOTO(unlock1, rc);
if (rc < 0) {
RETURN(rc);
} else if (rc > 0) {
- mdt_lock_handle_init(lhc);
- mdt_lock_reg_init(lhc, LCK_PR);
-
/*
* Object's name entry is on another MDS, it will
* request PERM lock only because LOOKUP lock is owned
child_bits &= ~(MDS_INODELOCK_LOOKUP |
MDS_INODELOCK_LAYOUT);
child_bits |= MDS_INODELOCK_PERM;
-
- rc = mdt_object_lock(info, child, lhc, child_bits);
+ rc = mdt_object_lock(info, child, lhc, child_bits,
+ LCK_PR, false);
if (rc < 0)
RETURN(rc);
}
/* step 1: lock parent only if parent is a directory */
if (S_ISDIR(lu_object_attr(&parent->mot_obj))) {
lhp = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_pdo_init(lhp, LCK_PR, lname);
- rc = mdt_object_lock(info, parent, lhp,
- MDS_INODELOCK_UPDATE);
+ rc = mdt_parent_lock(info, parent, lhp, lname, LCK_PR,
+ false);
if (unlikely(rc != 0))
RETURN(rc);
}
if (rc < 0) {
GOTO(out_child, rc);
} else if (rc > 0) {
- mdt_lock_handle_init(lhc);
- mdt_lock_reg_init(lhc, LCK_PR);
-
if (!(child_bits & MDS_INODELOCK_UPDATE) &&
!mdt_object_remote(child)) {
struct md_attr *ma = &info->mti_attr;
/* try layout lock, it may fail to be granted due to
* contention at LOOKUP or UPDATE */
rc = mdt_object_lock_try(info, child, lhc, &child_bits,
- try_bits, false);
+ try_bits, LCK_PR, false);
if (child_bits & MDS_INODELOCK_LAYOUT)
ma_need |= MA_LOV;
} else {
/* Do not enqueue the UPDATE lock from MDT(cross-MDT),
* client will enqueue the lock to the remote MDT */
if (mdt_object_remote(child))
- child_bits &= ~MDS_INODELOCK_UPDATE;
- rc = mdt_object_lock(info, child, lhc, child_bits);
+ rc = mdt_object_lookup_lock(info, NULL, child,
+ lhc, LCK_PR, false);
+ else
+ rc = mdt_object_lock(info, child, lhc,
+ child_bits, LCK_PR, false);
}
if (unlikely(rc != 0))
GOTO(out_child, rc);
if (IS_ERR(pobj))
GOTO(out, rc = PTR_ERR(pobj));
+ if (mdt_object_remote(pobj))
+ cos_incompat = true;
+
parent_lh = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_pdo_init(parent_lh, LCK_PW, name);
- rc = mdt_object_lock(info, pobj, parent_lh, MDS_INODELOCK_UPDATE);
+ rc = mdt_parent_lock(info, pobj, parent_lh, name, LCK_PW, cos_incompat);
if (rc != 0)
GOTO(put_parent, rc);
- if (mdt_object_remote(pobj))
- cos_incompat = true;
-
rc = mdo_lookup(info->mti_env, mdt_object_child(pobj),
name, child_fid, &info->mti_spec);
if (rc != 0)
GOTO(unlock_parent, rc = -EREMCHG);
child_lh = &info->mti_lh[MDT_LH_CHILD];
- mdt_lock_reg_init(child_lh, LCK_EX);
- rc = mdt_reint_striped_lock(info, obj, child_lh,
- MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE,
- einfo, cos_incompat);
+ rc = mdt_object_stripes_lock(info, pobj, obj, child_lh, einfo,
+ MDS_INODELOCK_LOOKUP |
+ MDS_INODELOCK_UPDATE,
+ LCK_EX, cos_incompat);
if (rc != 0)
GOTO(unlock_parent, rc);
mutex_unlock(&obj->mot_lov_mutex);
unlock_child:
- mdt_reint_striped_unlock(info, obj, child_lh, einfo, 1);
+ mdt_object_stripes_unlock(info, obj, child_lh, einfo, 1);
unlock_parent:
mdt_object_unlock(info, pobj, parent_lh, 1);
put_parent:
mdt_object_get(NULL, lock->l_ast_data);
}
-int mdt_remote_object_lock_try(struct mdt_thread_info *mti,
- struct mdt_object *o, const struct lu_fid *fid,
- struct lustre_handle *lh, enum ldlm_mode mode,
- __u64 *ibits, __u64 trybits, bool cache)
+static int mdt_remote_object_lock_try(struct mdt_thread_info *mti,
+ struct mdt_object *obj,
+ struct lustre_handle *lh,
+ enum ldlm_mode mode,
+ union ldlm_policy_data *policy,
+ struct ldlm_res_id *res_id,
+ bool cache)
{
struct ldlm_enqueue_info *einfo = &mti->mti_remote_einfo;
- union ldlm_policy_data *policy = &mti->mti_policy;
- struct ldlm_res_id *res_id = &mti->mti_res_id;
- int rc = 0;
- ENTRY;
-
- LASSERT(mdt_object_remote(o));
+ int rc;
- fid_build_reg_res_name(fid, res_id);
+ LASSERT(mdt_object_remote(obj));
memset(einfo, 0, sizeof(*einfo));
einfo->ei_type = LDLM_IBITS;
einfo->ei_enq_slave = 0;
einfo->ei_res_id = res_id;
einfo->ei_req_slot = 1;
-
if (cache) {
/*
* if we cache lock, couple lock with mdt_object, so that object
* can be easily found in lock ASTs.
*/
- einfo->ei_cbdata = o;
+ einfo->ei_cbdata = obj;
einfo->ei_cb_created = mdt_remote_object_lock_created_cb;
}
- memset(policy, 0, sizeof(*policy));
- policy->l_inodebits.bits = *ibits;
- policy->l_inodebits.try_bits = trybits;
-
- rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo,
+ rc = mo_object_lock(mti->mti_env, mdt_object_child(obj), lh, einfo,
policy);
-
- /* Return successfully acquired bits to a caller */
- if (rc == 0) {
- struct ldlm_lock *lock = ldlm_handle2lock(lh);
-
- LASSERT(lock);
- *ibits = lock->l_policy_data.l_inodebits.bits;
- LDLM_LOCK_PUT(lock);
+ if (rc) {
+ lh->cookie = 0ull;
+ return rc;
}
- RETURN(rc);
-}
-int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o,
- const struct lu_fid *fid, struct lustre_handle *lh,
- enum ldlm_mode mode, __u64 ibits, bool cache)
-{
- return mdt_remote_object_lock_try(mti, o, fid, lh, mode, &ibits, 0,
- cache);
+ /* other components like LFSCK can use lockless access
+ * and populate cache, so we better invalidate it
+ */
+ if (policy->l_inodebits.bits &
+ (MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR))
+ mo_invalidate(mti->mti_env, mdt_object_child(obj));
+
+ return 0;
}
-int mdt_object_local_lock(struct mdt_thread_info *info, struct mdt_object *o,
- struct mdt_lock_handle *lh, __u64 *ibits,
- __u64 trybits, bool cos_incompat)
+/*
+ * Helper function to take PDO and hash lock.
+ *
+ * if \a pdo_lock is false, don't take PDO lock, this is case in rename.
+ */
+int mdt_object_pdo_lock(struct mdt_thread_info *info, struct mdt_object *obj,
+ struct mdt_lock_handle *lh, const struct lu_name *name,
+ enum ldlm_mode mode, bool pdo_lock, bool cos_incompat)
{
struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
union ldlm_policy_data *policy = &info->mti_policy;
struct ldlm_res_id *res_id = &info->mti_res_id;
- __u64 dlmflags = 0, *cookie = NULL;
+ /*
+ * Do not use LDLM_FL_LOCAL_ONLY for parallel lock, it is never going to
+ * be sent to client and we do not want it slowed down due to possible
+ * cancels.
+ */
+ __u64 dlmflags = LDLM_FL_ATOMIC_CB;
+ __u64 *cookie = NULL;
int rc;
- ENTRY;
- LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
- LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
- LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
- LASSERT(lh->mlh_type != MDT_NUL_LOCK);
+ LASSERT(obj);
+
+ /* check for exists after object is locked */
+ if (!mdt_object_exists(obj))
+ /* Non-existent object shouldn't have PDO lock */
+ return -ESTALE;
+
+ /* Non-dir object shouldn't have PDO lock */
+ if (!S_ISDIR(lu_object_attr(&obj->mot_obj)))
+ return -ENOTDIR;
if (cos_incompat) {
- LASSERT(lh->mlh_reg_mode == LCK_PW ||
- lh->mlh_reg_mode == LCK_EX);
+ LASSERT(mode == LCK_PW || mode == LCK_EX);
dlmflags |= LDLM_FL_COS_INCOMPAT;
} else if (mdt_cos_is_enabled(info->mti_mdt)) {
dlmflags |= LDLM_FL_COS_ENABLED;
}
- /* Only enqueue LOOKUP lock for remote object */
- LASSERT(ergo(mdt_object_remote(o), *ibits == MDS_INODELOCK_LOOKUP));
-
- /* Lease lock are granted with LDLM_FL_CANCEL_ON_BLOCK */
- if (lh->mlh_type == MDT_REG_LOCK && lh->mlh_reg_mode == LCK_EX &&
- *ibits == MDS_INODELOCK_OPEN)
- dlmflags |= LDLM_FL_CANCEL_ON_BLOCK;
-
- if (lh->mlh_type == MDT_PDO_LOCK) {
- /* check for exists after object is locked */
- if (mdt_object_exists(o) == 0) {
- /* Non-existent object shouldn't have PDO lock */
- RETURN(-ESTALE);
- } else {
- /* Non-dir object shouldn't have PDO lock */
- if (!S_ISDIR(lu_object_attr(&o->mot_obj)))
- RETURN(-ENOTDIR);
- }
- }
-
- fid_build_reg_res_name(mdt_object_fid(o), res_id);
- dlmflags |= LDLM_FL_ATOMIC_CB;
-
+ policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
+ policy->l_inodebits.try_bits = 0;
+ policy->l_inodebits.li_gid = 0;
+ fid_build_reg_res_name(mdt_object_fid(obj), res_id);
if (info->mti_exp)
cookie = &info->mti_exp->exp_handle.h_cookie;
- /*
- * Take PDO lock on whole directory and build correct @res_id for lock
- * on part of directory.
- */
- if (lh->mlh_pdo_hash != 0) {
- LASSERT(lh->mlh_type == MDT_PDO_LOCK);
- mdt_lock_pdo_mode(info, o, lh);
- if (lh->mlh_pdo_mode != LCK_NL) {
- /*
- * Do not use LDLM_FL_LOCAL_ONLY for parallel lock, it
- * is never going to be sent to client and we do not
- * want it slowed down due to possible cancels.
- */
- policy->l_inodebits.bits =
- *ibits & MDS_INODELOCK_UPDATE;
- policy->l_inodebits.try_bits =
- trybits & MDS_INODELOCK_UPDATE;
- /* at least one of them should be set */
- LASSERT(policy->l_inodebits.bits |
- policy->l_inodebits.try_bits);
+ mdt_lock_pdo_init(lh, mode, name);
+ mdt_lock_pdo_mode(info, obj, lh);
+ if (lh->mlh_pdo_mode != LCK_NL) {
+ if (pdo_lock) {
rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_pdo_lh,
lh->mlh_pdo_mode, policy, res_id,
dlmflags, cookie);
- if (unlikely(rc != 0))
- GOTO(out_unlock, rc);
- }
+ if (rc) {
+ mdt_object_unlock(info, obj, lh, 1);
+ return rc;
+ }
+ }
+ res_id->name[LUSTRE_RES_ID_HSH_OFF] = lh->mlh_pdo_hash;
+ }
- /*
- * Finish res_id initializing by name hash marking part of
- * directory which is taking modification.
- */
- res_id->name[LUSTRE_RES_ID_HSH_OFF] = lh->mlh_pdo_hash;
- }
+ rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
+ policy, res_id, dlmflags, cookie);
+ if (rc)
+ mdt_object_unlock(info, obj, lh, 1);
+ else if (OBD_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK) &&
+ lh->mlh_pdo_hash != 0 &&
+ (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
+ OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK, 15);
+
+ return rc;
+}
+
+int mdt_object_lock_internal(struct mdt_thread_info *info,
+ struct mdt_object *obj, const struct lu_fid *fid,
+ struct mdt_lock_handle *lh, __u64 *ibits,
+ __u64 trybits, bool cache, bool cos_incompat)
+{
+ union ldlm_policy_data *policy = &info->mti_policy;
+ struct ldlm_res_id *res_id = &info->mti_res_id;
+ struct lustre_handle *handle;
+ int rc;
policy->l_inodebits.bits = *ibits;
policy->l_inodebits.try_bits = trybits;
policy->l_inodebits.li_gid = lh->mlh_gid;
+ fid_build_reg_res_name(fid, res_id);
- /*
- * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
- * going to be sent to client. If it is - mdt_intent_policy() path will
- * fix it up and turn FL_LOCAL flag off.
- */
- rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
- policy, res_id, LDLM_FL_LOCAL_ONLY | dlmflags,
- cookie);
-out_unlock:
- if (rc != 0)
- mdt_object_unlock(info, o, lh, 1);
- else if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)) &&
- lh->mlh_pdo_hash != 0 &&
- (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
- OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK, 15);
+ if (obj && mdt_object_remote(obj)) {
+ handle = &lh->mlh_rreg_lh;
+ LASSERT(!lustre_handle_is_used(handle));
+ LASSERT(lh->mlh_rreg_mode != LCK_MINMODE);
+ LASSERT(lh->mlh_type != MDT_NUL_LOCK);
+ rc = mdt_remote_object_lock_try(info, obj, handle,
+ lh->mlh_rreg_mode, policy,
+ res_id, cache);
+ } else {
+ struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
+ /*
+ * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if
+ * it is going to be sent to client. If it is -
+ * mdt_intent_policy() path will fix it up and turn FL_LOCAL
+ * flag off.
+ */
+ __u64 dlmflags = LDLM_FL_ATOMIC_CB | LDLM_FL_LOCAL_ONLY;
+ __u64 *cookie = NULL;
+
+ handle = &lh->mlh_reg_lh;
+ LASSERT(!lustre_handle_is_used(handle));
+ LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
+ LASSERT(lh->mlh_type != MDT_NUL_LOCK);
+
+ if (cos_incompat) {
+ LASSERT(lh->mlh_reg_mode == LCK_PW ||
+ lh->mlh_reg_mode == LCK_EX);
+ dlmflags |= LDLM_FL_COS_INCOMPAT;
+ } else if (mdt_cos_is_enabled(info->mti_mdt)) {
+ dlmflags |= LDLM_FL_COS_ENABLED;
+ }
+
+ /* Lease lock are granted with LDLM_FL_CANCEL_ON_BLOCK */
+ if (lh->mlh_type == MDT_REG_LOCK &&
+ lh->mlh_reg_mode == LCK_EX && *ibits == MDS_INODELOCK_OPEN)
+ dlmflags |= LDLM_FL_CANCEL_ON_BLOCK;
+
+
+ if (info->mti_exp)
+ cookie = &info->mti_exp->exp_handle.h_cookie;
+
+ rc = mdt_fid_lock(info->mti_env, ns, handle, lh->mlh_reg_mode,
+ policy, res_id, dlmflags, cookie);
+ if (rc)
+ mdt_object_unlock(info, obj, lh, 1);
+ }
- /* Return successfully acquired bits to a caller */
if (rc == 0) {
- struct ldlm_lock *lock = ldlm_handle2lock(&lh->mlh_reg_lh);
+ struct ldlm_lock *lock;
+ /* Return successfully acquired bits to a caller */
+ lock = ldlm_handle2lock(handle);
LASSERT(lock);
*ibits = lock->l_policy_data.l_inodebits.bits;
LDLM_LOCK_PUT(lock);
}
- RETURN(rc);
+
+ return rc;
}
-static int
-mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o,
- struct mdt_lock_handle *lh, __u64 *ibits,
- __u64 trybits, bool cos_incompat)
+/*
+ * MDT object locking functions:
+ * mdt_object_lock(): lock object, this is used in most places, and normally
+ * lock ibits doesn't contain LOOKUP, unless the caller knows it's not
+ * remote object.
+ * mdt_object_check_lock(): lock object with LOOKUP and other ibits, it needs
+ * to check whether parent is on remote MDT, if so, take LOOKUP on parent
+ * MDT separately, and then lock other ibits on child object.
+ * mdt_parent_lock(): take parent UPDATE lock with specific mode, if parent is
+ * local, take PDO lock by name hash, otherwise take regular lock.
+ * mdt_object_stripes_lock(): lock object which should be local, and if it's a
+ * striped directory, lock its stripes, this is called in operations which
+ * modify both object and stripes.
+ * mdt_object_lock_try(): lock object with trybits, the trybits contains
+ * optional inode lock bits that can be granted. This is called by
+ * getattr/open to fetch more inode lock bits to client, and is also called
+ * by dir migration to lock link parent in non-block mode to avoid
+ * deadlock.
+ */
+
+/**
+ * lock object
+ *
+ * this is used to lock object in most places, and normally lock ibits doesn't
+ * contain LOOKUP, unless the caller knows it's not remote object.
+ *
+ * \param info struct mdt_thread_info
+ * \param obj object
+ * \param lh lock handle
+ * \param ibits MDS inode lock bits
+ * \param mode lock mode
+ * \param cos_incompat DNE COS incompatible
+ *
+ * \retval 0 on success, -ev on error.
+ */
+int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *obj,
+ struct mdt_lock_handle *lh, __u64 ibits,
+ enum ldlm_mode mode, bool cos_incompat)
{
- struct mdt_lock_handle *local_lh = NULL;
int rc;
- ENTRY;
-
- if (!mdt_object_remote(o)) {
- rc = mdt_object_local_lock(info, o, lh, ibits, trybits,
- cos_incompat);
- RETURN(rc);
- }
- /* XXX do not support PERM/LAYOUT/XATTR lock for remote object yet */
- *ibits &= ~(MDS_INODELOCK_PERM | MDS_INODELOCK_LAYOUT |
- MDS_INODELOCK_XATTR);
+ ENTRY;
+ mdt_lock_reg_init(lh, mode);
+ rc = mdt_object_lock_internal(info, obj, mdt_object_fid(obj), lh,
+ &ibits, 0, false, cos_incompat);
+ RETURN(rc);
+}
- /* Only enqueue LOOKUP lock for remote object */
- if (*ibits & MDS_INODELOCK_LOOKUP) {
- __u64 local = MDS_INODELOCK_LOOKUP;
+/**
+ * lock object with LOOKUP and other ibits
+ *
+ * it will check whether parent and child are on different MDTs, if so, take
+ * LOOKUP lock on parent MDT, and lock other ibits on child MDT, otherwise lock
+ * all ibits on child MDT. Note, parent and child shouldn't be both on remote
+ * MDTs, in which case specific lock function should be used, and it's in
+ * rename and migrate only.
+ *
+ * \param info struct mdt_thread_info
+ * \param parent parent object
+ * \param child child object
+ * \param lh lock handle
+ * \param ibits MDS inode lock bits
+ * \param mode lock mode
+ * \param cos_incompat DNE COS incompatible
+ *
+ * \retval 0 on success, -ev on error.
+ */
+int mdt_object_check_lock(struct mdt_thread_info *info,
+ struct mdt_object *parent, struct mdt_object *child,
+ struct mdt_lock_handle *lh, __u64 ibits,
+ enum ldlm_mode mode, bool cos_incompat)
+{
+ int rc;
- rc = mdt_object_local_lock(info, o, lh, &local, 0,
- cos_incompat);
- if (rc != ELDLM_OK)
+ ENTRY;
+ /* if LOOKUP ibit is not set, use mdt_object_lock() */
+ LASSERT(ibits & MDS_INODELOCK_LOOKUP);
+ /* if only LOOKUP ibit is needed, use mdt_object_lookup_lock() */
+ LASSERT(ibits != MDS_INODELOCK_LOOKUP);
+ LASSERT(parent);
+ /* @parent and @child shouldn't both be on remote MDTs */
+ LASSERT(!(mdt_object_remote(parent) && mdt_object_remote(child)));
+
+ mdt_lock_reg_init(lh, mode);
+ if (mdt_object_remote(parent) ^ mdt_object_remote(child)) {
+ __u64 lookup_ibits = MDS_INODELOCK_LOOKUP;
+
+ rc = mdt_object_lock_internal(info, parent,
+ mdt_object_fid(child), lh,
+ &lookup_ibits, 0, false,
+ cos_incompat);
+ if (rc)
RETURN(rc);
- local_lh = lh;
- }
-
- if ((*ibits | trybits) & MDS_INODELOCK_UPDATE) {
- /* Sigh, PDO needs to enqueue 2 locks right now, but
- * enqueue RPC can only request 1 lock, to avoid extra
- * RPC, so it will instead enqueue EX lock for remote
- * object anyway XXX*/
- if (lh->mlh_type == MDT_PDO_LOCK &&
- lh->mlh_pdo_hash != 0) {
- CDEBUG(D_INFO,
- "%s: "DFID" convert PDO lock to EX lock.\n",
- mdt_obd_name(info->mti_mdt),
- PFID(mdt_object_fid(o)));
- lh->mlh_pdo_hash = 0;
- lh->mlh_rreg_mode = LCK_EX;
- lh->mlh_type = MDT_REG_LOCK;
- }
-
- rc = mdt_remote_object_lock_try(info, o, mdt_object_fid(o),
- &lh->mlh_rreg_lh,
- lh->mlh_rreg_mode,
- ibits, trybits, false);
- if (rc != ELDLM_OK) {
- if (local_lh != NULL)
- mdt_object_unlock(info, o, local_lh, rc);
- RETURN(rc);
- }
+ ibits &= ~MDS_INODELOCK_LOOKUP;
}
- /* other components like LFSCK can use lockless access
- * and populate cache, so we better invalidate it */
- mo_invalidate(info->mti_env, mdt_object_child(o));
+ rc = mdt_object_lock_internal(info, child, mdt_object_fid(child), lh,
+ &ibits, 0, false, cos_incompat);
+ if (rc && !(ibits & MDS_INODELOCK_LOOKUP))
+ mdt_object_unlock(info, NULL, lh, 1);
- RETURN(0);
+ RETURN(rc);
}
-int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
- struct mdt_lock_handle *lh, __u64 ibits)
+/**
+ * take parent UPDATE lock
+ *
+ * if parent is local, take PDO lock by name hash, otherwise take regular lock.
+ *
+ * \param info struct mdt_thread_info
+ * \param obj parent object
+ * \param lh lock handle
+ * \param lname child name
+ * \param mode lock mode
+ * \param cos_incompat DNE COS incompatible
+ *
+ * \retval 0 on success, -ev on error.
+ */
+int mdt_parent_lock(struct mdt_thread_info *info, struct mdt_object *obj,
+ struct mdt_lock_handle *lh, const struct lu_name *lname,
+ enum ldlm_mode mode, bool cos_incompat)
{
- return mdt_object_lock_internal(info, o, lh, &ibits, 0, false);
-}
+ int rc;
-int mdt_reint_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
- struct mdt_lock_handle *lh, __u64 ibits,
- bool cos_incompat)
-{
- LASSERT(lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX);
- return mdt_object_lock_internal(info, o, lh, &ibits, 0,
- cos_incompat);
+ ENTRY;
+ LASSERT(obj && lname);
+ if (mdt_object_remote(obj)) {
+ __u64 ibits = MDS_INODELOCK_UPDATE;
+
+ mdt_lock_reg_init(lh, mode);
+ rc = mdt_object_lock_internal(info, obj, mdt_object_fid(obj),
+ lh, &ibits, 0, false,
+ cos_incompat);
+ } else {
+ rc = mdt_object_pdo_lock(info, obj, lh, lname, mode, true,
+ cos_incompat);
+ }
+ RETURN(rc);
}
-int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o,
+/**
+ * lock object with trybits
+ *
+ * the trybits contains optional inode lock bits that can be granted. This is
+ * called by getattr/open to fetch more inode lock bits to client, and is also
+ * called by dir migration to lock link parent in non-block mode to avoid
+ * deadlock.
+ *
+ * \param info struct mdt_thread_info
+ * \param obj object
+ * \param lh lock handle
+ * \param ibits MDS inode lock bits
+ * \param trybits optional inode lock bits
+ * \param mode lock mode
+ * \param cos_incompat DNE COS incompatible
+ *
+ * \retval 0 on success, -ev on error.
+ */
+int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *obj,
struct mdt_lock_handle *lh, __u64 *ibits,
- __u64 trybits, bool cos_incompat)
+ __u64 trybits, enum ldlm_mode mode, bool cos_incompat)
{
bool trylock_only = *ibits == 0;
int rc;
+ ENTRY;
LASSERT(!(*ibits & trybits));
- rc = mdt_object_lock_internal(info, o, lh, ibits, trybits,
- cos_incompat);
+ mdt_lock_reg_init(lh, mode);
+ rc = mdt_object_lock_internal(info, obj, mdt_object_fid(obj), lh, ibits,
+ trybits, false, cos_incompat);
if (rc && trylock_only) { /* clear error for try ibits lock only */
LASSERT(*ibits == 0);
rc = 0;
}
- return rc;
+ RETURN(rc);
+}
+
+/*
+ * Helper function to take \a obj LOOKUP lock.
+ *
+ * Both \a pobj and \a obj may be located on remote MDTs.
+ */
+int mdt_object_lookup_lock(struct mdt_thread_info *info,
+ struct mdt_object *pobj, struct mdt_object *obj,
+ struct mdt_lock_handle *lh, enum ldlm_mode mode,
+ bool cos_incompat)
+{
+ __u64 ibits = MDS_INODELOCK_LOOKUP;
+ int rc;
+
+ ENTRY;
+ /* if @parent is NULL, it's on local MDT, and @child is remote,
+ * this is case in getattr/unlink/open by name.
+ */
+ LASSERT(ergo(!pobj, mdt_object_remote(obj)));
+ mdt_lock_reg_init(lh, mode);
+ rc = mdt_object_lock_internal(info, pobj, mdt_object_fid(obj), lh,
+ &ibits, 0, false, cos_incompat);
+ RETURN(rc);
}
/**
* \param mode lock mode
* \param decref force immediate lock releasing
*/
-void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
- enum ldlm_mode mode, int decref)
+static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
+ enum ldlm_mode mode, int decref)
{
struct tgt_session_info *tsi = info->mti_env->le_ses ?
tgt_ses_info(info->mti_env) : NULL;
}
struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
- const struct lu_fid *f,
- struct mdt_lock_handle *lh,
- __u64 ibits)
+ const struct lu_fid *f,
+ struct mdt_lock_handle *lh,
+ __u64 ibits, enum ldlm_mode mode)
{
- struct mdt_object *o;
+ struct mdt_object *o;
- o = mdt_object_find(info->mti_env, info->mti_mdt, f);
- if (!IS_ERR(o)) {
- int rc;
+ o = mdt_object_find(info->mti_env, info->mti_mdt, f);
+ if (!IS_ERR(o)) {
+ int rc;
- rc = mdt_object_lock(info, o, lh, ibits);
- if (rc != 0) {
- mdt_object_put(info->mti_env, o);
- o = ERR_PTR(rc);
- }
- }
- return o;
+ rc = mdt_object_lock(info, o, lh, ibits, mode, false);
+ if (rc != 0) {
+ mdt_object_put(info->mti_env, o);
+ o = ERR_PTR(rc);
+ }
+ }
+ return o;
}
void mdt_object_unlock_put(struct mdt_thread_info * info,
- struct mdt_object * o,
- struct mdt_lock_handle *lh,
- int decref)
+ struct mdt_object *o,
+ struct mdt_lock_handle *lh,
+ int decref)
{
- mdt_object_unlock(info, o, lh, decref);
- mdt_object_put(info->mti_env, o);
+ mdt_object_unlock(info, o, lh, decref);
+ mdt_object_put(info->mti_env, o);
}
/*
RETURN(rc);
}
-void mdt_lock_handle_init(struct mdt_lock_handle *lh)
-{
- lh->mlh_type = MDT_NUL_LOCK;
- lh->mlh_reg_lh.cookie = 0ull;
- lh->mlh_reg_mode = LCK_MINMODE;
- lh->mlh_pdo_lh.cookie = 0ull;
- lh->mlh_pdo_mode = LCK_MINMODE;
- lh->mlh_rreg_lh.cookie = 0ull;
- lh->mlh_rreg_mode = LCK_MINMODE;
-}
-
-void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
-{
- LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
- LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
-}
-
void mdt_thread_info_reset(struct mdt_thread_info *info)
{
memset(&info->mti_attr, 0, sizeof(info->mti_attr));
void mdt_thread_info_init(struct ptlrpc_request *req,
struct mdt_thread_info *info)
{
- int i;
-
info->mti_pill = &req->rq_pill;
- /* lock handle */
- for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
- mdt_lock_handle_init(&info->mti_lh[i]);
-
/* mdt device: it can be NULL while CONNECT */
if (req->rq_export) {
info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
}
for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
- mdt_lock_handle_fini(&info->mti_lh[i]);
+ mdt_lock_handle_assert(&info->mti_lh[i]);
info->mti_env = NULL;
info->mti_pill = NULL;
info->mti_exp = NULL;
*/
mdt_intent_fixup_resent(info, *lockp, lhc, flags);
if (!lustre_handle_is_used(&lhc->mlh_reg_lh)) {
- mdt_lock_reg_init(lhc, (*lockp)->l_req_mode);
rc = mdt_object_lock(info, info->mti_object, lhc,
- MDS_INODELOCK_XATTR);
+ MDS_INODELOCK_XATTR, (*lockp)->l_req_mode,
+ false);
if (rc)
return rc;
}
CDEBUG(D_IOCTL, "getting version for "DFID"\n", PFID(fid));
- lh = &mti->mti_lh[MDT_LH_PARENT];
- mdt_lock_reg_init(lh, LCK_CR);
-
- obj = mdt_object_find_lock(mti, fid, lh, MDS_INODELOCK_UPDATE);
- if (IS_ERR(obj))
- RETURN(PTR_ERR(obj));
+ lh = &mti->mti_lh[MDT_LH_PARENT];
+ obj = mdt_object_find_lock(mti, fid, lh, MDS_INODELOCK_UPDATE, LCK_CR);
+ if (IS_ERR(obj))
+ RETURN(PTR_ERR(obj));
if (mdt_object_remote(obj)) {
rc = -EREMOTE;
GOTO(out, rc = err_serious(rc));
lh = &info->mti_lh[MDT_LH_CHILD];
- mdt_lock_reg_init(lh, LCK_PR);
- rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_LOOKUP);
+ rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_LOOKUP, LCK_PR,
+ false);
if (rc < 0)
GOTO(out_ucred, rc);
GOTO(out, rc = err_serious(rc));
lh = &info->mti_lh[MDT_LH_CHILD];
- mdt_lock_reg_init(lh, LCK_PW);
rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_LOOKUP |
- MDS_INODELOCK_XATTR);
+ MDS_INODELOCK_XATTR, LCK_PW, false);
if (rc < 0)
GOTO(out_ucred, rc);
MDT_LH_NEW, /* new lockh for rename */
MDT_LH_RMT, /* used for return lh to caller */
MDT_LH_LOCAL, /* local lock never return to client */
+ MDT_LH_LOOKUP, /* lookup lock for source object in rename/migrate if
+ * it's remote object */
MDT_LH_NR
};
int mdt_check_resent_lock(struct mdt_thread_info *info, struct mdt_object *mo,
struct mdt_lock_handle *lhc);
-int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *mo,
- struct mdt_lock_handle *lh, __u64 ibits);
-
-int mdt_reint_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
+int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *obj,
+ struct mdt_lock_handle *lh, __u64 ibits,
+ enum ldlm_mode mode, bool cos_incompat);
+int mdt_parent_lock(struct mdt_thread_info *info, struct mdt_object *o,
+ struct mdt_lock_handle *lh, const struct lu_name *lname,
+ enum ldlm_mode mode, bool cos_incompat);
+int mdt_object_stripes_lock(struct mdt_thread_info *info,
+ struct mdt_object *pobj, struct mdt_object *o,
+ struct mdt_lock_handle *lh,
+ struct ldlm_enqueue_info *einfo, __u64 ibits,
+ enum ldlm_mode mode, bool cos_incompat);
+int mdt_object_check_lock(struct mdt_thread_info *info,
+ struct mdt_object *parent, struct mdt_object *child,
struct mdt_lock_handle *lh, __u64 ibits,
- bool cos_incompat);
-
+ enum ldlm_mode mode, bool cos_incompat);
int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *mo,
struct mdt_lock_handle *lh, __u64 *ibits,
- __u64 trybits, bool cos_incompat);
+ __u64 trybits, enum ldlm_mode mode, bool cos_incompat);
+
+/* below three lock functions are used internally */
+int mdt_object_lock_internal(struct mdt_thread_info *info,
+ struct mdt_object *obj, const struct lu_fid *fid,
+ struct mdt_lock_handle *lh, __u64 *ibits,
+ __u64 trybits, bool cache, bool cos_incompat);
+int mdt_object_pdo_lock(struct mdt_thread_info *info, struct mdt_object *obj,
+ struct mdt_lock_handle *lh, const struct lu_name *name,
+ enum ldlm_mode mode, bool pdo_lock, bool cos_incompat);
+int mdt_object_lookup_lock(struct mdt_thread_info *info,
+ struct mdt_object *pobj, struct mdt_object *obj,
+ struct mdt_lock_handle *lh, enum ldlm_mode mode,
+ bool cos_incompat);
void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *mo,
struct mdt_lock_handle *lh, int decref);
-void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
- enum ldlm_mode mode, int decref);
+void mdt_object_stripes_unlock(struct mdt_thread_info *info,
+ struct mdt_object *o, struct mdt_lock_handle *lh,
+ struct ldlm_enqueue_info *einfo, int decref);
struct mdt_object *mdt_object_new(const struct lu_env *env,
struct mdt_device *,
struct mdt_object *mdt_object_find(const struct lu_env *,
struct mdt_device *,
const struct lu_fid *);
-struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *,
- const struct lu_fid *,
- struct mdt_lock_handle *,
- __u64);
-void mdt_object_unlock_put(struct mdt_thread_info *,
- struct mdt_object *,
- struct mdt_lock_handle *,
- int decref);
-
-void mdt_client_compatibility(struct mdt_thread_info *info);
-
-int mdt_remote_object_lock(struct mdt_thread_info *mti,
- struct mdt_object *o, const struct lu_fid *fid,
- struct lustre_handle *lh,
- enum ldlm_mode mode, __u64 ibits, bool cache);
-int mdt_object_local_lock(struct mdt_thread_info *info, struct mdt_object *o,
- struct mdt_lock_handle *lh, __u64 *ibits,
- __u64 trybits, bool cos_incompat);
-int mdt_reint_striped_lock(struct mdt_thread_info *info,
+struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
+ const struct lu_fid *f,
+ struct mdt_lock_handle *lh,
+ __u64 ibits, enum ldlm_mode mode);
+void mdt_object_unlock_put(struct mdt_thread_info *info,
struct mdt_object *o,
struct mdt_lock_handle *lh,
- __u64 ibits,
- struct ldlm_enqueue_info *einfo,
- bool cos_incompat);
-void mdt_reint_striped_unlock(struct mdt_thread_info *info,
- struct mdt_object *o,
- struct mdt_lock_handle *lh,
- struct ldlm_enqueue_info *einfo, int decref);
+ int decref);
+
+void mdt_client_compatibility(struct mdt_thread_info *info);
enum mdt_name_flags {
MNF_FIX_ANON = 1,
int mdt_reint_setxattr(struct mdt_thread_info *info,
struct mdt_lock_handle *lh);
-void mdt_lock_handle_init(struct mdt_lock_handle *lh);
-void mdt_lock_handle_fini(struct mdt_lock_handle *lh);
-
void mdt_reconstruct(struct mdt_thread_info *, struct mdt_lock_handle *);
void mdt_reconstruct_generic(struct mdt_thread_info *mti,
struct mdt_lock_handle *lhc);
mdt_intent_fixup_resent(mti, *lockp, lhc, flags);
/* resent case */
if (!lustre_handle_is_used(&lhc->mlh_reg_lh)) {
- mdt_lock_handle_init(lhc);
+ __u64 ibits = MDS_INODELOCK_DOM;
+
mdt_lh_reg_init(lhc, *lockp);
/* This will block MDT thread but it should be fine until
* return ELDLM_OK here and fall back into normal lock enqueue
* process.
*/
- rc = mdt_object_lock(mti, mo, lhc, MDS_INODELOCK_DOM);
+ rc = mdt_object_lock_internal(mti, mo, mdt_object_fid(mo), lhc,
+ &ibits, 0, false, false);
if (rc)
GOTO(out, rc);
}
ENTRY;
*ibits = 0;
- mdt_lock_handle_init(lhc);
-
if (req_is_replay(mdt_info_req(info)))
RETURN(0);
atomic_read(&obj->mot_lease_count), lm);
}
- mdt_lock_reg_init(lhc, lm);
-
/* Return lookup lock to validate inode at the client side.
* This is pretty important otherwise MDT will return layout
* lock for each open.
}
if (*ibits | trybits)
- rc = mdt_object_lock_try(info, obj, lhc, ibits, trybits, false);
+ rc = mdt_object_lock_try(info, obj, lhc, ibits, trybits, lm,
+ false);
CDEBUG(D_INODE, "%s: Requested bits lock:"DFID ", ibits = %#llx/%#llx"
", open_flags = %#llo, try_layout = %d : rc = %d\n",
mdt_object_unlock(info, obj, lhc, 1);
LASSERT(!try_layout);
- mdt_lock_handle_init(ll);
- mdt_lock_reg_init(ll, LCK_EX);
- rc = mdt_object_lock(info, obj, ll, MDS_INODELOCK_LAYOUT);
+ rc = mdt_object_lock(info, obj, ll, MDS_INODELOCK_LAYOUT,
+ LCK_EX, false);
OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LL_BLOCK, 2);
}
struct mdt_device *mdt)
{
struct mdt_object *md_root = mdt->mdt_md_root;
- struct lustre_handle lhroot;
+ struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
+ __u64 ibits = MDS_INODELOCK_XATTR;
int rc;
if (md_root == NULL) {
if (md_root->mot_cache_attr || !mdt_object_remote(md_root))
return 0;
- rc = mdt_remote_object_lock(info, md_root, mdt_object_fid(md_root),
- &lhroot, LCK_PR, MDS_INODELOCK_XATTR,
- true);
+ mdt_lock_reg_init(lh, LCK_PR);
+ rc = mdt_object_lock_internal(info, md_root, mdt_object_fid(md_root),
+ lh, &ibits, 0, true, false);
if (rc < 0)
return rc;
md_root->mot_cache_attr = 1;
/* don't cancel this lock, so that we know the cached xattr is valid. */
- ldlm_lock_decref(&lhroot, LCK_PR);
+ ldlm_lock_decref(&lh->mlh_rreg_lh, LCK_PR);
+ lh->mlh_rreg_lh.cookie = 0ull;
return 0;
}
again_pw:
if (lock_mode != LCK_NL) {
lh = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_pdo_init(lh, lock_mode, &rr->rr_name);
- result = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
+ result = mdt_parent_lock(info, parent, lh, &rr->rr_name,
+ lock_mode, false);
if (result != 0)
GOTO(out_parent, result);
/* unlink vs create race: get write lock and restart */
mdt_object_unlock(info, parent, lh, 1);
mdt_clear_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
- mdt_lock_handle_init(lh);
lock_mode = LCK_PW;
goto again_pw;
}
LASSERT(lhc != NULL);
rc = mdt_check_resent_lock(info, child, lhc);
- if (rc < 0) {
+ if (rc < 0)
GOTO(out_child, result = rc);
- } else if (rc > 0) {
- mdt_lock_handle_init(lhc);
- mdt_lock_reg_init(lhc, LCK_PR);
-
- rc = mdt_object_lock(info, child, lhc,
- MDS_INODELOCK_LOOKUP);
- }
+ else if (rc > 0)
+ rc = mdt_object_lookup_lock(info, NULL, child,
+ lhc, LCK_PR, false);
repbody->mbo_fid1 = *mdt_object_fid(child);
repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
if (rc != 0)
lustre_hsm2buf(buf->lb_buf, &ma->ma_hsm);
ma->ma_hsm.mh_flags &= ~HS_RELEASED;
- mdt_lock_reg_init(lh, LCK_EX);
rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_LAYOUT |
- MDS_INODELOCK_XATTR);
+ MDS_INODELOCK_XATTR, LCK_EX, false);
if (rc != 0)
GOTO(out_close, rc);
if (lease_broken)
GOTO(out_unlock_sem, rc = -ESTALE);
- mdt_lock_reg_init(lh1, LCK_EX);
rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT |
- MDS_INODELOCK_XATTR);
+ MDS_INODELOCK_XATTR, LCK_EX, false);
if (rc < 0)
GOTO(out_unlock_sem, rc);
if (o2) {
- mdt_lock_reg_init(lh2, LCK_EX);
rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
- MDS_INODELOCK_XATTR);
+ MDS_INODELOCK_XATTR, LCK_EX, false);
if (rc < 0)
GOTO(out_unlock1, rc);
}
}
-static int mdt_unlock_slaves(struct mdt_thread_info *mti,
- struct mdt_object *obj,
- struct ldlm_enqueue_info *einfo,
- int decref)
+static int mdt_stripes_unlock(struct mdt_thread_info *mti,
+ struct mdt_object *obj,
+ struct ldlm_enqueue_info *einfo,
+ int decref)
{
union ldlm_policy_data *policy = &mti->mti_policy;
struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
- struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
+ struct lustre_handle_array *locks = einfo->ei_cbdata;
int i;
LASSERT(S_ISDIR(obj->mot_header.loh_attr));
- LASSERT(slave_locks);
+ LASSERT(locks);
memset(policy, 0, sizeof(*policy));
policy->l_inodebits.bits = einfo->ei_inodebits;
- mdt_lock_handle_init(lh);
mdt_lock_reg_init(lh, einfo->ei_mode);
- for (i = 0; i < slave_locks->ha_count; i++) {
- if (test_bit(i, (void *)slave_locks->ha_map))
- lh->mlh_rreg_lh = slave_locks->ha_handles[i];
+ for (i = 0; i < locks->ha_count; i++) {
+ if (test_bit(i, (void *)locks->ha_map))
+ lh->mlh_rreg_lh = locks->ha_handles[i];
else
- lh->mlh_reg_lh = slave_locks->ha_handles[i];
+ lh->mlh_reg_lh = locks->ha_handles[i];
mdt_object_unlock(mti, NULL, lh, decref);
- slave_locks->ha_handles[i].cookie = 0ull;
+ locks->ha_handles[i].cookie = 0ull;
}
return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
* Lock slave stripes if necessary, the lock handles of slave stripes
* will be stored in einfo->ei_cbdata.
**/
-static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
- enum ldlm_mode mode, __u64 ibits,
- struct ldlm_enqueue_info *einfo)
+static int mdt_stripes_lock(struct mdt_thread_info *mti, struct mdt_object *obj,
+ enum ldlm_mode mode, __u64 ibits,
+ struct ldlm_enqueue_info *einfo)
{
union ldlm_policy_data *policy = &mti->mti_policy;
LASSERT(S_ISDIR(obj->mot_header.loh_attr));
-
einfo->ei_type = LDLM_IBITS;
einfo->ei_mode = mode;
einfo->ei_cb_bl = mdt_remote_blocking_ast;
policy);
}
-int mdt_reint_striped_lock(struct mdt_thread_info *info,
- struct mdt_object *o,
- struct mdt_lock_handle *lh,
- __u64 ibits,
- struct ldlm_enqueue_info *einfo,
- bool cos_incompat)
+/** lock object, and stripes if it's a striped directory
+ *
+ * object should be local, this is called in operations which modify both object
+ * and stripes.
+ *
+ * \param info struct mdt_thread_info
+ * \param parent parent object, if it's NULL, find parent by mdo_lookup()
+ * \param child child object
+ * \param lh lock handle
+ * \param einfo struct ldlm_enqueue_info
+ * \param ibits MDS inode lock bits
+ * \param mode lock mode
+ * \param cos_incompat DNE COS incompatible
+ *
+ * \retval 0 on success, -ev on error.
+ */
+int mdt_object_stripes_lock(struct mdt_thread_info *info,
+ struct mdt_object *parent,
+ struct mdt_object *child,
+ struct mdt_lock_handle *lh,
+ struct ldlm_enqueue_info *einfo, __u64 ibits,
+ enum ldlm_mode mode, bool cos_incompat)
{
int rc;
- LASSERT(!mdt_object_remote(o));
+ ENTRY;
+ /* according to the protocol, child should be local, is request sent to
+ * wrong MDT.
+ */
+ if (mdt_object_remote(child)) {
+ CERROR("%s: lock target "DFID", but it is on other MDT: rc = %d\n",
+ mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(child)),
+ -EREMOTE);
+ RETURN(-EREMOTE);
+ }
memset(einfo, 0, sizeof(*einfo));
-
- rc = mdt_reint_object_lock(info, o, lh, ibits, cos_incompat);
- if (rc)
- return rc;
-
- rc = mdt_object_striped(info, o);
- if (rc != 1) {
- if (rc < 0)
- mdt_object_unlock(info, o, lh, rc);
- return rc;
+ if (ibits & MDS_INODELOCK_LOOKUP) {
+ LASSERT(parent);
+ rc = mdt_object_check_lock(info, parent, child, lh, ibits,
+ mode, cos_incompat);
+ } else {
+ rc = mdt_object_lock(info, child, lh, ibits, mode,
+ cos_incompat);
}
+ if (rc)
+ RETURN(rc);
- rc = mdt_lock_slaves(info, o, lh->mlh_reg_mode, ibits, einfo);
- if (rc) {
- mdt_object_unlock(info, o, lh, rc);
- if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
- rc = 0;
+ if (S_ISDIR(child->mot_header.loh_attr)) {
+ rc = mdt_stripes_lock(info, child, mode, ibits, einfo);
+ if (rc) {
+ mdt_object_unlock(info, child, lh, rc);
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
+ rc == -EIO)
+ rc = 0;
+ }
}
- return rc;
+ RETURN(rc);
}
-void mdt_reint_striped_unlock(struct mdt_thread_info *info,
- struct mdt_object *o,
+void mdt_object_stripes_unlock(struct mdt_thread_info *info,
+ struct mdt_object *obj,
struct mdt_lock_handle *lh,
struct ldlm_enqueue_info *einfo, int decref)
{
+ /* this is checked in mdt_object_stripes_lock() */
+ LASSERT(!mdt_object_remote(obj));
if (einfo->ei_cbdata)
- mdt_unlock_slaves(info, o, einfo, decref);
- mdt_object_unlock(info, o, lh, decref);
+ mdt_stripes_unlock(info, obj, einfo, decref);
+ mdt_object_unlock(info, obj, lh, decref);
}
static int mdt_restripe(struct mdt_thread_info *info,
RETURN(rc);
lhp = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_pdo_init(lhp, LCK_PW, lname);
- rc = mdt_reint_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
- true);
+ rc = mdt_parent_lock(info, parent, lhp, lname, LCK_PW, true);
if (rc)
RETURN(rc);
/* lock object */
lhc = &info->mti_lh[MDT_LH_CHILD];
- mdt_lock_reg_init(lhc, LCK_EX);
-
- /* enqueue object remote LOOKUP lock */
- if (mdt_object_remote(parent)) {
- rc = mdt_remote_object_lock(info, parent, fid,
- &lhc->mlh_rreg_lh,
- lhc->mlh_rreg_mode,
- MDS_INODELOCK_LOOKUP, false);
- if (rc != ELDLM_OK)
- GOTO(out_child, rc);
- }
-
- rc = mdt_reint_striped_lock(info, child, lhc, MDS_INODELOCK_FULL, einfo,
- true);
+ rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
+ MDS_INODELOCK_FULL, LCK_PW, true);
if (rc)
GOTO(unlock_child, rc);
restriping_clear:
child->mot_restriping = 0;
unlock_child:
- mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
+ mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
out_child:
mdt_object_put(info->mti_env, child);
unlock_parent:
OBD_RACE(OBD_FAIL_MDS_CREATE_RACE);
lh = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
- rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
+ rc = mdt_parent_lock(info, parent, lh, &rr->rr_name, LCK_PW, false);
if (rc)
GOTO(put_parent, rc);
if (cos_incompat) {
if (!mdt_object_remote(parent)) {
mdt_object_unlock(info, parent, lh, 1);
- mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
- rc = mdt_reint_object_lock(info, parent, lh,
- MDS_INODELOCK_UPDATE,
- true);
+ rc = mdt_parent_lock(info, parent, lh,
+ &rr->rr_name, LCK_PW,
+ true);
if (rc)
GOTO(put_child, rc);
}
}
lhc = &info->mti_lh[MDT_LH_CHILD];
- mdt_lock_handle_init(lhc);
- mdt_lock_reg_init(lhc, LCK_PW);
- rc = mdt_reint_striped_lock(info, child, lhc,
- MDS_INODELOCK_UPDATE, einfo,
- cos_incompat);
+ rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
+ MDS_INODELOCK_UPDATE, LCK_PW,
+ cos_incompat);
if (rc)
GOTO(put_child, rc);
- mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
+ mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
}
/* Return fid & attr to client. */
rc = mdt_object_striped(info, mo);
if (rc < 0)
RETURN(rc);
-
cos_incompat = rc;
- lh = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_reg_init(lh, LCK_PW);
-
- /* Even though the new MDT will grant PERM lock to the old
- * client, but the old client will almost ignore that during
- * So it needs to revoke both LOOKUP and PERM lock here, so
- * both new and old client can cancel the dcache
- */
if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
- lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
+ lockpart |= MDS_INODELOCK_PERM;
/* Clear xattr cache on clients, so the virtual project ID xattr
* can get the new project ID
*/
if (ma->ma_attr.la_valid & LA_PROJID)
lockpart |= MDS_INODELOCK_XATTR;
- rc = mdt_reint_striped_lock(info, mo, lh, lockpart, einfo,
- cos_incompat);
+ lh = &info->mti_lh[MDT_LH_PARENT];
+ rc = mdt_object_stripes_lock(info, NULL, mo, lh, einfo, lockpart,
+ LCK_PW, cos_incompat);
if (rc != 0)
RETURN(rc);
mdt_dom_obj_lvb_update(info->mti_env, mo, NULL, false);
EXIT;
out_unlock:
- mdt_reint_striped_unlock(info, mo, lh, einfo, rc);
+ mdt_object_stripes_unlock(info, mo, lh, einfo, rc);
return rc;
}
if (atomic_read(&mo->mot_lease_count) > 0) { /* lease exists */
lhc = &info->mti_lh[MDT_LH_LOCAL];
- mdt_lock_reg_init(lhc, LCK_CW);
-
- rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN);
+ rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN,
+ LCK_CW, false);
if (rc != 0) {
up_read(&mo->mot_open_sem);
GOTO(out_put, rc);
struct lu_ucred *uc = mdt_ucred(info);
struct mdt_lock_handle *lh;
const char *name;
- __u64 lockpart = MDS_INODELOCK_XATTR;
/* reject if either remote or striped dir is disabled */
if (ma->ma_valid & MA_LMV) {
GOTO(out_put, rc = -EPROTO);
lh = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_reg_init(lh, LCK_PW);
-
if (ma->ma_valid & MA_LOV) {
buf->lb_buf = ma->ma_lmm;
buf->lb_len = ma->ma_lmm_size;
name = XATTR_NAME_LOV;
+ rc = mdt_object_lock(info, mo, lh, MDS_INODELOCK_XATTR,
+ LCK_PW, false);
} else {
- struct lmv_user_md *lmu = &ma->ma_lmv->lmv_user_md;
- struct lu_fid *pfid = &info->mti_tmp_fid1;
- struct lu_name *pname = &info->mti_name;
- const char dotdot[] = "..";
- struct mdt_object *pobj;
-
- buf->lb_buf = lmu;
+ buf->lb_buf = &ma->ma_lmv->lmv_user_md;
buf->lb_len = ma->ma_lmv_size;
name = XATTR_NAME_DEFAULT_LMV;
- if (fid_is_root(rr->rr_fid1)) {
- lockpart |= MDS_INODELOCK_LOOKUP;
+ if (unlikely(fid_is_root(mdt_object_fid(mo)))) {
+ rc = mdt_object_lock(info, mo, lh,
+ MDS_INODELOCK_XATTR |
+ MDS_INODELOCK_LOOKUP,
+ LCK_PW, false);
} else {
- /* force client to update dir default layout */
+ struct lu_fid *pfid = &info->mti_tmp_fid1;
+ struct lu_name *pname = &info->mti_name;
+ const char dotdot[] = "..";
+ struct mdt_object *pobj;
+
fid_zero(pfid);
pname->ln_name = dotdot;
pname->ln_namelen = sizeof(dotdot);
if (rc)
GOTO(out_put, rc);
- pobj = mdt_object_find(info->mti_env, mdt,
- pfid);
+ pobj = mdt_object_find(info->mti_env,
+ info->mti_mdt, pfid);
if (IS_ERR(pobj))
GOTO(out_put, rc = PTR_ERR(pobj));
- if (mdt_object_remote(pobj))
- rc = mdt_remote_object_lock(info, pobj,
- mdt_object_fid(mo),
- &lh->mlh_rreg_lh, LCK_EX,
- MDS_INODELOCK_LOOKUP, false);
- else
- lockpart |= MDS_INODELOCK_LOOKUP;
-
+ rc = mdt_object_check_lock(info, pobj, mo, lh,
+ MDS_INODELOCK_XATTR |
+ MDS_INODELOCK_LOOKUP,
+ LCK_PW, false);
mdt_object_put(info->mti_env, pobj);
-
- if (rc)
- GOTO(out_put, rc);
}
}
- rc = mdt_object_lock(info, mo, lh, lockpart);
if (rc != 0)
GOTO(out_put, rc);
struct mdt_lock_handle *child_lh;
struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
struct lu_ucred *uc = mdt_ucred(info);
- __u64 lock_ibits;
bool cos_incompat = false;
int no_name = 0;
ktime_t kstart = ktime_get();
OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
relock:
parent_lh = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name);
- rc = mdt_reint_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
- cos_incompat);
+ rc = mdt_parent_lock(info, mp, parent_lh, &rr->rr_name, LCK_PW,
+ cos_incompat);
if (rc != 0)
GOTO(put_parent, rc);
}
child_lh = &info->mti_lh[MDT_LH_CHILD];
- mdt_lock_reg_init(child_lh, LCK_EX);
if (mdt_object_remote(mc)) {
struct mdt_body *repbody;
* would happen if another client try to grab the LOOKUP
* lock at the same time with unlink XXX
*/
- mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_LOOKUP);
+ rc = mdt_object_lookup_lock(info, NULL, mc, child_lh, LCK_EX,
+ false);
+ if (rc)
+ GOTO(put_child, rc);
+
repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
LASSERT(repbody != NULL);
repbody->mbo_fid1 = *mdt_object_fid(mc);
* this now because a running HSM restore on the child (unlink
* victim) will hold the layout lock. See LU-4002.
*/
- lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
- if (mdt_object_remote(mp)) {
- /* Enqueue lookup lock from parent MDT */
- rc = mdt_remote_object_lock(info, mp, mdt_object_fid(mc),
- &child_lh->mlh_rreg_lh,
- child_lh->mlh_rreg_mode,
- MDS_INODELOCK_LOOKUP, false);
- if (rc != ELDLM_OK)
- GOTO(put_child, rc);
-
- lock_ibits &= ~MDS_INODELOCK_LOOKUP;
- }
-
- rc = mdt_reint_striped_lock(info, mc, child_lh, lock_ibits, einfo,
- cos_incompat);
+ rc = mdt_object_stripes_lock(info, mp, mc, child_lh, einfo,
+ MDS_INODELOCK_LOOKUP |
+ MDS_INODELOCK_UPDATE,
+ LCK_EX, cos_incompat);
if (rc != 0)
GOTO(put_child, rc);
EXIT;
unlock_child:
- mdt_reint_striped_unlock(info, mc, child_lh, einfo, rc);
+ mdt_object_stripes_unlock(info, mc, child_lh, einfo, rc);
put_child:
if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID &&
info->mti_big_buf.lb_buf)
OBD_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
lhp = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_pdo_init(lhp, LCK_PW, &rr->rr_name);
- rc = mdt_reint_object_lock(info, mp, lhp, MDS_INODELOCK_UPDATE,
- cos_incompat);
+ rc = mdt_parent_lock(info, mp, lhp, &rr->rr_name, LCK_PW, cos_incompat);
if (rc != 0)
GOTO(put_source, rc);
OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
lhs = &info->mti_lh[MDT_LH_CHILD];
- mdt_lock_reg_init(lhs, LCK_EX);
- rc = mdt_reint_object_lock(info, ms, lhs,
- MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
- cos_incompat);
+ rc = mdt_object_lock(info, ms, lhs,
+ MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR, LCK_EX,
+ cos_incompat);
if (rc != 0)
GOTO(unlock_parent, rc);
mdt_object_put(info->mti_env, mp);
return rc;
}
-/**
- * lock the part of the directory according to the hash of the name
- * (lh->mlh_pdo_hash) in parallel directory lock.
- */
-static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
- struct mdt_lock_handle *lh,
- struct mdt_object *obj, __u64 ibits,
- bool cos_incompat)
-{
- struct ldlm_res_id *res = &info->mti_res_id;
- struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
- union ldlm_policy_data *policy = &info->mti_policy;
- __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
- int rc;
-
- /*
- * Finish res_id initializing by name hash marking part of
- * directory which is taking modification.
- */
- LASSERT(lh->mlh_pdo_hash != 0);
- fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res);
- memset(policy, 0, sizeof(*policy));
- policy->l_inodebits.bits = ibits;
- if (cos_incompat &&
- (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
- dlmflags |= LDLM_FL_COS_INCOMPAT;
- /*
- * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
- * going to be sent to client. If it is - mdt_intent_policy() path will
- * fix it up and turn FL_LOCAL flag off.
- */
- rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
- policy, res, dlmflags,
- &info->mti_exp->exp_handle.h_cookie);
- return rc;
-}
/**
* Get BFL lock for rename or migrate process.
**/
static int mdt_rename_lock(struct mdt_thread_info *info,
- struct lustre_handle *lh)
+ struct mdt_lock_handle *lh)
{
- int rc;
+ struct lu_fid *fid = &info->mti_tmp_fid1;
+ struct mdt_object *obj;
+ __u64 ibits = MDS_INODELOCK_UPDATE;
+ int rc;
ENTRY;
- if (mdt_seq_site(info->mti_mdt)->ss_node_id != 0) {
- struct lu_fid *fid = &info->mti_tmp_fid1;
- struct mdt_object *obj;
+ lu_root_fid(fid);
+ obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
+ if (IS_ERR(obj))
+ RETURN(PTR_ERR(obj));
- /* XXX, right now, it has to use object API to
- * enqueue lock cross MDT, so it will enqueue
- * rename lock(with LUSTRE_BFL_FID) by root object
- */
- lu_root_fid(fid);
- obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
- if (IS_ERR(obj))
- RETURN(PTR_ERR(obj));
-
- rc = mdt_remote_object_lock(info, obj,
- &LUSTRE_BFL_FID, lh,
- LCK_EX,
- MDS_INODELOCK_UPDATE, false);
- mdt_object_put(info->mti_env, obj);
- } else {
- struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
- union ldlm_policy_data *policy = &info->mti_policy;
- struct ldlm_res_id *res_id = &info->mti_res_id;
- __u64 flags = 0;
-
- fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
- memset(policy, 0, sizeof(*policy));
- policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
- flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
- rc = ldlm_cli_enqueue_local(info->mti_env, ns, res_id,
- LDLM_IBITS, policy, LCK_EX, &flags,
- ldlm_blocking_ast,
- ldlm_completion_ast, NULL, NULL, 0,
- LVB_T_NONE,
- &info->mti_exp->exp_handle.h_cookie,
- lh);
- RETURN(rc);
- }
+ mdt_lock_reg_init(lh, LCK_EX);
+ rc = mdt_object_lock_internal(info, obj, &LUSTRE_BFL_FID, lh,
+ &ibits, 0, false, false);
+ mdt_object_put(info->mti_env, obj);
RETURN(rc);
}
-static void mdt_rename_unlock(struct lustre_handle *lh)
+static void mdt_rename_unlock(struct mdt_thread_info *info,
+ struct mdt_lock_handle *lh)
{
ENTRY;
- LASSERT(lustre_handle_is_used(lh));
/* Cancel the single rename lock right away */
- ldlm_lock_decref_and_cancel(lh, LCK_EX);
+ mdt_object_unlock(info, NULL, lh, 1);
EXIT;
}
struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
int rc;
- mdt_lock_handle_init(lh);
- mdt_lock_reg_init(lh, LCK_EX);
-
- if (mdt_object_remote(pobj)) {
- /* don't bother to check if pobj and obj are on the same MDT. */
- rc = mdt_remote_object_lock(info, pobj, mdt_object_fid(obj),
- &lh->mlh_rreg_lh, LCK_EX,
- MDS_INODELOCK_LOOKUP, false);
- } else if (mdt_object_remote(obj)) {
- struct ldlm_res_id *res = &info->mti_res_id;
- union ldlm_policy_data *policy = &info->mti_policy;
- __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB |
- LDLM_FL_COS_INCOMPAT;
-
- fid_build_reg_res_name(mdt_object_fid(obj), res);
- memset(policy, 0, sizeof(*policy));
- policy->l_inodebits.bits = MDS_INODELOCK_LOOKUP;
- rc = mdt_fid_lock(info->mti_env, info->mti_mdt->mdt_namespace,
- &lh->mlh_reg_lh, LCK_EX, policy, res,
- dlmflags, NULL);
- } else {
- /* do nothing if both are local */
- return 0;
- }
-
- if (rc != ELDLM_OK)
+ rc = mdt_object_lookup_lock(info, pobj, obj, lh, LCK_EX, true);
+ if (rc)
return rc;
/*
mdt_unlock_list(info, slave_locks, decref);
mdt_object_unlock(info, obj, lh, decref);
} else {
- mdt_reint_striped_unlock(info, obj, lh, einfo, decref);
+ mdt_object_stripes_unlock(info, obj, lh, einfo, decref);
}
}
* one, and continue processing the remaining entries, and in
* the end of the loop restart from beginning.
*/
- mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
ibits = 0;
rc = mdt_object_lock_try(info, lnkp, &msl->msl_lh, &ibits,
- MDS_INODELOCK_UPDATE, true);
+ MDS_INODELOCK_UPDATE, LCK_PW, true);
if (!(ibits & MDS_INODELOCK_UPDATE)) {
CDEBUG(D_INFO, "busy lock on "DFID" "DNAME"\n",
blocked = true;
- mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
rc = mdt_object_lock(info, lnkp, &msl->msl_lh,
- MDS_INODELOCK_UPDATE);
+ MDS_INODELOCK_UPDATE, LCK_PW,
+ true);
if (rc) {
mdt_object_put(info->mti_env, lnkp);
OBD_FREE_PTR(msl);
GOTO(out, rc = -ENOMEM);
}
- mdt_lock_reg_init(&msl->msl_lh, LCK_EX);
- rc = mdt_reint_object_lock(info, slave, &msl->msl_lh,
- MDS_INODELOCK_UPDATE, true);
+ rc = mdt_object_lock(info, slave, &msl->msl_lh,
+ MDS_INODELOCK_UPDATE, LCK_EX, true);
if (rc) {
OBD_FREE_PTR(msl);
mdt_object_put(info->mti_env, slave);
int rc;
if (mdt_object_remote(obj)) {
- rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
- &lh->mlh_rreg_lh, LCK_PW,
- MDS_INODELOCK_UPDATE, false);
- if (rc != ELDLM_OK)
+ rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
+ LCK_PW, true);
+ if (rc)
return rc;
/*
mdt_object_unlock(info, obj, lh, rc);
}
} else {
- rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
- einfo, true);
+ rc = mdt_object_stripes_lock(info, NULL, obj, lh, einfo,
+ MDS_INODELOCK_UPDATE, LCK_PW,
+ true);
}
return rc;
if (rc)
return rc;
- rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
- &lh->mlh_rreg_lh, LCK_EX,
- MDS_INODELOCK_FULL, false);
- if (rc != ELDLM_OK)
+ rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_FULL, LCK_EX,
+ true);
+ if (rc)
return rc;
/*
}
}
} else {
- if (mdt_object_remote(pobj)) {
- rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
- if (rc)
- return rc;
- }
-
- rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_FULL,
- einfo, true);
+ rc = mdt_object_stripes_lock(info, pobj, obj, lh, einfo,
+ MDS_INODELOCK_FULL, LCK_EX, true);
}
return rc;
struct mdt_object *spobj = NULL;
struct mdt_object *sobj = NULL;
struct mdt_object *tobj;
- struct lustre_handle rename_lh = { 0 };
+ struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
struct mdt_lock_handle *lhp;
struct mdt_lock_handle *lhs;
struct mdt_lock_handle *lht;
* req is NULL if this is called by directory auto-split.
*/
if (req && !req_is_replay(req)) {
- rc = mdt_rename_lock(info, &rename_lh);
+ rc = mdt_rename_lock(info, rename_lh);
if (rc != 0) {
CERROR("%s: can't lock FS for rename: rc = %d\n",
mdt_obd_name(info->mti_mdt), rc);
lock_parent:
/* lock parent object */
lhp = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_reg_init(lhp, LCK_PW);
rc = mdt_migrate_parent_lock(info, pobj, ma, lhp, peinfo,
&parent_slave_locks);
if (rc)
/* lock source */
lhs = &info->mti_lh[MDT_LH_OLD];
- mdt_lock_reg_init(lhs, LCK_EX);
rc = mdt_migrate_object_lock(info, spobj, sobj, lhs, seinfo,
&child_slave_locks);
if (rc)
GOTO(unlock_source, rc = PTR_ERR(tobj));
lht = &info->mti_lh[MDT_LH_NEW];
- mdt_lock_reg_init(lht, LCK_EX);
- rc = mdt_reint_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, true);
+ rc = mdt_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, LCK_EX, true);
if (rc)
GOTO(put_target, rc);
put_parent:
mdt_object_put(env, pobj);
unlock_rename:
- if (lustre_handle_is_used(&rename_lh))
- mdt_rename_unlock(&rename_lh);
+ mdt_rename_unlock(info, rename_lh);
return rc;
}
-static int mdt_object_lock_save(struct mdt_thread_info *info,
- struct mdt_object *dir,
- struct mdt_lock_handle *lh,
- int idx, bool cos_incompat)
-{
- int rc;
-
- /* we lock the target dir if it is local */
- rc = mdt_reint_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE,
- cos_incompat);
- if (rc != 0)
- return rc;
-
- /* get and save correct version after locking */
- mdt_version_get_save(info, dir, idx);
- return 0;
-}
-
/*
* determine lock order of sobj and tobj
*
if (rc < 0)
return rc;
- if (rc) {
- /* enqueue remote LOOKUP lock from the parent MDT */
- __u64 rmt_ibits = MDS_INODELOCK_LOOKUP;
-
- if (mdt_object_remote(parent)) {
- rc = mdt_remote_object_lock(info, parent,
- mdt_object_fid(child),
- &lhr->mlh_rreg_lh,
- lhr->mlh_rreg_mode,
- rmt_ibits, false);
- if (rc != ELDLM_OK)
- return rc;
- } else {
- LASSERT(mdt_object_remote(child));
- rc = mdt_object_local_lock(info, child, lhr,
- &rmt_ibits, 0, true);
- if (rc < 0)
- return rc;
- }
+ if (rc == 1) {
+ rc = mdt_object_lookup_lock(info, parent, child, lhr, LCK_EX,
+ cos_incompat);
+ if (rc)
+ return rc;
ibits &= ~MDS_INODELOCK_LOOKUP;
}
- if (mdt_object_remote(child)) {
- rc = mdt_remote_object_lock(info, child, mdt_object_fid(child),
- &lhc->mlh_rreg_lh,
- lhc->mlh_rreg_mode,
- ibits, false);
- if (rc == ELDLM_OK)
- rc = 0;
- } else {
- rc = mdt_reint_object_lock(info, child, lhc, ibits,
- cos_incompat);
- }
-
- if (!rc)
- mdt_object_unlock(info, child, lhr, rc);
+ rc = mdt_object_lock(info, child, lhc, ibits, LCK_EX, cos_incompat);
+ if (rc && !(ibits & MDS_INODELOCK_LOOKUP))
+ mdt_object_unlock(info, NULL, lhr, rc);
return rc;
}
static int mdt_lock_two_dirs(struct mdt_thread_info *info,
struct mdt_object *mfirstdir,
struct mdt_lock_handle *lh_firstdirp,
+ const struct lu_name *firstname,
struct mdt_object *mseconddir,
struct mdt_lock_handle *lh_seconddirp,
+ const struct lu_name *secondname,
bool cos_incompat)
{
int rc;
- rc = mdt_object_lock_save(info, mfirstdir, lh_firstdirp, 0,
- cos_incompat);
+ rc = mdt_parent_lock(info, mfirstdir, lh_firstdirp, firstname, LCK_PW,
+ cos_incompat);
if (rc)
return rc;
+ mdt_version_get_save(info, mfirstdir, 0);
OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
if (mfirstdir != mseconddir) {
- rc = mdt_object_lock_save(info, mseconddir, lh_seconddirp, 1,
- cos_incompat);
- } else if (!mdt_object_remote(mseconddir) &&
- lh_firstdirp->mlh_pdo_hash !=
- lh_seconddirp->mlh_pdo_hash) {
- rc = mdt_pdir_hash_lock(info, lh_seconddirp, mseconddir,
- MDS_INODELOCK_UPDATE,
- cos_incompat);
- OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
+ rc = mdt_parent_lock(info, mseconddir, lh_seconddirp,
+ secondname, LCK_PW, cos_incompat);
+ } else if (!mdt_object_remote(mseconddir)) {
+ if (lh_firstdirp->mlh_pdo_hash !=
+ lh_seconddirp->mlh_pdo_hash) {
+ rc = mdt_object_pdo_lock(info, mseconddir,
+ lh_seconddirp, secondname,
+ LCK_PW, false, cos_incompat);
+ OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
+ }
}
+ mdt_version_get_save(info, mseconddir, 1);
if (rc != 0)
mdt_object_unlock(info, mfirstdir, lh_firstdirp, rc);
struct mdt_object *mtgtdir = NULL;
struct mdt_object *mold;
struct mdt_object *mnew = NULL;
- struct lustre_handle rename_lh = { 0 };
+ struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
struct mdt_lock_handle *lh_srcdirp;
struct mdt_lock_handle *lh_tgtdirp;
struct mdt_lock_handle *lh_oldp = NULL;
struct lu_fid *old_fid = &info->mti_tmp_fid1;
struct lu_fid *new_fid = &info->mti_tmp_fid2;
struct lu_ucred *uc = mdt_ucred(info);
- __u64 lock_ibits;
bool reverse = false, discard = false;
bool cos_incompat;
ktime_t kstart = ktime_get();
!mdt->mdt_enable_parallel_rename_dir) ||
(!S_ISDIR(ma->ma_attr.la_mode) &&
!mdt->mdt_enable_parallel_rename_file)) {
- rc = mdt_rename_lock(info, &rename_lh);
+ rc = mdt_rename_lock(info, rename_lh);
if (rc != 0) {
CERROR("%s: cannot lock for rename: rc = %d\n",
mdt_obd_name(mdt), rc);
reverse = 0;
if (reverse)
- rc = mdt_lock_two_dirs(info, mtgtdir, lh_tgtdirp, msrcdir,
- lh_srcdirp, cos_incompat);
+ rc = mdt_lock_two_dirs(info, mtgtdir, lh_tgtdirp,
+ &rr->rr_tgt_name, msrcdir, lh_srcdirp,
+ &rr->rr_name, cos_incompat);
else
- rc = mdt_lock_two_dirs(info, msrcdir, lh_srcdirp, mtgtdir,
- lh_tgtdirp, cos_incompat);
+ rc = mdt_lock_two_dirs(info, msrcdir, lh_srcdirp, &rr->rr_name,
+ mtgtdir, lh_tgtdirp, &rr->rr_tgt_name,
+ cos_incompat);
if (rc != 0)
GOTO(out_unlock_rename, rc);
GOTO(out_put_new, rc = -EISDIR);
lh_oldp = &info->mti_lh[MDT_LH_OLD];
- lh_rmt = &info->mti_lh[MDT_LH_RMT];
- mdt_lock_reg_init(lh_oldp, LCK_EX);
- mdt_lock_reg_init(lh_rmt, LCK_EX);
- lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
+ lh_rmt = &info->mti_lh[MDT_LH_LOOKUP];
rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
- lh_rmt, lock_ibits, cos_incompat);
+ lh_rmt, MDS_INODELOCK_LOOKUP |
+ MDS_INODELOCK_XATTR, cos_incompat);
if (rc < 0)
GOTO(out_put_new, rc);
*/
lh_newp = &info->mti_lh[MDT_LH_NEW];
- mdt_lock_reg_init(lh_newp, LCK_EX);
- lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
- if (mdt_object_remote(mtgtdir)) {
- rc = mdt_remote_object_lock(info, mtgtdir,
- mdt_object_fid(mnew),
- &lh_newp->mlh_rreg_lh,
- lh_newp->mlh_rreg_mode,
- MDS_INODELOCK_LOOKUP,
- false);
- if (rc != ELDLM_OK)
- GOTO(out_unlock_old, rc);
-
- lock_ibits &= ~MDS_INODELOCK_LOOKUP;
- }
- rc = mdt_reint_object_lock(info, mnew, lh_newp, lock_ibits,
+ rc = mdt_object_check_lock(info, mtgtdir, mnew, lh_newp,
+ MDS_INODELOCK_LOOKUP |
+ MDS_INODELOCK_UPDATE, LCK_EX,
cos_incompat);
if (rc != 0)
GOTO(out_unlock_new, rc);
GOTO(out_put_old, rc);
} else {
lh_oldp = &info->mti_lh[MDT_LH_OLD];
- lh_rmt = &info->mti_lh[MDT_LH_RMT];
- mdt_lock_reg_init(lh_oldp, LCK_EX);
- mdt_lock_reg_init(lh_rmt, LCK_EX);
- lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
+ lh_rmt = &info->mti_lh[MDT_LH_LOOKUP];
rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
- lh_rmt, lock_ibits, cos_incompat);
+ lh_rmt, MDS_INODELOCK_LOOKUP |
+ MDS_INODELOCK_XATTR, cos_incompat);
if (rc != 0)
GOTO(out_put_old, rc);
mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
out_unlock_rename:
- if (lustre_handle_is_used(&rename_lh))
- mdt_rename_unlock(&rename_lh);
+ mdt_rename_unlock(info, rename_lh);
out_put_tgtdir:
mdt_object_put(info->mti_env, mtgtdir);
out_put_srcdir:
GOTO(restriping_clear, rc);
lhp = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_pdo_init(lhp, LCK_PW, lname);
- rc = mdt_reint_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
- true);
+ rc = mdt_parent_lock(info, parent, lhp, lname, LCK_PW, true);
if (rc)
GOTO(restriping_clear, rc);
lhc = &info->mti_lh[MDT_LH_CHILD];
- mdt_lock_reg_init(lhc, LCK_EX);
- if (mdt_object_remote(parent)) {
- /* enqueue object remote LOOKUP lock */
- rc = mdt_remote_object_lock(info, parent, mdt_object_fid(child),
- &lhc->mlh_rreg_lh,
- lhc->mlh_rreg_mode,
- MDS_INODELOCK_LOOKUP, false);
- if (rc != ELDLM_OK)
- GOTO(unlock_parent, rc);
- }
-
- rc = mdt_reint_striped_lock(info, child, lhc, MDS_INODELOCK_FULL, einfo,
- true);
+ rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
+ MDS_INODELOCK_FULL, LCK_EX, true);
if (rc)
- GOTO(unlock_child, rc);
+ GOTO(unlock_parent, rc);
mdt_auto_split_prep(info, spec, ma, lum_stripe_count);
rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
EXIT;
-unlock_child:
- mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
+ mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
unlock_parent:
mdt_object_unlock(info, parent, lhp, rc);
restriping_clear:
buf.lb_len = sizeof(*lmv);
lh = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_reg_init(lh, LCK_EX);
- rc = mdt_reint_object_lock(info, stripe, lh, MDS_INODELOCK_XATTR,
- false);
+ rc = mdt_object_lock(info, stripe, lh, MDS_INODELOCK_XATTR, LCK_EX,
+ false);
if (!rc)
rc = mo_xattr_set(info->mti_env, mdt_object_child(stripe), &buf,
XATTR_NAME_LMV, LU_XATTR_REPLACE);
if (IS_ERR(pobj))
GOTO(put_obj, rc = PTR_ERR(pobj));
- /* revoke object remote LOOKUP lock */
- if (mdt_object_remote(pobj)) {
- rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
- if (rc)
- GOTO(put_pobj, rc);
- }
-
/*
* lock parent if dir will be shrunk to 1 stripe, because dir will be
* converted to normal directory, as will change dir FID and update
* namespace of parent.
*/
lhp = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_reg_init(lhp, LCK_PW);
-
if (le32_to_cpu(lmu->lum_stripe_count) < 2) {
- rc = mdt_reint_object_lock(info, pobj, lhp,
- MDS_INODELOCK_UPDATE, true);
+ rc = mdt_object_lock(info, pobj, lhp, MDS_INODELOCK_UPDATE,
+ LCK_PW, true);
if (rc)
GOTO(put_pobj, rc);
}
/* lock object */
lhc = &info->mti_lh[MDT_LH_CHILD];
- mdt_lock_reg_init(lhc, LCK_EX);
- rc = mdt_reint_striped_lock(info, obj, lhc, MDS_INODELOCK_FULL, einfo,
- true);
+ rc = mdt_object_stripes_lock(info, pobj, obj, lhc, einfo,
+ MDS_INODELOCK_FULL, LCK_EX, true);
if (rc)
GOTO(unlock_pobj, rc);
GOTO(unlock_obj, rc);
unlock_obj:
- mdt_reint_striped_unlock(info, obj, lhc, einfo, rc);
+ mdt_object_stripes_unlock(info, obj, lhc, einfo, rc);
unlock_pobj:
mdt_object_unlock(info, pobj, lhp, rc);
put_pobj:
lockpart |= MDS_INODELOCK_LAYOUT;
}
- /* Revoke all clients' lookup lock, since the access
- * permissions for this inode is changed when ACL_ACCESS is
- * set. This isn't needed for ACL_DEFAULT, since that does
- * not change the access permissions of this inode, nor any
- * other existing inodes. It is setting the ACLs inherited
- * by new directories/files at create time.
- */
- /* We need revoke both LOOKUP|PERM lock here, see mdt_attr_set. */
if (!strcmp(xattr_name, XATTR_NAME_ACL_ACCESS))
- lockpart |= MDS_INODELOCK_PERM | MDS_INODELOCK_LOOKUP;
+ lockpart |= MDS_INODELOCK_PERM;
/* We need to take the lock on behalf of old clients so that newer
* clients flush their xattr caches
*/
/* ACLs were sent to clients under LCK_CR locks, so taking LCK_EX
* to cancel them.
*/
- mdt_lock_reg_init(lh, LCK_EX);
- obj = mdt_object_find_lock(info, rr->rr_fid1, lh, lockpart);
+ obj = mdt_object_find_lock(info, rr->rr_fid1, lh, lockpart, LCK_EX);
if (IS_ERR(obj))
GOTO(out, rc = PTR_ERR(obj));