ma_need |= MA_LOV;
}
} else {
+ /* Do not enqueue the UPDATE lock from MDT(cross-MDT),
+ * client will enqueue the lock to the remote MDT */
+ if (mdt_object_remote(child))
+ child_bits &= ~MDS_INODELOCK_UPDATE;
rc = mdt_object_lock(info, child, lhc, child_bits,
MDT_CROSS_LOCK);
}
}
int mdt_remote_object_lock(struct mdt_thread_info *mti,
- struct mdt_object *o, struct lustre_handle *lh,
- ldlm_mode_t mode, __u64 ibits)
+ struct mdt_object *o, const struct lu_fid *fid,
+ struct lustre_handle *lh, ldlm_mode_t mode,
+ __u64 ibits)
{
struct ldlm_enqueue_info *einfo = &mti->mti_einfo;
ldlm_policy_data_t *policy = &mti->mti_policy;
+ struct ldlm_res_id *res_id = &mti->mti_res_id;
int rc = 0;
ENTRY;
LASSERT(mdt_object_remote(o));
- LASSERT(ibits & MDS_INODELOCK_UPDATE);
+ LASSERT(ibits == MDS_INODELOCK_UPDATE);
+
+ fid_build_reg_res_name(fid, res_id);
memset(einfo, 0, sizeof(*einfo));
einfo->ei_type = LDLM_IBITS;
einfo->ei_cb_bl = mdt_remote_blocking_ast;
einfo->ei_cb_cp = ldlm_completion_ast;
einfo->ei_enq_slave = 0;
+ einfo->ei_res_id = res_id;
memset(policy, 0, sizeof(*policy));
policy->l_inodebits.bits = ibits;
RETURN(rc);
}
-static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o,
- struct mdt_lock_handle *lh, __u64 ibits,
- bool nonblock, int locality)
+static int mdt_object_local_lock(struct mdt_thread_info *info,
+ struct mdt_object *o,
+ struct mdt_lock_handle *lh, __u64 ibits,
+ bool nonblock, int locality)
{
struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
ldlm_policy_data_t *policy = &info->mti_policy;
LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
LASSERT(lh->mlh_type != MDT_NUL_LOCK);
- if (mdt_object_remote(o)) {
- if (locality == MDT_CROSS_LOCK) {
- ibits &= ~(MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM |
- MDS_INODELOCK_LAYOUT);
- ibits |= MDS_INODELOCK_LOOKUP;
- } else {
- LASSERTF(!(ibits &
- (MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM |
- MDS_INODELOCK_LAYOUT)),
- "%s: wrong bit "LPX64" for remote obj "DFID"\n",
- mdt_obd_name(info->mti_mdt), ibits,
- PFID(mdt_object_fid(o)));
- LASSERT(ibits & MDS_INODELOCK_LOOKUP);
- }
- /* No PDO lock on remote object */
- LASSERT(lh->mlh_type != MDT_PDO_LOCK);
- }
+ /* Only enqueue LOOKUP lock for remote object */
+ if (mdt_object_remote(o))
+ LASSERT(ibits == MDS_INODELOCK_LOOKUP);
if (lh->mlh_type == MDT_PDO_LOCK) {
/* check for exists after object is locked */
RETURN(rc);
}
+int mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o,
+ struct mdt_lock_handle *lh, __u64 ibits,
+ bool nonblock, int locality)
+{
+ int rc;
+ ENTRY;
+
+ if (!mdt_object_remote(o))
+ return mdt_object_local_lock(info, o, lh, ibits, nonblock,
+ locality);
+
+ if (locality == MDT_LOCAL_LOCK) {
+ CERROR("%s: try to get local lock for remote object"
+ DFID".\n", mdt_obd_name(info->mti_mdt),
+ PFID(mdt_object_fid(o)));
+ RETURN(-EPROTO);
+ }
+
+ /* XXX do not support PERM/LAYOUT/XATTR lock for remote object yet */
+ ibits &= ~(MDS_INODELOCK_PERM | MDS_INODELOCK_LAYOUT |
+ MDS_INODELOCK_XATTR);
+ if (ibits & MDS_INODELOCK_UPDATE) {
+ /* Sigh, PDO needs to enqueue 2 locks right now, but
+ * enqueue RPC can only request 1 lock, to avoid extra
+ * RPC, so it will instead enqueue EX lock for remote
+ * object anyway XXX*/
+ if (lh->mlh_type == MDT_PDO_LOCK &&
+ lh->mlh_pdo_hash != 0) {
+ CDEBUG(D_INFO, "%s: "DFID" convert PDO lock to"
+ "EX lock.\n", mdt_obd_name(info->mti_mdt),
+ PFID(mdt_object_fid(o)));
+ lh->mlh_pdo_hash = 0;
+ lh->mlh_rreg_mode = LCK_EX;
+ lh->mlh_type = MDT_REG_LOCK;
+ }
+ rc = mdt_remote_object_lock(info, o, mdt_object_fid(o),
+ &lh->mlh_rreg_lh,
+ lh->mlh_rreg_mode,
+ MDS_INODELOCK_UPDATE);
+ if (rc != ELDLM_OK)
+ RETURN(rc);
+ }
+
+ /* Only enqueue LOOKUP lock for remote object */
+ if (ibits & MDS_INODELOCK_LOOKUP) {
+ rc = mdt_object_local_lock(info, o, lh,
+ MDS_INODELOCK_LOOKUP,
+ nonblock, locality);
+ if (rc != ELDLM_OK)
+ RETURN(rc);
+ }
+
+ RETURN(0);
+}
+
int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
struct mdt_lock_handle *lh, __u64 ibits, int locality)
{
- return mdt_object_lock0(info, o, lh, ibits, false, locality);
+ return mdt_object_lock_internal(info, o, lh, ibits, false, locality);
}
int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o,
struct mdt_lock_handle tmp = *lh;
int rc;
- rc = mdt_object_lock0(info, o, &tmp, ibits, true, locality);
+ rc = mdt_object_lock_internal(info, o, &tmp, ibits, true, locality);
if (rc == 0)
*lh = tmp;
GOTO(put_parent, rc = -ENOENT);
lh = &info->mti_lh[MDT_LH_PARENT];
- if (mdt_object_remote(parent)) {
- mdt_lock_reg_init(lh, LCK_EX);
- rc = mdt_remote_object_lock(info, parent, &lh->mlh_rreg_lh,
- lh->mlh_rreg_mode,
- MDS_INODELOCK_UPDATE);
- if (rc != ELDLM_OK)
- GOTO(put_parent, rc);
-
- } else {
- mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
- rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE,
- MDT_LOCAL_LOCK);
- if (rc)
- GOTO(put_parent, rc);
+ mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
+ rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE,
+ MDT_CROSS_LOCK);
+ if (rc)
+ GOTO(put_parent, rc);
+ if (!mdt_object_remote(parent)) {
rc = mdt_version_get_check_save(info, parent, 0);
if (rc)
GOTO(unlock_parent, rc);
}
parent_lh = &info->mti_lh[MDT_LH_PARENT];
+ mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name);
+ rc = mdt_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
+ MDT_CROSS_LOCK);
+ if (rc != 0)
+ GOTO(put_parent, rc);
- if (mdt_object_remote(mp)) {
- mdt_lock_reg_init(parent_lh, LCK_EX);
- rc = mdt_remote_object_lock(info, mp, &parent_lh->mlh_rreg_lh,
- parent_lh->mlh_rreg_mode,
- MDS_INODELOCK_UPDATE);
- if (rc != ELDLM_OK)
- GOTO(put_parent, rc);
-
- } else {
- mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name);
- rc = mdt_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
- MDT_LOCAL_LOCK);
- if (rc)
- GOTO(put_parent, rc);
-
+ if (!mdt_object_remote(mp)) {
rc = mdt_version_get_check_save(info, mp, 0);
if (rc)
GOTO(unlock_parent, rc);
}
rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE |
- MDS_INODELOCK_XATTR, MDT_CROSS_LOCK);
+ MDS_INODELOCK_XATTR, MDT_LOCAL_LOCK);
if (rc != 0) {
mdt_object_put(info->mti_env, ms);
GOTO(out_unlock_parent, rc);
}
static int mdt_rename_lock(struct mdt_thread_info *info,
- struct lustre_handle *lh)
+ struct lustre_handle *lh, bool rename)
{
struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
+ __u64 flags = 0;
+ int rc;
ldlm_policy_data_t *policy = &info->mti_policy;
struct ldlm_res_id *res_id = &info->mti_res_id;
- __u64 flags = 0;
- int rc;
ENTRY;
- fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
+ /* XXX only do global rename lock for migration */
+ if (mdt_seq_site(info->mti_mdt)->ss_node_id != 0 && !rename) {
+ struct lu_fid *fid = &info->mti_tmp_fid1;
+ struct mdt_object *obj;
+
+ /* XXX, right now, it has to use object API to
+ * enqueue lock cross MDT, so it will enqueue
+ * rename lock(with LUSTRE_BFL_FID) by root object */
+ lu_root_fid(fid);
+ obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
+ if (IS_ERR(obj))
+ RETURN(PTR_ERR(obj));
+
+ LASSERT(mdt_object_remote(obj));
+ rc = mdt_remote_object_lock(info, obj,
+ &LUSTRE_BFL_FID, lh,
+ LCK_EX,
+ MDS_INODELOCK_UPDATE);
+ mdt_object_put(info->mti_env, obj);
+ } else {
+ fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
+ memset(policy, 0, sizeof *policy);
+ policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
+ flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
+ rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
+ LCK_EX, &flags, ldlm_blocking_ast,
+ ldlm_completion_ast, NULL, NULL, 0,
+ LVB_T_NONE,
+ &info->mti_exp->exp_handle.h_cookie,
+ lh);
+ }
- memset(policy, 0, sizeof *policy);
- policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
-#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 6, 53, 0)
- /* In phase I, we will not do cross-rename, so local BFL lock would
- * be enough
- */
- flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
- /*
- * Current node is controller, that is mdt0, where we should
- * take BFL lock.
- */
- rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
- LCK_EX, &flags, ldlm_blocking_ast,
- ldlm_completion_ast, NULL, NULL, 0,
- LVB_T_NONE,
- &info->mti_exp->exp_handle.h_cookie,
- lh);
-#else
-#warning "Local rename lock is invalid for DNE phase II."
-#endif
- RETURN(rc);
+ RETURN(rc);
}
static void mdt_rename_unlock(struct lustre_handle *lh)
{
- ENTRY;
- LASSERT(lustre_handle_is_used(lh));
- ldlm_lock_decref(lh, LCK_EX);
- EXIT;
+ ENTRY;
+ LASSERT(lustre_handle_is_used(lh));
+ /* Cancel the single rename lock right away */
+ ldlm_lock_decref_and_cancel(lh, LCK_EX);
+ EXIT;
}
/*
GOTO(out, rc = -ENOMEM);
}
- if (mdt_object_remote(mdt_pobj)) {
- mdt_lock_reg_init(&mll->mll_lh, LCK_EX);
- rc = mdt_remote_object_lock(info, mdt_pobj,
- &mll->mll_lh.mlh_rreg_lh,
- mll->mll_lh.mlh_rreg_mode,
- MDS_INODELOCK_UPDATE);
- } else {
- mdt_lock_pdo_init(&mll->mll_lh, LCK_PW, &name);
- rc = mdt_object_lock(info, mdt_pobj, &mll->mll_lh,
- MDS_INODELOCK_UPDATE,
- MDT_LOCAL_LOCK);
- }
+ mdt_lock_pdo_init(&mll->mll_lh, LCK_PW, &name);
+ rc = mdt_object_lock(info, mdt_pobj, &mll->mll_lh,
+ MDS_INODELOCK_UPDATE,
+ MDT_CROSS_LOCK);
if (rc != 0) {
CERROR("%s: cannot lock "DFID": rc =%d\n",
mdt_obd_name(mdt), PFID(&fid), rc);
}
lh_dirp = &info->mti_lh[MDT_LH_PARENT];
- if (mdt_object_remote(msrcdir)) {
- mdt_lock_reg_init(lh_dirp, LCK_EX);
- rc = mdt_remote_object_lock(info, msrcdir,
- &lh_dirp->mlh_rreg_lh,
- lh_dirp->mlh_rreg_mode,
- MDS_INODELOCK_UPDATE);
- if (rc != ELDLM_OK)
- GOTO(out_put_parent, rc);
- } else {
- mdt_lock_pdo_init(lh_dirp, LCK_PW, &rr->rr_name);
- rc = mdt_object_lock(info, msrcdir, lh_dirp,
- MDS_INODELOCK_UPDATE,
- MDT_LOCAL_LOCK);
- if (rc)
- GOTO(out_put_parent, rc);
+ mdt_lock_pdo_init(lh_dirp, LCK_PW, &rr->rr_name);
+ rc = mdt_object_lock(info, msrcdir, lh_dirp,
+ MDS_INODELOCK_UPDATE,
+ MDT_CROSS_LOCK);
+ if (rc)
+ GOTO(out_put_parent, rc);
+ if (!mdt_object_remote(msrcdir)) {
rc = mdt_version_get_check_save(info, msrcdir, 0);
if (rc)
GOTO(out_unlock_parent, rc);
lh_tgtp = &info->mti_lh[MDT_LH_CHILD];
mdt_lock_reg_init(lh_tgtp, LCK_EX);
rc = mdt_remote_object_lock(info, mnew,
+ mdt_object_fid(mnew),
&lh_tgtp->mlh_rreg_lh,
lh_tgtp->mlh_rreg_mode,
MDS_INODELOCK_UPDATE);
rc = mdt_object_lock(info, mnew, lh_newp,
MDS_INODELOCK_LOOKUP |
MDS_INODELOCK_UPDATE,
- MDT_CROSS_LOCK);
+ MDT_LOCAL_LOCK);
if (rc != 0)
GOTO(out_unlock_old, rc);
fid_is_obf(rr->rr_fid2) || fid_is_dot_lustre(rr->rr_fid2))
RETURN(-EPERM);
- rc = mdt_rename_lock(info, &rename_lh);
+ rc = mdt_rename_lock(info, &rename_lh, rename);
if (rc != 0) {
CERROR("%s: can't lock FS for rename: rc = %d\n",
mdt_obd_name(info->mti_mdt), rc);