/* MDT supports FMD for regular files due to Data-on-MDT */
if (S_ISREG(lu_object_attr(&mo->mot_obj)) &&
- ma->ma_attr.la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
+ ma->ma_attr.la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
tgt_fmd_update(info->mti_exp, mdt_object_fid(mo),
req->rq_xid);
+ if (ma->ma_attr.la_valid & LA_MTIME) {
+ rc = mdt_attr_get_pfid(info, mo, &ma->ma_pfid);
+ if (!rc)
+ ma->ma_valid |= MA_PFID;
+ }
+ }
+
rc = mdt_attr_set(info, mo, ma);
if (rc)
GOTO(out_put, rc);
EXIT;
out:
- if (rc)
+ if (rc) {
mdt_unlock_list(info, link_locks, rc);
- else if (local_lnkp_cnt > RS_MAX_LOCKS - 6)
+ } else if (local_lnkp_cnt > RS_MAX_LOCKS - 5) {
+ CDEBUG(D_INFO, "Too many links (%d), sync operations\n",
+ local_lnkp_cnt);
/*
* parent may have 3 local objects: master object and 2 stripes
- * (if it's being migrated too); source may have 2 local
- * objects: master and 1 stripe; target has 1 local object.
+ * (if it's being migrated too); source may have 1 local objects
+ * as regular file; target has 1 local object.
+ * Note, source may have 2 local locks if it is directory but it
+ * can't have hardlinks, so it is not considered here.
*/
rc = 1;
+ }
return rc;
}
}
/*
+ * lock rename source object.
+ *
+ * Both source and source parent may be remote, and source may be a remote
+ * object on source parent, to avoid overriding lock handle, store remote
+ * LOOKUP lock separately in @lhr.
+ *
+ * \retval 0 on success
+ * \retval -ev negative errno upon error
+ */
+static int mdt_rename_source_lock(struct mdt_thread_info *info,
+ struct mdt_object *parent,
+ struct mdt_object *child,
+ struct mdt_lock_handle *lhc,
+ struct mdt_lock_handle *lhr,
+ __u64 ibits,
+ bool cos_incompat)
+{
+ int rc;
+
+ rc = mdt_is_remote_object(info, parent, child);
+ if (rc < 0)
+ return rc;
+
+ if (rc) {
+ /* enqueue remote LOOKUP lock from the parent MDT */
+ __u64 rmt_ibits = MDS_INODELOCK_LOOKUP;
+
+ if (mdt_object_remote(parent)) {
+ rc = mdt_remote_object_lock(info, parent,
+ mdt_object_fid(child),
+ &lhr->mlh_rreg_lh,
+ lhr->mlh_rreg_mode,
+ rmt_ibits, false);
+ if (rc != ELDLM_OK)
+ return rc;
+ } else {
+ LASSERT(mdt_object_remote(child));
+ rc = mdt_object_local_lock(info, child, lhr,
+ &rmt_ibits, 0, true);
+ if (rc < 0)
+ return rc;
+ }
+
+ ibits &= ~MDS_INODELOCK_LOOKUP;
+ }
+
+ if (mdt_object_remote(child)) {
+ rc = mdt_remote_object_lock(info, child, mdt_object_fid(child),
+ &lhc->mlh_rreg_lh,
+ lhc->mlh_rreg_mode,
+ ibits, false);
+ if (rc == ELDLM_OK)
+ rc = 0;
+ } else {
+ rc = mdt_reint_object_lock(info, child, lhc, ibits,
+ cos_incompat);
+ }
+
+ if (!rc)
+ mdt_object_unlock(info, child, lhr, rc);
+
+ return rc;
+}
+
+/*
* VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
* 2 - srcdir child; 3 - tgtdir child.
* Update on disk version of srcdir child.
struct mdt_lock_handle *lh_srcdirp;
struct mdt_lock_handle *lh_tgtdirp;
struct mdt_lock_handle *lh_oldp = NULL;
+ struct mdt_lock_handle *lh_rmt = NULL;
struct mdt_lock_handle *lh_newp = NULL;
struct lu_fid *old_fid = &info->mti_tmp_fid1;
struct lu_fid *new_fid = &info->mti_tmp_fid2;
GOTO(out_put_new, rc = -EISDIR);
lh_oldp = &info->mti_lh[MDT_LH_OLD];
+ lh_rmt = &info->mti_lh[MDT_LH_RMT];
mdt_lock_reg_init(lh_oldp, LCK_EX);
+ mdt_lock_reg_init(lh_rmt, LCK_EX);
lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
- if (mdt_object_remote(msrcdir)) {
- /* Enqueue lookup lock from the parent MDT */
- rc = mdt_remote_object_lock(info, msrcdir,
- mdt_object_fid(mold),
- &lh_oldp->mlh_rreg_lh,
- lh_oldp->mlh_rreg_mode,
- MDS_INODELOCK_LOOKUP,
- false);
- if (rc != ELDLM_OK)
- GOTO(out_put_new, rc);
-
- lock_ibits &= ~MDS_INODELOCK_LOOKUP;
- }
-
- rc = mdt_reint_object_lock(info, mold, lh_oldp, lock_ibits,
- cos_incompat);
- if (rc != 0)
- GOTO(out_unlock_old, rc);
+ rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
+ lh_rmt, lock_ibits, cos_incompat);
+ if (rc < 0)
+ GOTO(out_put_new, rc);
/* Check if @msrcdir is subdir of @mnew, before locking child
* to avoid reverse locking.
lh_newp = &info->mti_lh[MDT_LH_NEW];
mdt_lock_reg_init(lh_newp, LCK_EX);
- rc = mdt_reint_object_lock(info, mnew, lh_newp,
- MDS_INODELOCK_LOOKUP |
- MDS_INODELOCK_UPDATE,
+ lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
+ if (mdt_object_remote(mtgtdir)) {
+ rc = mdt_remote_object_lock(info, mtgtdir,
+ mdt_object_fid(mnew),
+ &lh_newp->mlh_rreg_lh,
+ lh_newp->mlh_rreg_mode,
+ MDS_INODELOCK_LOOKUP,
+ false);
+ if (rc != ELDLM_OK)
+ GOTO(out_unlock_old, rc);
+
+ lock_ibits &= ~MDS_INODELOCK_LOOKUP;
+ }
+ rc = mdt_reint_object_lock(info, mnew, lh_newp, lock_ibits,
cos_incompat);
if (rc != 0)
- GOTO(out_unlock_old, rc);
+ GOTO(out_unlock_new, rc);
/* get and save version after locking */
mdt_version_get_save(info, mnew, 3);
- } else if (rc != -EREMOTE && rc != -ENOENT) {
+ } else if (rc != -ENOENT) {
GOTO(out_put_old, rc);
} else {
lh_oldp = &info->mti_lh[MDT_LH_OLD];
+ lh_rmt = &info->mti_lh[MDT_LH_RMT];
mdt_lock_reg_init(lh_oldp, LCK_EX);
+ mdt_lock_reg_init(lh_rmt, LCK_EX);
lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
- if (mdt_object_remote(msrcdir)) {
- /* Enqueue lookup lock from the parent MDT */
- rc = mdt_remote_object_lock(info, msrcdir,
- mdt_object_fid(mold),
- &lh_oldp->mlh_rreg_lh,
- lh_oldp->mlh_rreg_mode,
- MDS_INODELOCK_LOOKUP,
- false);
- if (rc != ELDLM_OK)
- GOTO(out_put_old, rc);
-
- lock_ibits &= ~MDS_INODELOCK_LOOKUP;
- }
-
- rc = mdt_reint_object_lock(info, mold, lh_oldp, lock_ibits,
- cos_incompat);
+ rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
+ lh_rmt, lock_ibits, cos_incompat);
if (rc != 0)
- GOTO(out_unlock_old, rc);
+ GOTO(out_put_old, rc);
mdt_enoent_version_save(info, 3);
}
}
EXIT;
+out_unlock_new:
if (mnew != NULL)
mdt_object_unlock(info, mnew, lh_newp, rc);
out_unlock_old:
+ mdt_object_unlock(info, NULL, lh_rmt, rc);
mdt_object_unlock(info, mold, lh_oldp, rc);
out_put_new:
if (mnew && !discard)