__u64 child_bits,
struct ldlm_reply *ldlm_rep)
{
- struct ptlrpc_request *req = mdt_info_req(info);
- struct mdt_body *reqbody = NULL;
- struct mdt_object *parent = info->mti_object;
- struct mdt_object *child;
- struct lu_fid *child_fid = &info->mti_tmp_fid1;
- struct lu_name *lname = NULL;
- struct mdt_lock_handle *lhp = NULL;
- struct ldlm_lock *lock;
- bool is_resent;
- bool try_layout;
- int ma_need = 0;
- int rc;
+ struct ptlrpc_request *req = mdt_info_req(info);
+ struct mdt_body *reqbody = NULL;
+ struct mdt_object *parent = info->mti_object;
+ struct mdt_object *child;
+ struct lu_fid *child_fid = &info->mti_tmp_fid1;
+ struct lu_name *lname = NULL;
+ struct mdt_lock_handle *lhp = NULL;
+ struct ldlm_lock *lock;
+ __u64 try_bits = 0;
+ bool is_resent;
+ int ma_need = 0;
+ int rc;
+
ENTRY;
is_resent = lustre_handle_is_used(&lhc->mlh_reg_lh);
if (rc < 0) {
GOTO(out_child, rc);
} else if (rc > 0) {
- mdt_lock_handle_init(lhc);
+ mdt_lock_handle_init(lhc);
mdt_lock_reg_init(lhc, LCK_PR);
- try_layout = false;
if (!(child_bits & MDS_INODELOCK_UPDATE) &&
mdt_object_exists(child) && !mdt_object_remote(child)) {
- struct md_attr *ma = &info->mti_attr;
+ struct md_attr *ma = &info->mti_attr;
- ma->ma_valid = 0;
- ma->ma_need = MA_INODE;
+ ma->ma_valid = 0;
+ ma->ma_need = MA_INODE;
rc = mdt_attr_get_complex(info, child, ma);
- if (unlikely(rc != 0))
- GOTO(out_child, rc);
+ if (unlikely(rc != 0))
+ GOTO(out_child, rc);
/* If the file has not been changed for some time, we
* return not only a LOOKUP lock, but also an UPDATE
S_ISREG(lu_object_attr(&child->mot_obj)) &&
!mdt_object_remote(child) && ldlm_rep != NULL) {
/* try to grant layout lock for regular file. */
- try_layout = true;
+ try_bits = MDS_INODELOCK_LAYOUT;
}
- rc = 0;
- if (try_layout) {
- child_bits |= MDS_INODELOCK_LAYOUT;
+ if (try_bits != 0) {
/* try layout lock, it may fail to be granted due to
* contention at LOOKUP or UPDATE */
- if (!mdt_object_lock_try(info, child, lhc,
- child_bits)) {
- child_bits &= ~MDS_INODELOCK_LAYOUT;
- LASSERT(child_bits != 0);
- rc = mdt_object_lock(info, child, lhc,
- child_bits);
- } else {
+ rc = mdt_object_lock_try(info, child, lhc, &child_bits,
+ try_bits, false);
+ if (child_bits & MDS_INODELOCK_LAYOUT)
ma_need |= MA_LOV;
- }
} else {
/* Do not enqueue the UPDATE lock from MDT(cross-MDT),
* client will enqueue the lock to the remote MDT */
return 1;
}
-int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o,
- const struct lu_fid *fid, struct lustre_handle *lh,
- enum ldlm_mode mode, __u64 ibits, bool nonblock,
- bool cache)
+int mdt_remote_object_lock_try(struct mdt_thread_info *mti,
+ struct mdt_object *o, const struct lu_fid *fid,
+ struct lustre_handle *lh, enum ldlm_mode mode,
+ __u64 *ibits, __u64 trybits, bool cache)
{
struct ldlm_enqueue_info *einfo = &mti->mti_einfo;
union ldlm_policy_data *policy = &mti->mti_policy;
einfo->ei_cb_cp = ldlm_completion_ast;
einfo->ei_enq_slave = 0;
einfo->ei_res_id = res_id;
- if (nonblock)
- einfo->ei_nonblock = 1;
+
if (cache) {
/*
* if we cache lock, couple lock with mdt_object, so that object
einfo->ei_cbdata = o;
}
+
memset(policy, 0, sizeof(*policy));
- policy->l_inodebits.bits = ibits;
+ policy->l_inodebits.bits = *ibits;
+ policy->l_inodebits.try_bits = trybits;
rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo,
policy);
mdt_object_put(mti->mti_env, o);
einfo->ei_cbdata = NULL;
}
+
+ /* Return successfully acquired bits to a caller */
+ if (rc == 0) {
+ struct ldlm_lock *lock = ldlm_handle2lock(lh);
+
+ LASSERT(lock);
+ *ibits = lock->l_policy_data.l_inodebits.bits;
+ LDLM_LOCK_PUT(lock);
+ }
RETURN(rc);
}
+int mdt_remote_object_lock(struct mdt_thread_info *mti, struct mdt_object *o,
+ const struct lu_fid *fid, struct lustre_handle *lh,
+ enum ldlm_mode mode, __u64 ibits, bool cache)
+{
+ return mdt_remote_object_lock_try(mti, o, fid, lh, mode, &ibits, 0,
+ cache);
+}
+
static int mdt_object_local_lock(struct mdt_thread_info *info,
struct mdt_object *o,
- struct mdt_lock_handle *lh, __u64 ibits,
- bool nonblock, bool cos_incompat)
+ struct mdt_lock_handle *lh, __u64 *ibits,
+ __u64 trybits, bool cos_incompat)
{
struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
union ldlm_policy_data *policy = &info->mti_policy;
}
/* Only enqueue LOOKUP lock for remote object */
- if (mdt_object_remote(o))
- LASSERT(ibits == MDS_INODELOCK_LOOKUP);
+ if (mdt_object_remote(o)) {
+ LASSERT(*ibits == MDS_INODELOCK_LOOKUP);
+ }
if (lh->mlh_type == MDT_PDO_LOCK) {
/* check for exists after object is locked */
/* Non-dir object shouldn't have PDO lock */
if (!S_ISDIR(lu_object_attr(&o->mot_obj)))
RETURN(-ENOTDIR);
- }
- }
+ }
+ }
- memset(policy, 0, sizeof(*policy));
- fid_build_reg_res_name(mdt_object_fid(o), res_id);
+ fid_build_reg_res_name(mdt_object_fid(o), res_id);
dlmflags |= LDLM_FL_ATOMIC_CB;
- if (nonblock)
- dlmflags |= LDLM_FL_BLOCK_NOWAIT;
- /*
- * Take PDO lock on whole directory and build correct @res_id for lock
- * on part of directory.
- */
- if (lh->mlh_pdo_hash != 0) {
- LASSERT(lh->mlh_type == MDT_PDO_LOCK);
- mdt_lock_pdo_mode(info, o, lh);
- if (lh->mlh_pdo_mode != LCK_NL) {
- /*
- * Do not use LDLM_FL_LOCAL_ONLY for parallel lock, it
- * is never going to be sent to client and we do not
- * want it slowed down due to possible cancels.
- */
- policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
+ /*
+ * Take PDO lock on whole directory and build correct @res_id for lock
+ * on part of directory.
+ */
+ if (lh->mlh_pdo_hash != 0) {
+ LASSERT(lh->mlh_type == MDT_PDO_LOCK);
+ mdt_lock_pdo_mode(info, o, lh);
+ if (lh->mlh_pdo_mode != LCK_NL) {
+ /*
+ * Do not use LDLM_FL_LOCAL_ONLY for parallel lock, it
+ * is never going to be sent to client and we do not
+ * want it slowed down due to possible cancels.
+ */
+ policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
+ policy->l_inodebits.try_bits = 0;
rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode,
policy, res_id, dlmflags,
info->mti_exp == NULL ? NULL :
res_id->name[LUSTRE_RES_ID_HSH_OFF] = lh->mlh_pdo_hash;
}
- policy->l_inodebits.bits = ibits;
+ policy->l_inodebits.bits = *ibits;
+ policy->l_inodebits.try_bits = trybits;
/*
* Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
(lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK, 15);
+ /* Return successfully acquired bits to a caller */
+ if (rc == 0) {
+ struct ldlm_lock *lock = ldlm_handle2lock(&lh->mlh_reg_lh);
+
+ LASSERT(lock);
+ *ibits = lock->l_policy_data.l_inodebits.bits;
+ LDLM_LOCK_PUT(lock);
+ }
RETURN(rc);
}
static int
mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o,
- struct mdt_lock_handle *lh, __u64 ibits, bool nonblock,
- bool cos_incompat)
+ struct mdt_lock_handle *lh, __u64 *ibits,
+ __u64 trybits, bool cos_incompat)
{
struct mdt_lock_handle *local_lh = NULL;
int rc;
ENTRY;
if (!mdt_object_remote(o)) {
- rc = mdt_object_local_lock(info, o, lh, ibits, nonblock,
+ rc = mdt_object_local_lock(info, o, lh, ibits, trybits,
cos_incompat);
RETURN(rc);
}
/* XXX do not support PERM/LAYOUT/XATTR lock for remote object yet */
- ibits &= ~(MDS_INODELOCK_PERM | MDS_INODELOCK_LAYOUT |
- MDS_INODELOCK_XATTR);
+ *ibits &= ~(MDS_INODELOCK_PERM | MDS_INODELOCK_LAYOUT |
+ MDS_INODELOCK_XATTR);
/* Only enqueue LOOKUP lock for remote object */
- if (ibits & MDS_INODELOCK_LOOKUP) {
- rc = mdt_object_local_lock(info, o, lh, MDS_INODELOCK_LOOKUP,
- nonblock, cos_incompat);
+ if (*ibits & MDS_INODELOCK_LOOKUP) {
+ __u64 local = MDS_INODELOCK_LOOKUP;
+
+ rc = mdt_object_local_lock(info, o, lh, &local, 0,
+ cos_incompat);
if (rc != ELDLM_OK)
RETURN(rc);
local_lh = lh;
}
- if (ibits & MDS_INODELOCK_UPDATE) {
+ if ((*ibits | trybits) & MDS_INODELOCK_UPDATE) {
/* Sigh, PDO needs to enqueue 2 locks right now, but
* enqueue RPC can only request 1 lock, to avoid extra
* RPC, so it will instead enqueue EX lock for remote
lh->mlh_rreg_mode = LCK_EX;
lh->mlh_type = MDT_REG_LOCK;
}
- rc = mdt_remote_object_lock(info, o, mdt_object_fid(o),
- &lh->mlh_rreg_lh,
- lh->mlh_rreg_mode,
- MDS_INODELOCK_UPDATE, nonblock,
- false);
+
+ rc = mdt_remote_object_lock_try(info, o, mdt_object_fid(o),
+ &lh->mlh_rreg_lh,
+ lh->mlh_rreg_mode,
+ ibits, trybits, false);
if (rc != ELDLM_OK) {
if (local_lh != NULL)
mdt_object_unlock(info, o, local_lh, rc);
int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
struct mdt_lock_handle *lh, __u64 ibits)
{
- return mdt_object_lock_internal(info, o, lh, ibits, false, false);
+ return mdt_object_lock_internal(info, o, lh, &ibits, 0, false);
}
int mdt_reint_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
bool cos_incompat)
{
LASSERT(lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX);
- return mdt_object_lock_internal(info, o, lh, ibits, false,
+ return mdt_object_lock_internal(info, o, lh, &ibits, 0,
cos_incompat);
}
int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o,
- struct mdt_lock_handle *lh, __u64 ibits)
+ struct mdt_lock_handle *lh, __u64 *ibits,
+ __u64 trybits, bool cos_incompat)
{
- struct mdt_lock_handle tmp = *lh;
- int rc;
-
- rc = mdt_object_lock_internal(info, o, &tmp, ibits, true, false);
- if (rc == 0)
- *lh = tmp;
-
- return rc == 0;
-}
-
-int mdt_reint_object_lock_try(struct mdt_thread_info *info,
- struct mdt_object *o, struct mdt_lock_handle *lh,
- __u64 ibits, bool cos_incompat)
-{
- struct mdt_lock_handle tmp = *lh;
- int rc;
-
- LASSERT(lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX);
- rc = mdt_object_lock_internal(info, o, &tmp, ibits, true, cos_incompat);
- if (rc == 0)
- *lh = tmp;
-
- return rc == 0;
+ return mdt_object_lock_internal(info, o, lh, ibits, trybits,
+ cos_incompat);
}
/**