void *ei_cb_gl; /** lock glimpse callback */
void *ei_cbdata; /** Data to be passed into callbacks. */
void *ei_namespace; /** lock namespace **/
- unsigned int ei_enq_slave:1; /* whether enqueue slave stripes */
+ unsigned int ei_enq_slave:1, /* whether enqueue slave stripes */
+ ei_nonblock:1; /* non block enqueue */
};
#define ei_res_id ei_cb_gl
int mdt_remote_object_lock(struct mdt_thread_info *mti,
struct mdt_object *o, const struct lu_fid *fid,
struct lustre_handle *lh, ldlm_mode_t mode,
- __u64 ibits)
+ __u64 ibits, bool nonblock)
{
struct ldlm_enqueue_info *einfo = &mti->mti_einfo;
ldlm_policy_data_t *policy = &mti->mti_policy;
einfo->ei_cb_cp = ldlm_completion_ast;
einfo->ei_enq_slave = 0;
einfo->ei_res_id = res_id;
+ if (nonblock)
+ einfo->ei_nonblock = 1;
memset(policy, 0, sizeof(*policy));
policy->l_inodebits.bits = ibits;
struct mdt_lock_handle *lh, __u64 ibits,
bool nonblock)
{
+ struct mdt_lock_handle *local_lh = NULL;
int rc;
ENTRY;
/* XXX do not support PERM/LAYOUT/XATTR lock for remote object yet */
ibits &= ~(MDS_INODELOCK_PERM | MDS_INODELOCK_LAYOUT |
MDS_INODELOCK_XATTR);
+
+ /* Only enqueue LOOKUP lock for remote object */
+ if (ibits & MDS_INODELOCK_LOOKUP) {
+ rc = mdt_object_local_lock(info, o, lh,
+ MDS_INODELOCK_LOOKUP,
+ nonblock);
+ if (rc != ELDLM_OK)
+ RETURN(rc);
+
+ local_lh = lh;
+ }
+
if (ibits & MDS_INODELOCK_UPDATE) {
/* Sigh, PDO needs to enqueue 2 locks right now, but
* enqueue RPC can only request 1 lock, to avoid extra
rc = mdt_remote_object_lock(info, o, mdt_object_fid(o),
&lh->mlh_rreg_lh,
lh->mlh_rreg_mode,
- MDS_INODELOCK_UPDATE);
- if (rc != ELDLM_OK)
- RETURN(rc);
- }
-
- /* Only enqueue LOOKUP lock for remote object */
- if (ibits & MDS_INODELOCK_LOOKUP) {
- rc = mdt_object_local_lock(info, o, lh,
- MDS_INODELOCK_LOOKUP,
- nonblock);
- if (rc != ELDLM_OK)
+ MDS_INODELOCK_UPDATE, nonblock);
+ if (rc != ELDLM_OK) {
+ if (local_lh != NULL)
+ mdt_object_unlock(info, o, local_lh, rc);
RETURN(rc);
+ }
}
RETURN(0);
int mdt_remote_object_lock(struct mdt_thread_info *mti,
struct mdt_object *o, const struct lu_fid *fid,
struct lustre_handle *lh,
- ldlm_mode_t mode, __u64 ibits);
+ ldlm_mode_t mode, __u64 ibits, bool nonblock);
enum mdt_name_flags {
MNF_FIX_ANON = 1,
rc = mdt_remote_object_lock(info, mp, mdt_object_fid(mc),
&child_lh->mlh_rreg_lh,
child_lh->mlh_rreg_mode,
- MDS_INODELOCK_LOOKUP);
+ MDS_INODELOCK_LOOKUP, false);
if (rc != ELDLM_OK)
GOTO(put_child, rc);
rc = mdt_remote_object_lock(info, obj,
&LUSTRE_BFL_FID, lh,
LCK_EX,
- MDS_INODELOCK_UPDATE);
+ MDS_INODELOCK_UPDATE, false);
mdt_object_put(info->mti_env, obj);
} else {
struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
GOTO(out, rc = -ENOMEM);
}
+ /* Since this needs to lock all of objects in linkea, to avoid
+ * deadlocks, because it does not follow parent-child order as
+ * other MDT operation, let's use try_lock here, i.e. it will
+ * return immediately once there are conflict locks, and return
+ * EBUSY to client */
mdt_lock_pdo_init(&mll->mll_lh, LCK_PW, &name);
- rc = mdt_object_lock(info, mdt_pobj, &mll->mll_lh,
- MDS_INODELOCK_UPDATE);
- if (rc != 0) {
- CERROR("%s: cannot lock "DFID": rc =%d\n",
+ rc = mdt_object_lock_try(info, mdt_pobj, &mll->mll_lh,
+ MDS_INODELOCK_UPDATE);
+ if (rc == 0) {
+ CDEBUG(D_ERROR, "%s: cannot lock "DFID": rc =%d\n",
mdt_obd_name(mdt), PFID(&fid), rc);
mdt_object_put(info->mti_env, mdt_pobj);
OBD_FREE_PTR(mll);
- GOTO(out, rc);
+ GOTO(out, rc = -EBUSY);
}
INIT_LIST_HEAD(&mll->mll_list);
rc = mdt_remote_object_lock(info, msrcdir, mdt_object_fid(mold),
&lh_childp->mlh_rreg_lh,
lh_childp->mlh_rreg_mode,
- MDS_INODELOCK_LOOKUP);
+ MDS_INODELOCK_LOOKUP, false);
if (rc != ELDLM_OK)
GOTO(out_unlock_list, rc);
mdt_object_fid(mnew),
&lh_tgtp->mlh_rreg_lh,
lh_tgtp->mlh_rreg_mode,
- MDS_INODELOCK_UPDATE);
+ MDS_INODELOCK_UPDATE, false);
if (rc != 0) {
lh_tgtp = NULL;
GOTO(out_put_new, rc);
mdt_object_fid(mold),
&lh_oldp->mlh_rreg_lh,
lh_oldp->mlh_rreg_mode,
- MDS_INODELOCK_LOOKUP);
+ MDS_INODELOCK_LOOKUP,
+ false);
if (rc != ELDLM_OK)
GOTO(out_put_new, rc);
mdt_object_fid(mold),
&lh_oldp->mlh_rreg_lh,
lh_oldp->mlh_rreg_mode,
- MDS_INODELOCK_LOOKUP);
+ MDS_INODELOCK_LOOKUP,
+ false);
if (rc != ELDLM_OK)
GOTO(out_put_new, rc);
if (mode > 0)
return ELDLM_OK;
+ if (einfo->ei_nonblock)
+ flags |= LDLM_FL_BLOCK_NOWAIT;
+
req = ldlm_enqueue_pack(osp->opd_exp, 0);
if (IS_ERR(req))
RETURN(PTR_ERR(req));