rep->lock_policy_res1 |= op_flag;
}
+/* assert lock is unlocked before reuse */
+static inline void mdt_lock_handle_assert(struct mdt_lock_handle *lh)
+{
+ LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
+ LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
+ LASSERT(!lustre_handle_is_used(&lh->mlh_rreg_lh));
+}
+
void mdt_lock_reg_init(struct mdt_lock_handle *lh, enum ldlm_mode lm)
{
+ mdt_lock_handle_assert(lh);
lh->mlh_pdo_hash = 0;
lh->mlh_reg_mode = lm;
lh->mlh_rreg_mode = lm;
void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
const struct lu_name *lname)
{
+ mdt_lock_handle_assert(lh);
lh->mlh_reg_mode = lock_mode;
lh->mlh_pdo_mode = LCK_MINMODE;
lh->mlh_rreg_mode = lock_mode;
if (layout->mlc_opc == MD_LAYOUT_WRITE)
lockpart |= MDS_INODELOCK_UPDATE;
- mdt_lock_handle_init(lhc);
- mdt_lock_reg_init(lhc, LCK_EX);
- rc = mdt_reint_object_lock(info, obj, lhc, lockpart, false);
+ rc = mdt_object_lock(info, obj, lhc, lockpart, LCK_EX, false);
if (rc)
RETURN(rc);
}
GOTO(put, rc = -EPROTO);
lh1 = &info->mti_lh[MDT_LH_NEW];
- mdt_lock_reg_init(lh1, LCK_EX);
lh2 = &info->mti_lh[MDT_LH_OLD];
- mdt_lock_reg_init(lh2, LCK_EX);
-
rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT |
- MDS_INODELOCK_XATTR);
+ MDS_INODELOCK_XATTR, LCK_EX, false);
if (rc < 0)
GOTO(put, rc);
rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
- MDS_INODELOCK_XATTR);
+ MDS_INODELOCK_XATTR, LCK_EX, false);
if (rc < 0)
GOTO(unlock1, rc);
if (rc < 0) {
RETURN(rc);
} else if (rc > 0) {
- mdt_lock_handle_init(lhc);
- mdt_lock_reg_init(lhc, LCK_PR);
-
/*
* Object's name entry is on another MDS, it will
* request PERM lock only because LOOKUP lock is owned
child_bits &= ~(MDS_INODELOCK_LOOKUP |
MDS_INODELOCK_LAYOUT);
child_bits |= MDS_INODELOCK_PERM;
-
- rc = mdt_object_lock(info, child, lhc, child_bits);
+ rc = mdt_object_lock(info, child, lhc, child_bits,
+ LCK_PR, false);
if (rc < 0)
RETURN(rc);
}
/* step 1: lock parent only if parent is a directory */
if (S_ISDIR(lu_object_attr(&parent->mot_obj))) {
lhp = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_pdo_init(lhp, LCK_PR, lname);
- rc = mdt_object_lock(info, parent, lhp,
- MDS_INODELOCK_UPDATE);
+ rc = mdt_parent_lock(info, parent, lhp, lname, LCK_PR,
+ false);
if (unlikely(rc != 0))
RETURN(rc);
}
if (rc < 0) {
GOTO(out_child, rc);
} else if (rc > 0) {
- mdt_lock_handle_init(lhc);
- mdt_lock_reg_init(lhc, LCK_PR);
-
if (!(child_bits & MDS_INODELOCK_UPDATE) &&
!mdt_object_remote(child)) {
struct md_attr *ma = &info->mti_attr;
/* try layout lock, it may fail to be granted due to
* contention at LOOKUP or UPDATE */
rc = mdt_object_lock_try(info, child, lhc, &child_bits,
- try_bits, false);
+ try_bits, LCK_PR, false);
if (child_bits & MDS_INODELOCK_LAYOUT)
ma_need |= MA_LOV;
} else {
/* Do not enqueue the UPDATE lock from MDT(cross-MDT),
* client will enqueue the lock to the remote MDT */
if (mdt_object_remote(child))
- child_bits &= ~MDS_INODELOCK_UPDATE;
- rc = mdt_object_lock(info, child, lhc, child_bits);
+ rc = mdt_object_lookup_lock(info, NULL, child,
+ lhc, LCK_PR, false);
+ else
+ rc = mdt_object_lock(info, child, lhc,
+ child_bits, LCK_PR, false);
}
if (unlikely(rc != 0))
GOTO(out_child, rc);
if (IS_ERR(pobj))
GOTO(out, rc = PTR_ERR(pobj));
+ if (mdt_object_remote(pobj))
+ cos_incompat = true;
+
parent_lh = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_pdo_init(parent_lh, LCK_PW, name);
- rc = mdt_object_lock(info, pobj, parent_lh, MDS_INODELOCK_UPDATE);
+ rc = mdt_parent_lock(info, pobj, parent_lh, name, LCK_PW, cos_incompat);
if (rc != 0)
GOTO(put_parent, rc);
- if (mdt_object_remote(pobj))
- cos_incompat = true;
-
rc = mdo_lookup(info->mti_env, mdt_object_child(pobj),
name, child_fid, &info->mti_spec);
if (rc != 0)
GOTO(unlock_parent, rc = -EREMCHG);
child_lh = &info->mti_lh[MDT_LH_CHILD];
- mdt_lock_reg_init(child_lh, LCK_EX);
- rc = mdt_reint_striped_lock(info, obj, child_lh,
- MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE,
- einfo, cos_incompat);
+ rc = mdt_object_stripes_lock(info, pobj, obj, child_lh, einfo,
+ MDS_INODELOCK_LOOKUP |
+ MDS_INODELOCK_UPDATE,
+ LCK_EX, cos_incompat);
if (rc != 0)
GOTO(unlock_parent, rc);
mutex_unlock(&obj->mot_lov_mutex);
unlock_child:
- mdt_reint_striped_unlock(info, obj, child_lh, einfo, 1);
+ mdt_object_stripes_unlock(info, obj, child_lh, einfo, 1);
unlock_parent:
mdt_object_unlock(info, pobj, parent_lh, 1);
put_parent:
mdt_object_get(NULL, lock->l_ast_data);
}
-int mdt_remote_object_lock_try(struct mdt_thread_info *mti,
- struct mdt_object *o, const struct lu_fid *fid,
- struct lustre_handle *lh, enum ldlm_mode mode,
- __u64 *ibits, __u64 trybits, bool cache)
+static int mdt_remote_object_lock_try(struct mdt_thread_info *mti,
+ struct mdt_object *obj,
+ struct lustre_handle *lh,
+ enum ldlm_mode mode,
+ union ldlm_policy_data *policy,
+ struct ldlm_res_id *res_id,
+ bool cache)
{
struct ldlm_enqueue_info *einfo = &mti->mti_remote_einfo;
- union ldlm_policy_data *policy = &mti->mti_policy;
- struct ldlm_res_id *res_id = &mti->mti_res_id;
- int rc = 0;
- ENTRY;
-
- LASSERT(mdt_object_remote(o));
+ int rc;
- fid_build_reg_res_name(fid, res_id);
+ LASSERT(mdt_object_remote(obj));
memset(einfo, 0, sizeof(*einfo));
einfo->ei_type = LDLM_IBITS;
einfo->ei_enq_slave = 0;
einfo->ei_res_id = res_id;
einfo->ei_req_slot = 1;
-
if (cache) {
/*
* if we cache lock, couple lock with mdt_object, so that object
* can be easily found in lock ASTs.
*/
- einfo->ei_cbdata = o;
+ einfo->ei_cbdata = obj;
einfo->ei_cb_created = mdt_remote_object_lock_created_cb;
}
- memset(policy, 0, sizeof(*policy));
- policy->l_inodebits.bits = *ibits;
- policy->l_inodebits.try_bits = trybits;
-
- rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo,
+ rc = mo_object_lock(mti->mti_env, mdt_object_child(obj), lh, einfo,
policy);
-
- /* Return successfully acquired bits to a caller */
- if (rc == 0) {
- struct ldlm_lock *lock = ldlm_handle2lock(lh);
-
- LASSERT(lock);
- *ibits = lock->l_policy_data.l_inodebits.bits;
- LDLM_LOCK_PUT(lock);
+ if (rc) {
+ lh->cookie = 0ull;
+ return rc;
}
- RETURN(rc);
-}
-int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o,
- const struct lu_fid *fid, struct lustre_handle *lh,
- enum ldlm_mode mode, __u64 ibits, bool cache)
-{
- return mdt_remote_object_lock_try(mti, o, fid, lh, mode, &ibits, 0,
- cache);
+ /* other components like LFSCK can use lockless access
+ * and populate cache, so we better invalidate it
+ */
+ if (policy->l_inodebits.bits &
+ (MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR))
+ mo_invalidate(mti->mti_env, mdt_object_child(obj));
+
+ return 0;
}
-int mdt_object_local_lock(struct mdt_thread_info *info, struct mdt_object *o,
- struct mdt_lock_handle *lh, __u64 *ibits,
- __u64 trybits, bool cos_incompat)
+/*
+ * Helper function to take PDO and hash lock.
+ *
+ * if \a pdo_lock is false, don't take PDO lock, this is case in rename.
+ */
+int mdt_object_pdo_lock(struct mdt_thread_info *info, struct mdt_object *obj,
+ struct mdt_lock_handle *lh, const struct lu_name *name,
+ enum ldlm_mode mode, bool pdo_lock, bool cos_incompat)
{
struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
union ldlm_policy_data *policy = &info->mti_policy;
struct ldlm_res_id *res_id = &info->mti_res_id;
- __u64 dlmflags = 0, *cookie = NULL;
+ /*
+ * Do not use LDLM_FL_LOCAL_ONLY for parallel lock, it is never going to
+ * be sent to client and we do not want it slowed down due to possible
+ * cancels.
+ */
+ __u64 dlmflags = LDLM_FL_ATOMIC_CB;
+ __u64 *cookie = NULL;
int rc;
- ENTRY;
- LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
- LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
- LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
- LASSERT(lh->mlh_type != MDT_NUL_LOCK);
+ LASSERT(obj);
+
+ /* check for exists after object is locked */
+ if (!mdt_object_exists(obj))
+ /* Non-existent object shouldn't have PDO lock */
+ return -ESTALE;
+
+ /* Non-dir object shouldn't have PDO lock */
+ if (!S_ISDIR(lu_object_attr(&obj->mot_obj)))
+ return -ENOTDIR;
if (cos_incompat) {
- LASSERT(lh->mlh_reg_mode == LCK_PW ||
- lh->mlh_reg_mode == LCK_EX);
+ LASSERT(mode == LCK_PW || mode == LCK_EX);
dlmflags |= LDLM_FL_COS_INCOMPAT;
} else if (mdt_cos_is_enabled(info->mti_mdt)) {
dlmflags |= LDLM_FL_COS_ENABLED;
}
- /* Only enqueue LOOKUP lock for remote object */
- LASSERT(ergo(mdt_object_remote(o), *ibits == MDS_INODELOCK_LOOKUP));
-
- /* Lease lock are granted with LDLM_FL_CANCEL_ON_BLOCK */
- if (lh->mlh_type == MDT_REG_LOCK && lh->mlh_reg_mode == LCK_EX &&
- *ibits == MDS_INODELOCK_OPEN)
- dlmflags |= LDLM_FL_CANCEL_ON_BLOCK;
-
- if (lh->mlh_type == MDT_PDO_LOCK) {
- /* check for exists after object is locked */
- if (mdt_object_exists(o) == 0) {
- /* Non-existent object shouldn't have PDO lock */
- RETURN(-ESTALE);
- } else {
- /* Non-dir object shouldn't have PDO lock */
- if (!S_ISDIR(lu_object_attr(&o->mot_obj)))
- RETURN(-ENOTDIR);
- }
- }
-
- fid_build_reg_res_name(mdt_object_fid(o), res_id);
- dlmflags |= LDLM_FL_ATOMIC_CB;
-
+ policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
+ policy->l_inodebits.try_bits = 0;
+ policy->l_inodebits.li_gid = 0;
+ fid_build_reg_res_name(mdt_object_fid(obj), res_id);
if (info->mti_exp)
cookie = &info->mti_exp->exp_handle.h_cookie;
- /*
- * Take PDO lock on whole directory and build correct @res_id for lock
- * on part of directory.
- */
- if (lh->mlh_pdo_hash != 0) {
- LASSERT(lh->mlh_type == MDT_PDO_LOCK);
- mdt_lock_pdo_mode(info, o, lh);
- if (lh->mlh_pdo_mode != LCK_NL) {
- /*
- * Do not use LDLM_FL_LOCAL_ONLY for parallel lock, it
- * is never going to be sent to client and we do not
- * want it slowed down due to possible cancels.
- */
- policy->l_inodebits.bits =
- *ibits & MDS_INODELOCK_UPDATE;
- policy->l_inodebits.try_bits =
- trybits & MDS_INODELOCK_UPDATE;
- /* at least one of them should be set */
- LASSERT(policy->l_inodebits.bits |
- policy->l_inodebits.try_bits);
+ mdt_lock_pdo_init(lh, mode, name);
+ mdt_lock_pdo_mode(info, obj, lh);
+ if (lh->mlh_pdo_mode != LCK_NL) {
+ if (pdo_lock) {
rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_pdo_lh,
lh->mlh_pdo_mode, policy, res_id,
dlmflags, cookie);
- if (unlikely(rc != 0))
- GOTO(out_unlock, rc);
- }
+ if (rc) {
+ mdt_object_unlock(info, obj, lh, 1);
+ return rc;
+ }
+ }
+ res_id->name[LUSTRE_RES_ID_HSH_OFF] = lh->mlh_pdo_hash;
+ }
- /*
- * Finish res_id initializing by name hash marking part of
- * directory which is taking modification.
- */
- res_id->name[LUSTRE_RES_ID_HSH_OFF] = lh->mlh_pdo_hash;
- }
+ rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
+ policy, res_id, dlmflags, cookie);
+ if (rc)
+ mdt_object_unlock(info, obj, lh, 1);
+ else if (OBD_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK) &&
+ lh->mlh_pdo_hash != 0 &&
+ (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
+ OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK, 15);
+
+ return rc;
+}
+
+int mdt_object_lock_internal(struct mdt_thread_info *info,
+ struct mdt_object *obj, const struct lu_fid *fid,
+ struct mdt_lock_handle *lh, __u64 *ibits,
+ __u64 trybits, bool cache, bool cos_incompat)
+{
+ union ldlm_policy_data *policy = &info->mti_policy;
+ struct ldlm_res_id *res_id = &info->mti_res_id;
+ struct lustre_handle *handle;
+ int rc;
policy->l_inodebits.bits = *ibits;
policy->l_inodebits.try_bits = trybits;
policy->l_inodebits.li_gid = lh->mlh_gid;
+ fid_build_reg_res_name(fid, res_id);
- /*
- * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
- * going to be sent to client. If it is - mdt_intent_policy() path will
- * fix it up and turn FL_LOCAL flag off.
- */
- rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
- policy, res_id, LDLM_FL_LOCAL_ONLY | dlmflags,
- cookie);
-out_unlock:
- if (rc != 0)
- mdt_object_unlock(info, o, lh, 1);
- else if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)) &&
- lh->mlh_pdo_hash != 0 &&
- (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
- OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK, 15);
+ if (obj && mdt_object_remote(obj)) {
+ handle = &lh->mlh_rreg_lh;
+ LASSERT(!lustre_handle_is_used(handle));
+ LASSERT(lh->mlh_rreg_mode != LCK_MINMODE);
+ LASSERT(lh->mlh_type != MDT_NUL_LOCK);
+ rc = mdt_remote_object_lock_try(info, obj, handle,
+ lh->mlh_rreg_mode, policy,
+ res_id, cache);
+ } else {
+ struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
+ /*
+ * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if
+ * it is going to be sent to client. If it is -
+ * mdt_intent_policy() path will fix it up and turn FL_LOCAL
+ * flag off.
+ */
+ __u64 dlmflags = LDLM_FL_ATOMIC_CB | LDLM_FL_LOCAL_ONLY;
+ __u64 *cookie = NULL;
+
+ handle = &lh->mlh_reg_lh;
+ LASSERT(!lustre_handle_is_used(handle));
+ LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
+ LASSERT(lh->mlh_type != MDT_NUL_LOCK);
+
+ if (cos_incompat) {
+ LASSERT(lh->mlh_reg_mode == LCK_PW ||
+ lh->mlh_reg_mode == LCK_EX);
+ dlmflags |= LDLM_FL_COS_INCOMPAT;
+ } else if (mdt_cos_is_enabled(info->mti_mdt)) {
+ dlmflags |= LDLM_FL_COS_ENABLED;
+ }
+
+ /* Lease lock are granted with LDLM_FL_CANCEL_ON_BLOCK */
+ if (lh->mlh_type == MDT_REG_LOCK &&
+ lh->mlh_reg_mode == LCK_EX && *ibits == MDS_INODELOCK_OPEN)
+ dlmflags |= LDLM_FL_CANCEL_ON_BLOCK;
+
+
+ if (info->mti_exp)
+ cookie = &info->mti_exp->exp_handle.h_cookie;
+
+ rc = mdt_fid_lock(info->mti_env, ns, handle, lh->mlh_reg_mode,
+ policy, res_id, dlmflags, cookie);
+ if (rc)
+ mdt_object_unlock(info, obj, lh, 1);
+ }
- /* Return successfully acquired bits to a caller */
if (rc == 0) {
- struct ldlm_lock *lock = ldlm_handle2lock(&lh->mlh_reg_lh);
+ struct ldlm_lock *lock;
+ /* Return successfully acquired bits to a caller */
+ lock = ldlm_handle2lock(handle);
LASSERT(lock);
*ibits = lock->l_policy_data.l_inodebits.bits;
LDLM_LOCK_PUT(lock);
}
- RETURN(rc);
+
+ return rc;
}
-static int
-mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o,
- struct mdt_lock_handle *lh, __u64 *ibits,
- __u64 trybits, bool cos_incompat)
+/*
+ * MDT object locking functions:
+ * mdt_object_lock(): lock object, this is used in most places, and normally
+ * lock ibits doesn't contain LOOKUP, unless the caller knows it's not
+ * remote object.
+ * mdt_object_check_lock(): lock object with LOOKUP and other ibits, it needs
+ * to check whether parent is on remote MDT, if so, take LOOKUP on parent
+ * MDT separately, and then lock other ibits on child object.
+ * mdt_parent_lock(): take parent UPDATE lock with specific mode, if parent is
+ * local, take PDO lock by name hash, otherwise take regular lock.
+ * mdt_object_stripes_lock(): lock object which should be local, and if it's a
+ * striped directory, lock its stripes, this is called in operations which
+ * modify both object and stripes.
+ * mdt_object_lock_try(): lock object with trybits, the trybits contains
+ * optional inode lock bits that can be granted. This is called by
+ * getattr/open to fetch more inode lock bits to client, and is also called
+ * by dir migration to lock link parent in non-block mode to avoid
+ * deadlock.
+ */
+
+/**
+ * lock object
+ *
+ * this is used to lock object in most places, and normally lock ibits doesn't
+ * contain LOOKUP, unless the caller knows it's not remote object.
+ *
+ * \param info struct mdt_thread_info
+ * \param obj object
+ * \param lh lock handle
+ * \param ibits MDS inode lock bits
+ * \param mode lock mode
+ * \param cos_incompat DNE COS incompatible
+ *
+ * \retval 0 on success, -ev on error.
+ */
+int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *obj,
+ struct mdt_lock_handle *lh, __u64 ibits,
+ enum ldlm_mode mode, bool cos_incompat)
{
- struct mdt_lock_handle *local_lh = NULL;
int rc;
- ENTRY;
-
- if (!mdt_object_remote(o)) {
- rc = mdt_object_local_lock(info, o, lh, ibits, trybits,
- cos_incompat);
- RETURN(rc);
- }
- /* XXX do not support PERM/LAYOUT/XATTR lock for remote object yet */
- *ibits &= ~(MDS_INODELOCK_PERM | MDS_INODELOCK_LAYOUT |
- MDS_INODELOCK_XATTR);
+ ENTRY;
+ mdt_lock_reg_init(lh, mode);
+ rc = mdt_object_lock_internal(info, obj, mdt_object_fid(obj), lh,
+ &ibits, 0, false, cos_incompat);
+ RETURN(rc);
+}
- /* Only enqueue LOOKUP lock for remote object */
- if (*ibits & MDS_INODELOCK_LOOKUP) {
- __u64 local = MDS_INODELOCK_LOOKUP;
+/**
+ * lock object with LOOKUP and other ibits
+ *
+ * it will check whether parent and child are on different MDTs, if so, take
+ * LOOKUP lock on parent MDT, and lock other ibits on child MDT, otherwise lock
+ * all ibits on child MDT. Note, parent and child shouldn't be both on remote
+ * MDTs, in which case specific lock function should be used, and it's in
+ * rename and migrate only.
+ *
+ * \param info struct mdt_thread_info
+ * \param parent parent object
+ * \param child child object
+ * \param lh lock handle
+ * \param ibits MDS inode lock bits
+ * \param mode lock mode
+ * \param cos_incompat DNE COS incompatible
+ *
+ * \retval 0 on success, -ev on error.
+ */
+int mdt_object_check_lock(struct mdt_thread_info *info,
+ struct mdt_object *parent, struct mdt_object *child,
+ struct mdt_lock_handle *lh, __u64 ibits,
+ enum ldlm_mode mode, bool cos_incompat)
+{
+ int rc;
- rc = mdt_object_local_lock(info, o, lh, &local, 0,
- cos_incompat);
- if (rc != ELDLM_OK)
+ ENTRY;
+ /* if LOOKUP ibit is not set, use mdt_object_lock() */
+ LASSERT(ibits & MDS_INODELOCK_LOOKUP);
+ /* if only LOOKUP ibit is needed, use mdt_object_lookup_lock() */
+ LASSERT(ibits != MDS_INODELOCK_LOOKUP);
+ LASSERT(parent);
+ /* @parent and @child shouldn't both be on remote MDTs */
+ LASSERT(!(mdt_object_remote(parent) && mdt_object_remote(child)));
+
+ mdt_lock_reg_init(lh, mode);
+ if (mdt_object_remote(parent) ^ mdt_object_remote(child)) {
+ __u64 lookup_ibits = MDS_INODELOCK_LOOKUP;
+
+ rc = mdt_object_lock_internal(info, parent,
+ mdt_object_fid(child), lh,
+ &lookup_ibits, 0, false,
+ cos_incompat);
+ if (rc)
RETURN(rc);
- local_lh = lh;
- }
-
- if ((*ibits | trybits) & MDS_INODELOCK_UPDATE) {
- /* Sigh, PDO needs to enqueue 2 locks right now, but
- * enqueue RPC can only request 1 lock, to avoid extra
- * RPC, so it will instead enqueue EX lock for remote
- * object anyway XXX*/
- if (lh->mlh_type == MDT_PDO_LOCK &&
- lh->mlh_pdo_hash != 0) {
- CDEBUG(D_INFO,
- "%s: "DFID" convert PDO lock to EX lock.\n",
- mdt_obd_name(info->mti_mdt),
- PFID(mdt_object_fid(o)));
- lh->mlh_pdo_hash = 0;
- lh->mlh_rreg_mode = LCK_EX;
- lh->mlh_type = MDT_REG_LOCK;
- }
-
- rc = mdt_remote_object_lock_try(info, o, mdt_object_fid(o),
- &lh->mlh_rreg_lh,
- lh->mlh_rreg_mode,
- ibits, trybits, false);
- if (rc != ELDLM_OK) {
- if (local_lh != NULL)
- mdt_object_unlock(info, o, local_lh, rc);
- RETURN(rc);
- }
+ ibits &= ~MDS_INODELOCK_LOOKUP;
}
- /* other components like LFSCK can use lockless access
- * and populate cache, so we better invalidate it */
- mo_invalidate(info->mti_env, mdt_object_child(o));
+ rc = mdt_object_lock_internal(info, child, mdt_object_fid(child), lh,
+ &ibits, 0, false, cos_incompat);
+ if (rc && !(ibits & MDS_INODELOCK_LOOKUP))
+ mdt_object_unlock(info, NULL, lh, 1);
- RETURN(0);
+ RETURN(rc);
}
-int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
- struct mdt_lock_handle *lh, __u64 ibits)
+/**
+ * take parent UPDATE lock
+ *
+ * if parent is local, take PDO lock by name hash, otherwise take regular lock.
+ *
+ * \param info struct mdt_thread_info
+ * \param obj parent object
+ * \param lh lock handle
+ * \param lname child name
+ * \param mode lock mode
+ * \param cos_incompat DNE COS incompatible
+ *
+ * \retval 0 on success, -ev on error.
+ */
+int mdt_parent_lock(struct mdt_thread_info *info, struct mdt_object *obj,
+ struct mdt_lock_handle *lh, const struct lu_name *lname,
+ enum ldlm_mode mode, bool cos_incompat)
{
- return mdt_object_lock_internal(info, o, lh, &ibits, 0, false);
-}
+ int rc;
-int mdt_reint_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
- struct mdt_lock_handle *lh, __u64 ibits,
- bool cos_incompat)
-{
- LASSERT(lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX);
- return mdt_object_lock_internal(info, o, lh, &ibits, 0,
- cos_incompat);
+ ENTRY;
+ LASSERT(obj && lname);
+ if (mdt_object_remote(obj)) {
+ __u64 ibits = MDS_INODELOCK_UPDATE;
+
+ mdt_lock_reg_init(lh, mode);
+ rc = mdt_object_lock_internal(info, obj, mdt_object_fid(obj),
+ lh, &ibits, 0, false,
+ cos_incompat);
+ } else {
+ rc = mdt_object_pdo_lock(info, obj, lh, lname, mode, true,
+ cos_incompat);
+ }
+ RETURN(rc);
}
-int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o,
+/**
+ * lock object with trybits
+ *
+ * the trybits contains optional inode lock bits that can be granted. This is
+ * called by getattr/open to fetch more inode lock bits to client, and is also
+ * called by dir migration to lock link parent in non-block mode to avoid
+ * deadlock.
+ *
+ * \param info struct mdt_thread_info
+ * \param obj object
+ * \param lh lock handle
+ * \param ibits MDS inode lock bits
+ * \param trybits optional inode lock bits
+ * \param mode lock mode
+ * \param cos_incompat DNE COS incompatible
+ *
+ * \retval 0 on success, -ev on error.
+ */
+int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *obj,
struct mdt_lock_handle *lh, __u64 *ibits,
- __u64 trybits, bool cos_incompat)
+ __u64 trybits, enum ldlm_mode mode, bool cos_incompat)
{
bool trylock_only = *ibits == 0;
int rc;
+ ENTRY;
LASSERT(!(*ibits & trybits));
- rc = mdt_object_lock_internal(info, o, lh, ibits, trybits,
- cos_incompat);
+ mdt_lock_reg_init(lh, mode);
+ rc = mdt_object_lock_internal(info, obj, mdt_object_fid(obj), lh, ibits,
+ trybits, false, cos_incompat);
if (rc && trylock_only) { /* clear error for try ibits lock only */
LASSERT(*ibits == 0);
rc = 0;
}
- return rc;
+ RETURN(rc);
+}
+
+/*
+ * Helper function to take \a obj LOOKUP lock.
+ *
+ * Both \a pobj and \a obj may be located on remote MDTs.
+ */
+int mdt_object_lookup_lock(struct mdt_thread_info *info,
+ struct mdt_object *pobj, struct mdt_object *obj,
+ struct mdt_lock_handle *lh, enum ldlm_mode mode,
+ bool cos_incompat)
+{
+ __u64 ibits = MDS_INODELOCK_LOOKUP;
+ int rc;
+
+ ENTRY;
+ /* if @parent is NULL, it's on local MDT, and @child is remote,
+ * this is case in getattr/unlink/open by name.
+ */
+ LASSERT(ergo(!pobj, mdt_object_remote(obj)));
+ mdt_lock_reg_init(lh, mode);
+ rc = mdt_object_lock_internal(info, pobj, mdt_object_fid(obj), lh,
+ &ibits, 0, false, cos_incompat);
+ RETURN(rc);
}
/**
* \param mode lock mode
* \param decref force immediate lock releasing
*/
-void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
- enum ldlm_mode mode, int decref)
+static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
+ enum ldlm_mode mode, int decref)
{
struct tgt_session_info *tsi = info->mti_env->le_ses ?
tgt_ses_info(info->mti_env) : NULL;
}
struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
- const struct lu_fid *f,
- struct mdt_lock_handle *lh,
- __u64 ibits)
+ const struct lu_fid *f,
+ struct mdt_lock_handle *lh,
+ __u64 ibits, enum ldlm_mode mode)
{
- struct mdt_object *o;
+ struct mdt_object *o;
- o = mdt_object_find(info->mti_env, info->mti_mdt, f);
- if (!IS_ERR(o)) {
- int rc;
+ o = mdt_object_find(info->mti_env, info->mti_mdt, f);
+ if (!IS_ERR(o)) {
+ int rc;
- rc = mdt_object_lock(info, o, lh, ibits);
- if (rc != 0) {
- mdt_object_put(info->mti_env, o);
- o = ERR_PTR(rc);
- }
- }
- return o;
+ rc = mdt_object_lock(info, o, lh, ibits, mode, false);
+ if (rc != 0) {
+ mdt_object_put(info->mti_env, o);
+ o = ERR_PTR(rc);
+ }
+ }
+ return o;
}
void mdt_object_unlock_put(struct mdt_thread_info * info,
- struct mdt_object * o,
- struct mdt_lock_handle *lh,
- int decref)
+ struct mdt_object *o,
+ struct mdt_lock_handle *lh,
+ int decref)
{
- mdt_object_unlock(info, o, lh, decref);
- mdt_object_put(info->mti_env, o);
+ mdt_object_unlock(info, o, lh, decref);
+ mdt_object_put(info->mti_env, o);
}
/*
RETURN(rc);
}
-void mdt_lock_handle_init(struct mdt_lock_handle *lh)
-{
- lh->mlh_type = MDT_NUL_LOCK;
- lh->mlh_reg_lh.cookie = 0ull;
- lh->mlh_reg_mode = LCK_MINMODE;
- lh->mlh_pdo_lh.cookie = 0ull;
- lh->mlh_pdo_mode = LCK_MINMODE;
- lh->mlh_rreg_lh.cookie = 0ull;
- lh->mlh_rreg_mode = LCK_MINMODE;
-}
-
-void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
-{
- LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
- LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
-}
-
void mdt_thread_info_reset(struct mdt_thread_info *info)
{
memset(&info->mti_attr, 0, sizeof(info->mti_attr));
void mdt_thread_info_init(struct ptlrpc_request *req,
struct mdt_thread_info *info)
{
- int i;
-
info->mti_pill = &req->rq_pill;
- /* lock handle */
- for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
- mdt_lock_handle_init(&info->mti_lh[i]);
-
/* mdt device: it can be NULL while CONNECT */
if (req->rq_export) {
info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
}
for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
- mdt_lock_handle_fini(&info->mti_lh[i]);
+ mdt_lock_handle_assert(&info->mti_lh[i]);
info->mti_env = NULL;
info->mti_pill = NULL;
info->mti_exp = NULL;
*/
mdt_intent_fixup_resent(info, *lockp, lhc, flags);
if (!lustre_handle_is_used(&lhc->mlh_reg_lh)) {
- mdt_lock_reg_init(lhc, (*lockp)->l_req_mode);
rc = mdt_object_lock(info, info->mti_object, lhc,
- MDS_INODELOCK_XATTR);
+ MDS_INODELOCK_XATTR, (*lockp)->l_req_mode,
+ false);
if (rc)
return rc;
}
CDEBUG(D_IOCTL, "getting version for "DFID"\n", PFID(fid));
- lh = &mti->mti_lh[MDT_LH_PARENT];
- mdt_lock_reg_init(lh, LCK_CR);
-
- obj = mdt_object_find_lock(mti, fid, lh, MDS_INODELOCK_UPDATE);
- if (IS_ERR(obj))
- RETURN(PTR_ERR(obj));
+ lh = &mti->mti_lh[MDT_LH_PARENT];
+ obj = mdt_object_find_lock(mti, fid, lh, MDS_INODELOCK_UPDATE, LCK_CR);
+ if (IS_ERR(obj))
+ RETURN(PTR_ERR(obj));
if (mdt_object_remote(obj)) {
rc = -EREMOTE;