int ln_namelen;
};
+/**
+ * Check whether a path component is "." or "..".
+ *
+ * Only the first \a namelen bytes of \a name are examined.
+ *
+ * \param[in] name	component name
+ * \param[in] namelen	number of bytes of \a name to consider
+ *
+ * \retval true		\a namelen is 1 and the name is ".", or \a namelen is 2
+ *			and the name is ".."
+ * \retval false	otherwise, including when \a namelen is less than 1
+ */
+static inline bool name_is_dot_or_dotdot(const char *name, int namelen)
+{
+	/* test the length first so no byte beyond namelen is ever read */
+	if (namelen == 1)
+		return name[0] == '.';
+
+	return namelen == 2 && name[0] == '.' && name[1] == '.';
+}
+
+/**
+ * Check whether an lu_name holds "." or "..".
+ *
+ * \param[in] lname	name to test; its ln_name and ln_namelen fields are read
+ *
+ * \retval true		name_is_dot_or_dotdot() accepts the name
+ * \retval false	otherwise
+ */
+static inline bool lu_name_is_dot_or_dotdot(const struct lu_name *lname)
+{
+	int len = lname->ln_namelen;
+
+	return name_is_dot_or_dotdot(lname->ln_name, len);
+}
+
+/**
+ * Check that \a name is a usable path component of exactly \a name_len bytes.
+ *
+ * \param[in] name	NUL-terminated candidate name, may be NULL
+ * \param[in] name_len	length the caller believes \a name has
+ *
+ * \retval true		\a name is not NULL, \a name_len is in (0, INT_MAX),
+ *			strlen(\a name) equals \a name_len and the name holds
+ *			no '/'
+ * \retval false	otherwise
+ */
+static inline bool lu_name_is_valid_len(const char *name, size_t name_len)
+{
+	if (name == NULL)
+		return false;
+
+	/* reject empty names and lengths that would not fit in an int */
+	if (name_len == 0 || name_len >= INT_MAX)
+		return false;
+
+	if (strlen(name) != name_len)
+		return false;
+
+	return memchr(name, '/', name_len) == NULL;
+}
+
/**
* Validate names (path components)
*
*/
static inline bool lu_name_is_valid_2(const char *name, size_t name_len)
{
- return name != NULL &&
- name_len > 0 &&
- name_len < INT_MAX &&
- name[name_len] == '\0' &&
- strlen(name) == name_len &&
- memchr(name, '/', name_len) == NULL;
+ return lu_name_is_valid_len(name, name_len) && name[name_len] == '\0';
}
static inline bool lu_name_is_valid(const struct lu_name *ln)
__u32 lsm_md_master_mdt_index;
__u32 lsm_md_hash_type;
__u32 lsm_md_layout_version;
+ __u32 lsm_md_migrate_offset;
+ __u32 lsm_md_migrate_hash;
__u32 lsm_md_default_count;
__u32 lsm_md_default_index;
char lsm_md_pool_name[LOV_MAXPOOLNAME + 1];
lsm1->lsm_md_hash_type != lsm2->lsm_md_hash_type ||
lsm1->lsm_md_layout_version !=
lsm2->lsm_md_layout_version ||
+ lsm1->lsm_md_migrate_offset !=
+ lsm2->lsm_md_migrate_offset ||
+ lsm1->lsm_md_migrate_hash !=
+ lsm2->lsm_md_migrate_hash ||
strcmp(lsm1->lsm_md_pool_name,
lsm2->lsm_md_pool_name) != 0)
return false;
unsigned int stripe_count,
const char *name, int namelen)
{
- int idx;
- __u32 hash_type = lmv_hash_type & LMV_HASH_TYPE_MASK;
+ int idx;
LASSERT(namelen > 0);
- if (stripe_count <= 1)
- return 0;
- /* for migrating object, always start from 0 stripe */
- if (lmv_hash_type & LMV_HASH_FLAG_MIGRATION)
+ if (stripe_count <= 1)
return 0;
- switch (hash_type) {
+ switch (lmv_hash_type & LMV_HASH_TYPE_MASK) {
case LMV_HASH_TYPE_ALL_CHARS:
idx = lmv_hash_all_chars(stripe_count, name, namelen);
break;
break;
}
- CDEBUG(D_INFO, "name %.*s hash_type %d idx %d\n", namelen, name,
- hash_type, idx);
+ CDEBUG(D_INFO, "name %.*s hash_type %#x idx %d/%u\n", namelen, name,
+ lmv_hash_type, idx, stripe_count);
return idx;
}
/** Additional parameters for create */
struct md_op_spec {
- union {
- /** symlink target */
- const char *sp_symname;
- /** eadata for regular files */
- struct md_spec_reg {
- const void *eadata;
- int eadatalen;
- } sp_ea;
- } u;
+ union {
+ /** symlink target */
+ const char *sp_symname;
+ /** eadata for regular files */
+ struct md_spec_reg {
+ void *eadata;
+ int eadatalen;
+ } sp_ea;
+ } u;
/** Create flag from client: such as MDS_OPEN_CREAT, and others. */
__u64 sp_cr_flags;
sp_permitted:1, /* do not check permission */
sp_migrate_close:1; /* close the file during migrate */
/** Current lock mode for parent dir where create is performing. */
- mdl_mode_t sp_cr_mode;
+ mdl_mode_t sp_cr_mode;
- /** to create directory */
- const struct dt_index_features *sp_feat;
+ /** to create directory */
+ const struct dt_index_features *sp_feat;
};
enum md_layout_opc {
int (*mdo_migrate)(const struct lu_env *env, struct md_object *pobj,
struct md_object *sobj, const struct lu_name *lname,
- struct md_object *tobj, struct md_attr *ma);
+ struct md_object *tobj, struct md_op_spec *spec,
+ struct md_attr *ma);
};
struct md_device_operations {
struct md_object *sobj,
const struct lu_name *lname,
struct md_object *tobj,
+ struct md_op_spec *spec,
struct md_attr *ma)
{
LASSERT(pobj->mo_dir_ops->mdo_migrate);
- return pobj->mo_dir_ops->mdo_migrate(env, pobj, sobj, lname, tobj, ma);
+ return pobj->mo_dir_ops->mdo_migrate(env, pobj, sobj, lname, tobj, spec,
+ ma);
}
static inline int mdo_is_subdir(const struct lu_env *env,
#define OBD_FAIL_INVALIDATE_UPDATE 0x1705
/* MIGRATE */
-#define OBD_FAIL_MIGRATE_NET_REP 0x1800
#define OBD_FAIL_MIGRATE_ENTRIES 0x1801
-#define OBD_FAIL_MIGRATE_LINKEA 0x1802
-#define OBD_FAIL_MIGRATE_DELAY 0x1803
/* LMV */
#define OBD_FAIL_UNKNOWN_LMV_STRIPE 0x1901
#define MDT_CONNECT_SUPPORTED2 (OBD_CONNECT2_FILE_SECCTX | OBD_CONNECT2_FLR | \
OBD_CONNECT2_SUM_STATFS | \
- OBD_CONNECT2_LOCK_CONVERT)
+ OBD_CONNECT2_LOCK_CONVERT | \
+ OBD_CONNECT2_DIR_MIGRATE)
#define OST_CONNECT_SUPPORTED (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
* used for now. Higher 16 bits will
* be used to mark the object status,
* for example migrating or dead. */
- __u32 lmv_layout_version; /* Used for directory restriping */
- __u32 lmv_padding1;
- __u64 lmv_padding2;
+ __u32 lmv_layout_version; /* increased each time layout changed,
+ * by directory migration, restripe
+ * and LFSCK. */
+ __u32 lmv_migrate_offset; /* once this is set, it means this
+ * directory is being migrated, stripes
+ * before this offset belong to target,
+ * from this to source. */
+ __u32 lmv_migrate_hash; /* hash type of source stripes of
+ * migrating directory */
+ __u32 lmv_padding2;
__u64 lmv_padding3;
char lmv_pool_name[LOV_MAXPOOLNAME + 1]; /* pool name */
struct lu_fid lmv_stripe_fids[0]; /* FIDs for each stripe */
extern const char *lfsck_param_names[];
extern struct lu_context_key lfsck_thread_key;
-static inline bool name_is_dot_or_dotdot(const char *name, int namelen)
-{
- return name[0] == '.' &&
- (namelen == 1 || (namelen == 2 && name[1] == '.'));
-}
-
static inline struct dt_device *lfsck_obj2dev(struct dt_object *obj)
{
return container_of0(obj->do_lu.lo_dev, struct dt_device, dd_lu_dev);
dst->lmv_master_mdt_index = le32_to_cpu(src->lmv_master_mdt_index);
dst->lmv_hash_type = le32_to_cpu(src->lmv_hash_type);
dst->lmv_layout_version = le32_to_cpu(src->lmv_layout_version);
+ dst->lmv_migrate_offset = le32_to_cpu(src->lmv_migrate_offset);
+ dst->lmv_migrate_hash = le32_to_cpu(src->lmv_migrate_hash);
}
static inline void lfsck_lmv_header_cpu_to_le(struct lmv_mds_md_v1 *dst,
dst->lmv_master_mdt_index = cpu_to_le32(src->lmv_master_mdt_index);
dst->lmv_hash_type = cpu_to_le32(src->lmv_hash_type);
dst->lmv_layout_version = cpu_to_le32(src->lmv_layout_version);
+ dst->lmv_migrate_offset = cpu_to_le32(src->lmv_migrate_offset);
+ dst->lmv_migrate_hash = cpu_to_le32(src->lmv_migrate_hash);
}
static inline struct lfsck_assistant_object *
lmv3->lmv_magic = LMV_MAGIC;
lmv3->lmv_master_mdt_index = pidx;
+ lmv3->lmv_layout_version++;
if (flags & LEF_SET_LMV_ALL) {
rc = lfsck_allow_regenerate_master_lmv(env, com, obj,
if (!child_inode)
RETURN(-ENOENT);
+ if (!(exp_connect_flags2(ll_i2sbi(parent)->ll_md_exp) &
+ OBD_CONNECT2_DIR_MIGRATE)) {
+ if (le32_to_cpu(lum->lum_stripe_count) > 1 ||
+ ll_i2info(child_inode)->lli_lsm_md) {
+ CERROR("%s: MDT doesn't support stripe directory "
+ "migration!\n",
+ ll_get_fsname(parent->i_sb, NULL, 0));
+ GOTO(out_iput, rc = -EOPNOTSUPP);
+ }
+ }
+
/*
* lfs migrate command needs to be blocked on the client
* by checking the migrate FID against the FID of the
* where the initialization of slave inode is slightly
* different, so it reset lsm_md to NULL to avoid
* initializing lsm for slave inode. */
- /* For migrating inode, master stripe and master object will
- * be same, so we only need assign this inode */
- if (lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION && i == 0)
- lsm->lsm_md_oinfo[i].lmo_root = inode;
- else
- lsm->lsm_md_oinfo[i].lmo_root =
+ lsm->lsm_md_oinfo[i].lmo_root =
ll_iget_anon_dir(inode->i_sb, fid, md);
-
if (IS_ERR(lsm->lsm_md_oinfo[i].lmo_root)) {
int rc = PTR_ERR(lsm->lsm_md_oinfo[i].lmo_root);
return 0;
}
-static inline int lli_lsm_md_eq(const struct lmv_stripe_md *lsm_md1,
- const struct lmv_stripe_md *lsm_md2)
-{
- return lsm_md1->lsm_md_magic == lsm_md2->lsm_md_magic &&
- lsm_md1->lsm_md_stripe_count == lsm_md2->lsm_md_stripe_count &&
- lsm_md1->lsm_md_master_mdt_index ==
- lsm_md2->lsm_md_master_mdt_index &&
- lsm_md1->lsm_md_hash_type == lsm_md2->lsm_md_hash_type &&
- lsm_md1->lsm_md_layout_version ==
- lsm_md2->lsm_md_layout_version &&
- strcmp(lsm_md1->lsm_md_pool_name,
- lsm_md2->lsm_md_pool_name) == 0;
-}
-
static int ll_update_lsm_md(struct inode *inode, struct lustre_md *md)
{
struct ll_inode_info *lli = ll_i2info(inode);
CDEBUG(D_INODE, "update lsm %p of "DFID"\n", lli->lli_lsm_md,
PFID(ll_inode2fid(inode)));
- /* no striped information from request. */
- if (lsm == NULL) {
- if (lli->lli_lsm_md == NULL) {
- RETURN(0);
- } else if (lli->lli_lsm_md->lsm_md_hash_type &
- LMV_HASH_FLAG_MIGRATION) {
- /* migration is done, the temporay MIGRATE layout has
- * been removed */
- CDEBUG(D_INODE, DFID" finish migration.\n",
- PFID(ll_inode2fid(inode)));
- lmv_free_memmd(lli->lli_lsm_md);
- lli->lli_lsm_md = NULL;
- RETURN(0);
- } else {
- /* The lustre_md from req does not include stripeEA,
- * see ll_md_setattr */
- RETURN(0);
- }
+ /*
+ * no striped information from request, lustre_md from req does not
+ * include stripeEA, see ll_md_setattr()
+ */
+ if (!lsm)
+ RETURN(0);
+
+ /* Compare the old and new stripe information */
+ if (lli->lli_lsm_md && !lsm_md_eq(lli->lli_lsm_md, lsm)) {
+ struct lmv_stripe_md *old_lsm = lli->lli_lsm_md;
+ int idx;
+ bool layout_changed = lsm->lsm_md_layout_version >
+ old_lsm->lsm_md_layout_version;
+
+ int mask = layout_changed ? D_INODE : D_ERROR;
+
+ CDEBUG(mask,
+ "%s: inode@%p "DFID" lmv layout %s magic %#x/%#x "
+ "stripe count %d/%d master_mdt %d/%d "
+ "hash_type %#x/%#x version %d/%d migrate offset %d/%d "
+ "migrate hash %#x/%#x pool %s/%s\n",
+ ll_get_fsname(inode->i_sb, NULL, 0), inode,
+ PFID(&lli->lli_fid),
+ layout_changed ? "changed" : "mismatch",
+ lsm->lsm_md_magic, old_lsm->lsm_md_magic,
+ lsm->lsm_md_stripe_count,
+ old_lsm->lsm_md_stripe_count,
+ lsm->lsm_md_master_mdt_index,
+ old_lsm->lsm_md_master_mdt_index,
+ lsm->lsm_md_hash_type, old_lsm->lsm_md_hash_type,
+ lsm->lsm_md_layout_version,
+ old_lsm->lsm_md_layout_version,
+ lsm->lsm_md_migrate_offset,
+ old_lsm->lsm_md_migrate_offset,
+ lsm->lsm_md_migrate_hash,
+ old_lsm->lsm_md_migrate_hash,
+ lsm->lsm_md_pool_name,
+ old_lsm->lsm_md_pool_name);
+
+ for (idx = 0; idx < old_lsm->lsm_md_stripe_count; idx++)
+ CDEBUG(mask, "old stripe[%d] "DFID"\n",
+ idx, PFID(&old_lsm->lsm_md_oinfo[idx].lmo_fid));
+
+ for (idx = 0; idx < lsm->lsm_md_stripe_count; idx++)
+ CDEBUG(mask, "new stripe[%d] "DFID"\n",
+ idx, PFID(&lsm->lsm_md_oinfo[idx].lmo_fid));
+
+ if (!layout_changed)
+ RETURN(-EINVAL);
+
+ ll_dir_clear_lsm_md(inode);
}
/* set the directory layout */
- if (lli->lli_lsm_md == NULL) {
+ if (!lli->lli_lsm_md) {
struct cl_attr *attr;
rc = ll_init_lsm_md(inode, md);
return sizeof(*lsm) + stripe_count * sizeof(lsm->lsm_md_oinfo[0]);
}
+/* for file under migrating directory, return the target stripe info */
static inline const struct lmv_oinfo *
lsm_name_to_stripe_info(const struct lmv_stripe_md *lsm, const char *name,
int namelen)
{
+ __u32 hash_type = lsm->lsm_md_hash_type;
+ __u32 stripe_count = lsm->lsm_md_stripe_count;
int stripe_index;
- stripe_index = lmv_name_to_stripe_index(lsm->lsm_md_hash_type,
- lsm->lsm_md_stripe_count,
+ if (hash_type & LMV_HASH_FLAG_MIGRATION) {
+ hash_type &= ~LMV_HASH_FLAG_MIGRATION;
+ stripe_count = lsm->lsm_md_migrate_offset;
+ }
+
+ stripe_index = lmv_name_to_stripe_index(hash_type, stripe_count,
name, namelen);
if (stripe_index < 0)
return ERR_PTR(stripe_index);
RETURN(rc);
}
-static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
- const char *old, size_t oldlen,
- const char *new, size_t newlen,
- struct ptlrpc_request **request)
+static int lmv_migrate(struct obd_export *exp, struct md_op_data *op_data,
+ const char *name, size_t namelen,
+ struct ptlrpc_request **request)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_tgt_desc *src_tgt;
- struct lmv_tgt_desc *tgt_tgt;
- struct obd_export *target_exp;
- struct mdt_body *body;
- int rc;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_stripe_md *lsm = op_data->op_mea1;
+ struct lmv_tgt_desc *parent_tgt;
+ struct lmv_tgt_desc *sp_tgt;
+ struct lmv_tgt_desc *tp_tgt = NULL;
+ struct lmv_tgt_desc *child_tgt;
+ struct lmv_tgt_desc *tgt;
+ struct lu_fid target_fid;
+ int rc;
+
ENTRY;
- LASSERT(oldlen != 0);
+ LASSERT(op_data->op_cli_flags & CLI_MIGRATE);
+ LASSERTF(fid_is_sane(&op_data->op_fid3), "invalid FID "DFID"\n",
+ PFID(&op_data->op_fid3));
- CDEBUG(D_INODE, "RENAME %.*s in "DFID":%d to %.*s in "DFID":%d\n",
- (int)oldlen, old, PFID(&op_data->op_fid1),
- op_data->op_mea1 ? op_data->op_mea1->lsm_md_stripe_count : 0,
- (int)newlen, new, PFID(&op_data->op_fid2),
- op_data->op_mea2 ? op_data->op_mea2->lsm_md_stripe_count : 0);
+ CDEBUG(D_INODE, "MIGRATE "DFID"/%.*s\n",
+ PFID(&op_data->op_fid1), (int)namelen, name);
op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
op_data->op_cap = cfs_curproc_cap_pack();
- if (op_data->op_cli_flags & CLI_MIGRATE) {
- LASSERTF(fid_is_sane(&op_data->op_fid3), "invalid FID "DFID"\n",
- PFID(&op_data->op_fid3));
-
- if (op_data->op_mea1 != NULL) {
- struct lmv_stripe_md *lsm = op_data->op_mea1;
- struct lmv_tgt_desc *tmp;
-
- /* Fix the parent fid for striped dir */
- tmp = lmv_locate_target_for_name(lmv, lsm, old,
- oldlen,
- &op_data->op_fid1,
- NULL);
- if (IS_ERR(tmp))
- RETURN(PTR_ERR(tmp));
- }
-
- rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
- if (rc != 0)
- RETURN(rc);
- src_tgt = lmv_find_target(lmv, &op_data->op_fid3);
- if (IS_ERR(src_tgt))
- RETURN(PTR_ERR(src_tgt));
+ parent_tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ if (IS_ERR(parent_tgt))
+ RETURN(PTR_ERR(parent_tgt));
- target_exp = src_tgt->ltd_exp;
- } else {
- if (op_data->op_mea1 != NULL) {
- struct lmv_stripe_md *lsm = op_data->op_mea1;
+ if (lsm) {
+ __u32 hash_type = lsm->lsm_md_hash_type;
+ __u32 stripe_count = lsm->lsm_md_stripe_count;
- src_tgt = lmv_locate_target_for_name(lmv, lsm, old,
- oldlen,
- &op_data->op_fid1,
- &op_data->op_mds);
- } else {
- src_tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ /*
+ * old stripes are appended after new stripes for migrating
+ * directory.
+ */
+ if (lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION) {
+ hash_type = lsm->lsm_md_migrate_hash;
+ stripe_count -= lsm->lsm_md_migrate_offset;
}
- if (IS_ERR(src_tgt))
- RETURN(PTR_ERR(src_tgt));
+ rc = lmv_name_to_stripe_index(hash_type, stripe_count, name,
+ namelen);
+ if (rc < 0)
+ RETURN(rc);
- if (op_data->op_mea2 != NULL) {
- struct lmv_stripe_md *lsm = op_data->op_mea2;
+ if (lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION)
+ rc += lsm->lsm_md_migrate_offset;
- tgt_tgt = lmv_locate_target_for_name(lmv, lsm, new,
- newlen,
- &op_data->op_fid2,
- &op_data->op_mds);
- } else {
- tgt_tgt = lmv_find_target(lmv, &op_data->op_fid2);
+ /* save it in fid4 temporarily for early cancel */
+ op_data->op_fid4 = lsm->lsm_md_oinfo[rc].lmo_fid;
+ sp_tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[rc].lmo_mds,
+ NULL);
+ if (IS_ERR(sp_tgt))
+ RETURN(PTR_ERR(sp_tgt));
+ /*
+ * if parent is being migrated too, fill op_fid2 with target
+ * stripe fid, otherwise the target stripe is not created yet.
+ */
+ if (lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION) {
+ hash_type = lsm->lsm_md_hash_type &
+ ~LMV_HASH_FLAG_MIGRATION;
+ stripe_count = lsm->lsm_md_migrate_offset;
+
+ rc = lmv_name_to_stripe_index(hash_type, stripe_count,
+ name, namelen);
+ if (rc < 0)
+ RETURN(rc);
+
+ op_data->op_fid2 = lsm->lsm_md_oinfo[rc].lmo_fid;
+ tp_tgt = lmv_get_target(lmv,
+ lsm->lsm_md_oinfo[rc].lmo_mds,
+ NULL);
+ if (IS_ERR(tp_tgt))
+ RETURN(PTR_ERR(tp_tgt));
}
- if (IS_ERR(tgt_tgt))
- RETURN(PTR_ERR(tgt_tgt));
-
- target_exp = tgt_tgt->ltd_exp;
+ } else {
+ sp_tgt = parent_tgt;
}
- /*
- * LOOKUP lock on src child (fid3) should also be cancelled for
- * src_tgt in mdc_rename.
- */
- op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
+ child_tgt = lmv_find_target(lmv, &op_data->op_fid3);
+ if (IS_ERR(child_tgt))
+ RETURN(PTR_ERR(child_tgt));
- /*
- * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
- * own target.
- */
- rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
- LCK_EX, MDS_INODELOCK_UPDATE,
- MF_MDC_CANCEL_FID2);
-
- if (rc != 0)
+ rc = lmv_fid_alloc(NULL, exp, &target_fid, op_data);
+ if (rc)
RETURN(rc);
+
/*
- * Cancel LOOKUP locks on source child (fid3) for parent tgt_tgt.
+ * for directory, send migrate request to the MDT where the object will
+ * be migrated to, because we can't create a striped directory remotely.
+ *
+ * otherwise, send to the MDT where source is located because regular
+ * file may open lease.
+ *
+ * NB. if MDT doesn't support DIR_MIGRATE, send to source MDT too for
+ * backward compatibility.
*/
- if (fid_is_sane(&op_data->op_fid3)) {
- struct lmv_tgt_desc *tgt;
-
- tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ if (S_ISDIR(op_data->op_mode) &&
+ (exp_connect_flags2(exp) & OBD_CONNECT2_DIR_MIGRATE)) {
+ tgt = lmv_find_target(lmv, &target_fid);
if (IS_ERR(tgt))
RETURN(PTR_ERR(tgt));
+ } else {
+ tgt = child_tgt;
+ }
- /* Cancel LOOKUP lock on its parent */
- rc = lmv_early_cancel(exp, tgt, op_data, src_tgt->ltd_idx,
- LCK_EX, MDS_INODELOCK_LOOKUP,
- MF_MDC_CANCEL_FID3);
- if (rc != 0)
+ /* cancel UPDATE lock of parent master object */
+ rc = lmv_early_cancel(exp, parent_tgt, op_data, tgt->ltd_idx, LCK_EX,
+ MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
+ if (rc)
+ RETURN(rc);
+
+ /* cancel UPDATE lock of source parent */
+ if (sp_tgt != parent_tgt) {
+ /*
+ * migrate RPC packs master object FID, because we can only pack
+ * two FIDs in reint RPC, but MDS needs to know both source
+ * parent and target parent, and it will obtain them from master
+ * FID and LMV, the other FID in RPC is kept for target.
+ *
+ * since this FID is not passed to MDC, cancel it anyway.
+ */
+ rc = lmv_early_cancel(exp, sp_tgt, op_data, -1, LCK_EX,
+ MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID4);
+ if (rc)
RETURN(rc);
- rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
- LCK_EX, MDS_INODELOCK_ELC,
+ op_data->op_flags &= ~MF_MDC_CANCEL_FID4;
+ }
+ op_data->op_fid4 = target_fid;
+
+ /* cancel UPDATE locks of target parent */
+ rc = lmv_early_cancel(exp, tp_tgt, op_data, tgt->ltd_idx, LCK_EX,
+ MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID2);
+ if (rc)
+ RETURN(rc);
+
+ /* cancel LOOKUP lock of source if source is remote object */
+ if (child_tgt != sp_tgt) {
+ rc = lmv_early_cancel(exp, sp_tgt, op_data, tgt->ltd_idx,
+ LCK_EX, MDS_INODELOCK_LOOKUP,
MF_MDC_CANCEL_FID3);
- if (rc != 0)
+ if (rc)
RETURN(rc);
}
-retry_rename:
- /*
- * Cancel all the locks on tgt child (fid4).
- */
- if (fid_is_sane(&op_data->op_fid4)) {
- struct lmv_tgt_desc *tgt;
+ /* cancel ELC locks of source */
+ rc = lmv_early_cancel(exp, child_tgt, op_data, tgt->ltd_idx, LCK_EX,
+ MDS_INODELOCK_ELC, MF_MDC_CANCEL_FID3);
+ if (rc)
+ RETURN(rc);
- rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
- LCK_EX, MDS_INODELOCK_ELC,
- MF_MDC_CANCEL_FID4);
- if (rc != 0)
- RETURN(rc);
+ rc = md_rename(tgt->ltd_exp, op_data, name, namelen, NULL, 0, request);
+
+ RETURN(rc);
+}
+
+static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
+ const char *old, size_t oldlen,
+ const char *new, size_t newlen,
+ struct ptlrpc_request **request)
+{
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_stripe_md *lsm = op_data->op_mea1;
+ struct lmv_tgt_desc *sp_tgt;
+ struct lmv_tgt_desc *tp_tgt = NULL;
+ struct lmv_tgt_desc *tgt;
+ struct mdt_body *body;
+ int rc;
+
+ ENTRY;
+
+ LASSERT(oldlen != 0);
+
+ if (op_data->op_cli_flags & CLI_MIGRATE) {
+ rc = lmv_migrate(exp, op_data, old, oldlen, request);
+ RETURN(rc);
+ }
+
+ op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
+ op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
+ op_data->op_cap = cfs_curproc_cap_pack();
+
+ CDEBUG(D_INODE, "RENAME "DFID"/%.*s to "DFID"/%.*s\n",
+ PFID(&op_data->op_fid1), (int)oldlen, old,
+ PFID(&op_data->op_fid2), (int)newlen, new);
+ if (lsm)
+ sp_tgt = lmv_locate_target_for_name(lmv, lsm, old, oldlen,
+ &op_data->op_fid1,
+ &op_data->op_mds);
+ else
+ sp_tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ if (IS_ERR(sp_tgt))
+ RETURN(PTR_ERR(sp_tgt));
+
+ lsm = op_data->op_mea2;
+ if (lsm)
+ tp_tgt = lmv_locate_target_for_name(lmv, lsm, new, newlen,
+ &op_data->op_fid2,
+ &op_data->op_mds);
+ else
+ tp_tgt = lmv_find_target(lmv, &op_data->op_fid2);
+ if (IS_ERR(tp_tgt))
+ RETURN(PTR_ERR(tp_tgt));
+
+ /* Since the target child might be destroyed, and it might become
+ * orphan, and we can only check orphan on the local MDT right now, so
+ * we send rename request to the MDT where target child is located. If
+ * target child does not exist, then it will send the request to the
+ * target parent */
+ if (fid_is_sane(&op_data->op_fid4)) {
tgt = lmv_find_target(lmv, &op_data->op_fid4);
if (IS_ERR(tgt))
RETURN(PTR_ERR(tgt));
+ } else {
+ tgt = tp_tgt;
+ }
+
+ op_data->op_flags |= MF_MDC_CANCEL_FID4;
+
+ /* cancel UPDATE locks of source parent */
+ rc = lmv_early_cancel(exp, sp_tgt, op_data, tgt->ltd_idx, LCK_EX,
+ MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
+ if (rc != 0)
+ RETURN(rc);
+
+ /* cancel UPDATE locks of target parent */
+ rc = lmv_early_cancel(exp, tp_tgt, op_data, tgt->ltd_idx, LCK_EX,
+ MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID2);
+ if (rc != 0)
+ RETURN(rc);
+
+ if (fid_is_sane(&op_data->op_fid3)) {
+ struct lmv_tgt_desc *src_tgt;
+
+ src_tgt = lmv_find_target(lmv, &op_data->op_fid3);
+ if (IS_ERR(src_tgt))
+ RETURN(PTR_ERR(src_tgt));
+
+ /* cancel LOOKUP lock of source on source parent */
+ if (src_tgt != sp_tgt) {
+ rc = lmv_early_cancel(exp, sp_tgt, op_data,
+ tgt->ltd_idx, LCK_EX,
+ MDS_INODELOCK_LOOKUP,
+ MF_MDC_CANCEL_FID3);
+ if (rc != 0)
+ RETURN(rc);
+ }
+
+ /* cancel ELC locks of source */
+ rc = lmv_early_cancel(exp, src_tgt, op_data, tgt->ltd_idx,
+ LCK_EX, MDS_INODELOCK_ELC,
+ MF_MDC_CANCEL_FID3);
+ if (rc != 0)
+ RETURN(rc);
+ }
- /* Since the target child might be destroyed, and it might
- * become orphan, and we can only check orphan on the local
- * MDT right now, so we send rename request to the MDT where
- * target child is located. If target child does not exist,
- * then it will send the request to the target parent */
- target_exp = tgt->ltd_exp;
+retry_rename:
+ if (fid_is_sane(&op_data->op_fid4)) {
+ /* cancel LOOKUP lock of target on target parent */
+ if (tgt != tp_tgt) {
+ rc = lmv_early_cancel(exp, tp_tgt, op_data,
+ tgt->ltd_idx, LCK_EX,
+ MDS_INODELOCK_LOOKUP,
+ MF_MDC_CANCEL_FID4);
+ if (rc != 0)
+ RETURN(rc);
+ }
}
- rc = md_rename(target_exp, op_data, old, oldlen, new, newlen,
- request);
+ rc = md_rename(tgt->ltd_exp, op_data, old, oldlen, new, newlen,
+ request);
if (rc != 0 && rc != -EXDEV)
RETURN(rc);
op_data->op_fid4 = body->mbo_fid1;
ptlrpc_req_finished(*request);
*request = NULL;
+
+ tgt = lmv_find_target(lmv, &op_data->op_fid4);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
+
goto retry_rename;
}
else
lsm->lsm_md_hash_type = le32_to_cpu(lmm1->lmv_hash_type);
lsm->lsm_md_layout_version = le32_to_cpu(lmm1->lmv_layout_version);
+ lsm->lsm_md_migrate_offset = le32_to_cpu(lmm1->lmv_migrate_offset);
+ lsm->lsm_md_migrate_hash = le32_to_cpu(lmm1->lmv_migrate_hash);
cplen = strlcpy(lsm->lsm_md_pool_name, lmm1->lmv_pool_name,
sizeof(lsm->lsm_md_pool_name));
if (cplen >= sizeof(lsm->lsm_md_pool_name))
RETURN(-E2BIG);
- CDEBUG(D_INFO, "unpack lsm count %d, master %d hash_type %d"
+ CDEBUG(D_INFO, "unpack lsm count %d, master %d hash_type %#x "
"layout_version %d\n", lsm->lsm_md_stripe_count,
lsm->lsm_md_master_mdt_index, lsm->lsm_md_hash_type,
lsm->lsm_md_layout_version);
/* Free memmd */
if (lsm != NULL && lmm == NULL) {
int i;
- for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
- /* For migrating inode, the master stripe and master
- * object will be the same, so do not need iput, see
- * ll_update_lsm_md */
- if (!(lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION &&
- i == 0) && lsm->lsm_md_oinfo[i].lmo_root != NULL)
- iput(lsm->lsm_md_oinfo[i].lmo_root);
- }
+
+ for (i = 0; i < lsm->lsm_md_stripe_count; i++)
+ iput(lsm->lsm_md_oinfo[i].lmo_root);
lsm_size = lmv_stripe_md_size(lsm->lsm_md_stripe_count);
OBD_FREE(lsm, lsm_size);
*lsmp = NULL;
__u16 ldo_dir_stripes_allocated;
__u32 ldo_dir_stripe_offset;
__u32 ldo_dir_hash_type;
+ __u32 ldo_dir_migrate_offset;
+ __u32 ldo_dir_migrate_hash;
/* Is a slave stripe of striped directory? */
__u32 ldo_dir_slave_stripe:1,
ldo_dir_striped:1,
int rc;
ENTRY;
- /* If it is not a striped directory, then load nothing. */
if (magic != LMV_MAGIC_V1)
RETURN(0);
- /* If it is in migration (or failure), then load nothing. */
- if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
- RETURN(0);
-
stripes = le32_to_cpu(lmv1->lmv_stripe_count);
if (stripes < 1)
RETURN(0);
}
lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
+ memset(lmm1, 0, sizeof(*lmm1));
lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
+ if (lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) {
+ lmm1->lmv_migrate_hash = cpu_to_le32(lo->ldo_dir_migrate_hash);
+ lmm1->lmv_migrate_offset =
+ cpu_to_le32(lo->ldo_dir_migrate_offset);
+ }
rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
&mdtidx, &type);
if (rc != 0)
}
/**
+ * Append source stripes after target stripes for migrating directory. NB, we
+ * only need to declare this, the append is done inside lod_xattr_set_lmv().
+ *
+ * \param[in] env execution environment
+ * \param[in] dt target object
+ * \param[in] buf LMV buf which contains source stripe fids
+ * \param[in] th transaction handle
+ *
+ * \retval 0 on success
+ * \retval negative if failed
+ */
+static int lod_dir_declare_layout_add(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct lu_buf *buf,
+ struct thandle *th)
+{
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
+ struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct dt_object *next = dt_object_child(dt);
+ struct dt_object_format *dof = &info->lti_format;
+ struct lmv_mds_md_v1 *lmv = buf->lb_buf;
+ struct dt_object **stripe;
+ __u32 stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
+ struct lu_fid *fid = &info->lti_fid;
+ struct lod_tgt_desc *tgt;
+ struct dt_object *dto;
+ struct dt_device *tgt_dt;
+ int type = LU_SEQ_RANGE_ANY;
+ struct dt_insert_rec *rec = &info->lti_dt_rec;
+ char *stripe_name = info->lti_key;
+ struct lu_name *sname;
+ struct linkea_data ldata = { NULL };
+ struct lu_buf linkea_buf;
+ __u32 idx;
+ int i;
+ int rc;
+
+ ENTRY;
+
+ if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
+ RETURN(-EINVAL);
+
+ if (stripe_count == 0)
+ RETURN(-EINVAL);
+
+ dof->dof_type = DFT_DIR;
+
+ /*
+ * new stripe array: the stripes already attached to this object come
+ * first, the stripes listed in buf are appended after them.
+ */
+ OBD_ALLOC(stripe,
+ sizeof(*stripe) * (lo->ldo_dir_stripe_count + stripe_count));
+ if (stripe == NULL)
+ RETURN(-ENOMEM);
+
+ for (i = 0; i < lo->ldo_dir_stripe_count; i++)
+ stripe[i] = lo->ldo_stripe[i];
+
+ for (i = 0; i < stripe_count; i++) {
+ fid_le_to_cpu(fid,
+ &lmv->lmv_stripe_fids[i]);
+ if (!fid_is_sane(fid))
+ GOTO(out, rc = -ESTALE);
+
+ rc = lod_fld_lookup(env, lod, fid, &idx, &type);
+ if (rc)
+ GOTO(out, rc);
+
+ if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) {
+ tgt_dt = lod->lod_child;
+ } else {
+ tgt = LTD_TGT(ltd, idx);
+ if (tgt == NULL)
+ GOTO(out, rc = -ESTALE);
+ tgt_dt = tgt->ltd_tgt;
+ }
+
+ dto = dt_locate_at(env, tgt_dt, fid,
+ lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
+ NULL);
+ if (IS_ERR(dto))
+ GOTO(out, rc = PTR_ERR(dto));
+
+ stripe[i + lo->ldo_dir_stripe_count] = dto;
+
+ if (!dt_try_as_dir(env, dto))
+ GOTO(out, rc = -ENOTDIR);
+
+ rc = lod_sub_declare_ref_add(env, dto, th);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = lod_sub_declare_insert(env, dto,
+ (const struct dt_rec *)rec,
+ (const struct dt_key *)dot, th);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = lod_sub_declare_insert(env, dto,
+ (const struct dt_rec *)rec,
+ (const struct dt_key *)dotdot, th);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = lod_sub_declare_xattr_set(env, dto, buf,
+ XATTR_NAME_LMV, 0, th);
+ if (rc)
+ GOTO(out, rc);
+
+ snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
+ PFID(lu_object_fid(&dto->do_lu)),
+ i + lo->ldo_dir_stripe_count);
+
+ sname = lod_name_get(env, stripe_name, strlen(stripe_name));
+ rc = linkea_links_new(&ldata, &info->lti_linkea_buf,
+ sname, lu_object_fid(&dt->do_lu));
+ if (rc)
+ GOTO(out, rc);
+
+ linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
+ linkea_buf.lb_len = ldata.ld_leh->leh_len;
+ rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf,
+ XATTR_NAME_LINK, 0, th);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = lod_sub_declare_insert(env, next,
+ (const struct dt_rec *)rec,
+ (const struct dt_key *)stripe_name,
+ th);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = lod_sub_declare_ref_add(env, next, th);
+ if (rc)
+ GOTO(out, rc);
+ }
+
+ if (lo->ldo_stripe)
+ OBD_FREE(lo->ldo_stripe,
+ sizeof(*stripe) * lo->ldo_dir_stripes_allocated);
+ lo->ldo_stripe = stripe;
+ lo->ldo_dir_migrate_offset = lo->ldo_dir_stripe_count;
+ lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_hash_type);
+ lo->ldo_dir_stripe_count += stripe_count;
+ /*
+ * the array installed above holds exactly the old stripe count plus
+ * stripe_count slots; record that size so a later free of ldo_stripe,
+ * which is sized by ldo_dir_stripes_allocated, matches the allocation
+ * even if the previous array had unused slots.
+ */
+ lo->ldo_dir_stripes_allocated = lo->ldo_dir_stripe_count;
+ lo->ldo_dir_hash_type |= LMV_HASH_FLAG_MIGRATION;
+
+ RETURN(0);
+out:
+ /*
+ * release only the objects located by this call; the walk stops at the
+ * first empty slot, which presumably relies on OBD_ALLOC() returning
+ * zeroed memory -- TODO confirm.
+ */
+ i = lo->ldo_dir_stripe_count;
+ while (i < lo->ldo_dir_stripe_count + stripe_count && stripe[i])
+ dt_object_put(env, stripe[i++]);
+
+ OBD_FREE(stripe,
+ sizeof(*stripe) * (stripe_count + lo->ldo_dir_stripe_count));
+ RETURN(rc);
+}
+
+/**
+ * Declare the removal of the trailing stripes of a striped directory.
+ *
+ * The stripe count carried in \a buf is the count that remains afterwards.
+ * For every stripe at or beyond that index this declares, in order: deletion
+ * of the \a dot key in the stripe, a reference drop on the stripe, deletion of
+ * the \a dotdot key in the stripe, deletion of the "FID:index" key in the
+ * child object of \a dt, and a reference drop on that child object.
+ *
+ * \param[in] env	execution environment
+ * \param[in] dt	striped directory object
+ * \param[in] buf	buffer holding a struct lmv_user_md
+ * \param[in] th	transaction handle
+ *
+ * \retval 0		on success
+ * \retval -EINVAL	\a buf is empty, or the final stripe count is not
+ *			smaller than the current stripe count
+ * \retval -ENOTDIR	dt_try_as_dir() rejected a stripe
+ * \retval negative	error returned by one of the declarations
+ */
+static int lod_dir_declare_layout_delete(const struct lu_env *env,
+					 struct dt_object *dt,
+					 const struct lu_buf *buf,
+					 struct thandle *th)
+{
+	struct lod_thread_info *tinfo = lod_env_info(env);
+	struct lod_object *lobj = lod_dt_obj(dt);
+	struct dt_object *child = dt_object_child(dt);
+	struct lmv_user_md *lum = buf->lb_buf;
+	char *key = tinfo->lti_key;
+	struct dt_object *dto;
+	__u32 keep;
+	int idx;
+	int rc;
+
+	if (lum == NULL)
+		return -EINVAL;
+
+	keep = le32_to_cpu(lum->lum_stripe_count);
+	if (keep >= lobj->ldo_dir_stripe_count)
+		return -EINVAL;
+
+	idx = keep;
+	while (idx < lobj->ldo_dir_stripe_count) {
+		dto = lobj->ldo_stripe[idx];
+		LASSERT(dto);
+
+		if (!dt_try_as_dir(env, dto))
+			return -ENOTDIR;
+
+		/* declarations on the stripe itself; stop at the first error */
+		rc = lod_sub_declare_delete(env, dto,
+					    (const struct dt_key *)dot, th);
+		if (rc == 0)
+			rc = lod_sub_declare_ref_del(env, dto, th);
+		if (rc == 0)
+			rc = lod_sub_declare_delete(env, dto,
+						(const struct dt_key *)dotdot,
+						th);
+		if (rc != 0)
+			return rc;
+
+		snprintf(key, sizeof(tinfo->lti_key), DFID":%d",
+			 PFID(lu_object_fid(&dto->do_lu)), idx);
+
+		/* declarations on the child object of dt */
+		rc = lod_sub_declare_delete(env, child,
+					    (const struct dt_key *)key, th);
+		if (rc == 0)
+			rc = lod_sub_declare_ref_del(env, child, th);
+		if (rc != 0)
+			return rc;
+
+		idx++;
+	}
+
+	return 0;
+}
+
+/*
+ * Delete stripes from a directory master object.
+ *
+ * The lum_stripe_count passed in \a buf is the final stripe count; every
+ * stripe after that is deleted. NB, those stripes are not destroyed, they are
+ * only removed from its parent namespace. This function is called in two
+ * places:
+ * 1. mdd_migrate_create() deletes stripes from source, and appends them to
+ *    target.
+ * 2. mdd_dir_layout_shrink() deletes stripes from source, and destroys them.
+ *
+ * Returns 0 on success, -EINVAL if \a buf is empty or the final stripe count
+ * is not smaller than the current one, or the first error returned by a
+ * deletion. lod_striping_free() is called once the loop has finished, whether
+ * or not a deletion failed.
+ */
+static int lod_dir_layout_delete(const struct lu_env *env,
+				 struct dt_object *dt,
+				 const struct lu_buf *buf,
+				 struct thandle *th)
+{
+	struct lod_thread_info *tinfo = lod_env_info(env);
+	struct lod_object *lobj = lod_dt_obj(dt);
+	struct dt_object *child = dt_object_child(dt);
+	struct lmv_user_md *lum = buf->lb_buf;
+	char *key = tinfo->lti_key;
+	struct dt_object *dto;
+	__u32 keep;
+	int idx;
+	int rc = 0;
+
+	ENTRY;
+
+	if (lum == NULL)
+		RETURN(-EINVAL);
+
+	keep = le32_to_cpu(lum->lum_stripe_count);
+	if (keep >= lobj->ldo_dir_stripe_count)
+		RETURN(-EINVAL);
+
+	for (idx = keep; idx < lobj->ldo_dir_stripe_count; idx++) {
+		dto = lobj->ldo_stripe[idx];
+		LASSERT(dto);
+
+		/* drop ".." from the stripe first */
+		rc = lod_sub_delete(env, dto,
+				    (const struct dt_key *)dotdot, th);
+		if (rc == 0) {
+			/* then the stripe's "FID:index" entry in the child */
+			snprintf(key, sizeof(tinfo->lti_key), DFID":%d",
+				 PFID(lu_object_fid(&dto->do_lu)), idx);
+
+			rc = lod_sub_delete(env, child,
+					    (const struct dt_key *)key, th);
+		}
+		if (rc == 0)
+			rc = lod_sub_ref_del(env, child, th);
+		if (rc != 0)
+			break;
+	}
+
+	lod_striping_free(env, lod_dt_obj(dt));
+
+	RETURN(rc);
+}
+
+/**
* Implementation of dt_object_operations::do_declare_xattr_set.
*
* Used with regular (non-striped) objects. Basically it
}
filter_fid_le_to_cpu(ff, ff, sizeof(*ff));
- if (lu_fid_eq(lu_object_fid(&lo->ldo_obj.do_lu), &ff->ff_parent) &&
+
+ /*
+ * mdd_declare_migrate_create() declares this via source object because
+ * target is not ready yet, so declare anyway.
+ */
+ if (!data->locd_declare &&
+ lu_fid_eq(lu_object_fid(&lo->ldo_obj.do_lu), &ff->ff_parent) &&
ff->ff_layout.ol_comp_id == comp->llc_id)
return 0;
RETURN(-ENOENT);
rc = lod_declare_modify_layout(env, dt, name, buf, th);
+ } else if (strncmp(name, XATTR_NAME_LMV, strlen(XATTR_NAME_LMV)) == 0 &&
+ strlen(name) > strlen(XATTR_NAME_LMV) + 1) {
+ const char *op = name + strlen(XATTR_NAME_LMV) + 1;
+
+ rc = -ENOTSUPP;
+ if (strcmp(op, "add") == 0)
+ rc = lod_dir_declare_layout_add(env, dt, buf, th);
+ else if (strcmp(op, "del") == 0)
+ rc = lod_dir_declare_layout_delete(env, dt, buf, th);
+
+ RETURN(rc);
} else if (S_ISDIR(mode)) {
rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
} else if (strcmp(name, XATTR_NAME_FID) == 0) {
rec->rec_type = S_IFDIR;
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
- struct dt_object *dto;
- char *stripe_name = info->lti_key;
- struct lu_name *sname;
- struct linkea_data ldata = { NULL };
- struct lu_buf linkea_buf;
-
- dto = lo->ldo_stripe[i];
+ struct dt_object *dto = lo->ldo_stripe[i];
+ char *stripe_name = info->lti_key;
+ struct lu_name *sname;
+ struct linkea_data ldata = { NULL };
+ struct lu_buf linkea_buf;
+
+ /* if it's source stripe of migrating directory, don't create */
+ if (!((lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) &&
+ i >= lo->ldo_dir_migrate_offset)) {
+ dt_write_lock(env, dto, MOR_TGT_CHILD);
+ rc = lod_sub_create(env, dto, attr, NULL, dof, th);
+ if (rc != 0) {
+ dt_write_unlock(env, dto);
+ GOTO(out, rc);
+ }
- dt_write_lock(env, dto, MOR_TGT_CHILD);
- rc = lod_sub_create(env, dto, attr, NULL, dof, th);
- if (rc != 0) {
+ rc = lod_sub_ref_add(env, dto, th);
dt_write_unlock(env, dto);
- GOTO(out, rc);
- }
-
- rc = lod_sub_ref_add(env, dto, th);
- dt_write_unlock(env, dto);
- if (rc != 0)
- GOTO(out, rc);
+ if (rc != 0)
+ GOTO(out, rc);
- rec->rec_fid = lu_object_fid(&dto->do_lu);
- rc = lod_sub_insert(env, dto, (const struct dt_rec *)rec,
- (const struct dt_key *)dot, th, 0);
- if (rc != 0)
- GOTO(out, rc);
+ rec->rec_fid = lu_object_fid(&dto->do_lu);
+ rc = lod_sub_insert(env, dto,
+ (const struct dt_rec *)rec,
+ (const struct dt_key *)dot, th, 0);
+ if (rc != 0)
+ GOTO(out, rc);
+ }
rec->rec_fid = lu_object_fid(&dt->do_lu);
rc = lod_sub_insert(env, dto, (struct dt_rec *)rec,
if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
strcmp(name, XATTR_NAME_LMV) == 0) {
- struct lmv_mds_md_v1 *lmm = buf->lb_buf;
+ rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
+ RETURN(rc);
+ } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
+ strncmp(name, XATTR_NAME_LMV, strlen(XATTR_NAME_LMV)) == 0 &&
+ strlen(name) > strlen(XATTR_NAME_LMV) + 1) {
+ const char *op = name + strlen(XATTR_NAME_LMV) + 1;
- if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) &
- LMV_HASH_FLAG_MIGRATION)
- rc = lod_sub_xattr_set(env, next, buf, name, fl, th);
- else
- rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
+ rc = -ENOTSUPP;
+ if (strcmp(op, "del") == 0)
+ rc = lod_dir_layout_delete(env, dt, buf, th);
+ /*
+ * XATTR_NAME_LMV".add" is never called, but only declared,
+ * because lod_xattr_set_lmv() will do the addition.
+ */
RETURN(rc);
- }
-
- if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
+ } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
strcmp(name, XATTR_NAME_LOV) == 0) {
struct lod_thread_info *info = lod_env_info(env);
struct lod_default_striping *lds = &info->lti_def_striping;
struct dt_object *dt, const char *name,
struct thandle *th)
{
- struct lod_object *lo = lod_dt_obj(dt);
- int rc;
- int i;
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct dt_object *next = dt_object_child(dt);
+ int i;
+ int rc;
ENTRY;
- rc = lod_sub_declare_xattr_del(env, dt_object_child(dt), name, th);
+ rc = lod_sub_declare_xattr_del(env, next, name, th);
if (rc != 0)
RETURN(rc);
RETURN(0);
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
- LASSERT(lo->ldo_stripe[i]);
- rc = lod_sub_declare_xattr_del(env, lo->ldo_stripe[i],
- name, th);
+ struct dt_object *dto = lo->ldo_stripe[i];
+
+ LASSERT(dto);
+ rc = lod_sub_declare_xattr_del(env, dto, name, th);
if (rc != 0)
break;
}
int i;
ENTRY;
- if (!strcmp(name, XATTR_NAME_LOV))
+ if (!strcmp(name, XATTR_NAME_LOV) || !strcmp(name, XATTR_NAME_LMV))
lod_striping_free(env, lod_dt_obj(dt));
rc = lod_sub_xattr_del(env, next, name, th);
RETURN(0);
for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
- LASSERT(lo->ldo_stripe[i]);
+ struct dt_object *dto = lo->ldo_stripe[i];
- rc = lod_sub_xattr_del(env, lo->ldo_stripe[i], name, th);
+ LASSERT(dto);
+
+ rc = lod_sub_xattr_del(env, dto, name, th);
if (rc != 0)
break;
}
} else {
/* transfer defaults LMV to new directory */
lod_striping_from_default(lc, lds, child_mode);
+
+ /* set count 0 to create normal directory */
+ if (lc->ldo_dir_stripe_count == 1)
+ lc->ldo_dir_stripe_count = 0;
}
/* shrink the stripe_count to the avaible MDT count */
if (lc->ldo_dir_stripe_count > d->lod_remote_mdt_count + 1 &&
- !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))
+ !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)) {
lc->ldo_dir_stripe_count = d->lod_remote_mdt_count + 1;
-
- /* Directory will be striped only if stripe_count > 1, if
- * stripe_count == 1, let's reset stripe_count = 0 to avoid
- * create single master stripe and also help to unify the
- * stripe handling of directories and files */
- if (lc->ldo_dir_stripe_count == 1)
- lc->ldo_dir_stripe_count = 0;
+ if (lc->ldo_dir_stripe_count == 1)
+ lc->ldo_dir_stripe_count = 0;
+ }
CDEBUG(D_INFO, "final dir stripe [%hu %d %u]\n",
lc->ldo_dir_stripe_count,
RETURN(0);
LASSERT(S_ISDIR(dt->do_lu.lo_header->loh_attr));
- LASSERT(lo->ldo_dir_stripe_count > 1);
/* Note: for remote lock for single stripe dir, MDT will cancel
* the lock by lockh directly */
LASSERT(!dt_object_remote(dt_object_child(dt)));
void mdc_rename_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
const char *old, size_t oldlen,
const char *new, size_t newlen);
+void mdc_migrate_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
+ const char *name, size_t namelen);
void mdc_close_pack(struct ptlrpc_request *req, struct md_op_data *op_data);
/* mdc/mdc_locks.c */
rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
/* XXX do something about time, uid, gid */
- rec->rn_opcode = op_data->op_cli_flags & CLI_MIGRATE ?
- REINT_MIGRATE : REINT_RENAME;
+ rec->rn_opcode = REINT_RENAME;
rec->rn_fsuid = op_data->op_fsuid;
rec->rn_fsgid = op_data->op_fsgid;
rec->rn_cap = op_data->op_cap;
if (new != NULL)
mdc_pack_name(req, &RMF_SYMTGT, new, newlen);
+}
- if (op_data->op_cli_flags & CLI_MIGRATE) {
- char *tmp;
+void mdc_migrate_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
+ const char *name, size_t namelen)
+{ /* pack the REINT_MIGRATE record, the entry name and the EA payload into req */
+ struct mdt_rec_rename *rec;
+ char *ea;
- if (op_data->op_bias & MDS_CLOSE_MIGRATE) {
- struct mdt_ioepoch *epoch;
+ CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_rename)); /* RMF_REC_REINT is filled in as a rename record */
+ rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
- mdc_close_intent_pack(req, op_data);
- epoch = req_capsule_client_get(&req->rq_pill,
- &RMF_MDT_EPOCH);
- mdc_ioepoch_pack(epoch, op_data);
- }
+ rec->rn_opcode = REINT_MIGRATE;
+ rec->rn_fsuid = op_data->op_fsuid;
+ rec->rn_fsgid = op_data->op_fsgid;
+ rec->rn_cap = op_data->op_cap;
+ rec->rn_suppgid1 = op_data->op_suppgids[0];
+ rec->rn_suppgid2 = op_data->op_suppgids[1];
+ rec->rn_fid1 = op_data->op_fid1;
+ rec->rn_fid2 = op_data->op_fid4; /* NB: second FID is taken from op_fid4 */
+ rec->rn_time = op_data->op_mod_time;
+ rec->rn_mode = op_data->op_mode;
+ rec->rn_bias = op_data->op_bias;
- tmp = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
- memcpy(tmp, op_data->op_data, op_data->op_data_size);
+ mdc_pack_name(req, &RMF_NAME, name, namelen);
+
+ if (op_data->op_bias & MDS_CLOSE_MIGRATE) { /* close intent and epoch are packed only with this bias */
+ struct mdt_ioepoch *epoch;
+
+ mdc_close_intent_pack(req, op_data);
+ epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
+ mdc_ioepoch_pack(epoch, op_data);
}
+
+ ea = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
+ memcpy(ea, op_data->op_data, op_data->op_data_size); /* op_data is copied verbatim as the EA payload */
}
void mdc_getattr_pack(struct ptlrpc_request *req, __u64 valid, __u32 flags,
if (exp_connect_cancelset(exp) && req)
ldlm_cli_cancel_list(&cancels, count, req, 0);
- mdc_rename_pack(req, op_data, old, oldlen, new, newlen);
+ if (op_data->op_cli_flags & CLI_MIGRATE)
+ mdc_migrate_pack(req, op_data, old, oldlen);
+ else
+ mdc_rename_pack(req, op_data, old, oldlen, new, newlen);
req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
obd->u.cli.cl_default_mds_easize);
#include <obd_support.h>
#include <lustre_mds.h>
#include <lustre_fid.h>
+#include <lustre_lmv.h>
#include "mdd_internal.h"
static int mdd_object_initialize(const struct lu_env *env,
const struct lu_fid *pfid,
struct mdd_object *child,
- struct lu_attr *attr, struct thandle *handle,
- const struct md_op_spec *spec)
+ struct lu_attr *attr,
+ struct thandle *handle)
{
int rc = 0;
ENTRY;
GOTO(out, rc);
#ifdef CONFIG_FS_POSIX_ACL
- if (def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
+ if (def_acl_buf && def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
/* if dir, then can inherit default ACl */
rc = mdo_declare_xattr_set(env, c, def_acl_buf,
XATTR_NAME_ACL_DEFAULT,
GOTO(out, rc);
}
- if (acl_buf->lb_len > 0) {
+ if (acl_buf && acl_buf->lb_len > 0) {
rc = mdo_declare_attr_set(env, c, attr, handle);
if (rc)
GOTO(out, rc);
(spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
spec->u.sp_ea.eadatalen);
- rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, 0,
- handle);
+ rc = mdo_declare_xattr_set(env, c, buf,
+ S_ISDIR(attr->la_mode) ?
+ XATTR_NAME_LMV : XATTR_NAME_LOV,
+ 0, handle);
if (rc)
GOTO(out, rc);
}
* created in declare phase, they also needs to be added to master
* object as sub-directory entry. So it has to initialize the master
* object, then set dir striped EA.(in mdo_xattr_set) */
- rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle,
- spec);
+ rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle);
if (rc != 0)
GOTO(err_destroy, rc);
spec->u.sp_ea.eadatalen);
rc = mdo_xattr_set(env, son, buf,
S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
- XATTR_NAME_LOV, 0,
- handle);
+ XATTR_NAME_LOV,
+ 0, handle);
if (rc != 0)
GOTO(err_destroy, rc);
}
if (rc < 0)
GOTO(out_stop, rc);
+ if (S_ISDIR(attr->la_mode)) {
+ struct lmv_user_md *lmu = spec->u.sp_ea.eadata;
+
+ /*
+ * migrate may create 1-stripe directory, so lod_ah_init()
+ * doesn't adjust stripe count from lmu.
+ */
+ if (lmu && lmu->lum_stripe_count == cpu_to_le32(1)) {
+ info->mti_lmu = *lmu;
+ info->mti_lmu.lum_stripe_count = 0;
+ spec->u.sp_ea.eadata = &info->mti_lmu;
+ }
+ }
+
mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
memset(ldata, 0, sizeof(*ldata));
}
/**
- * During migration once the parent FID has been changed,
- * we need update the parent FID in linkea.
+ * Check whether we should migrate the file/dir
+ * return val
+ * < 0 permission check failed or other error.
+ * = 0 the file can be migrated.
+ *
+ * A local sobj that is currently opened (mod_count > 0) is refused with
+ * -EBUSY and an already existing tobj is refused with -EEXIST; the remaining
+ * checks are delegated to mdd_rename_sanity_check().
**/
-static int mdd_linkea_update_child_internal(const struct lu_env *env,
- struct mdd_object *parent,
- struct mdd_object *newparent,
- struct mdd_object *child,
- const char *name, int namelen,
- struct thandle *handle,
- bool declare)
+static int mdd_migrate_sanity_check(const struct lu_env *env,
+ struct mdd_device *mdd,
+ struct mdd_object *spobj,
+ struct mdd_object *tpobj,
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ const struct lu_attr *spattr,
+ const struct lu_attr *tpattr,
+ const struct lu_attr *attr)
{
- struct mdd_thread_info *info = mdd_env_info(env);
- struct linkea_data ldata = { NULL };
- struct lu_buf *buf = &info->mti_link_buf;
- int count;
- int rc = 0;
+ int rc;
ENTRY;
- buf = lu_buf_check_and_alloc(buf, PATH_MAX);
- if (buf->lb_buf == NULL)
- RETURN(-ENOMEM);
-
- ldata.ld_buf = buf;
- rc = mdd_links_read(env, child, &ldata);
- if (rc != 0) {
- if (rc == -ENOENT || rc == -ENODATA)
- rc = 0;
- RETURN(rc);
+ if (!mdd_object_remote(sobj)) { /* the open count is only checked for a local source */
+ mdd_read_lock(env, sobj, MOR_SRC_CHILD);
+ if (sobj->mod_count > 0) {
+ CDEBUG(D_INFO, "%s: "DFID" is opened, count %d\n",
+ mdd2obd_dev(mdd)->obd_name, PFID(mdo2fid(sobj)),
+ sobj->mod_count);
+ mdd_read_unlock(env, sobj);
+ RETURN(-EBUSY);
+ }
+ mdd_read_unlock(env, sobj);
}
- LASSERT(ldata.ld_leh != NULL);
- ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
- for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
- struct mdd_device *mdd = mdo2mdd(&child->mod_obj);
- struct lu_name lname;
- struct lu_fid fid;
-
- linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
- &lname, &fid);
-
- if (strncmp(lname.ln_name, name, namelen) != 0 ||
- !lu_fid_eq(&fid, mdd_object_fid(parent))) {
- ldata.ld_lee = (struct link_ea_entry *)
- ((char *)ldata.ld_lee +
- ldata.ld_reclen);
- continue;
- }
+ if (mdd_object_exists(tobj)) /* the migration target must not exist yet */
+ RETURN(-EEXIST);
- CDEBUG(D_INFO, "%s: update "DFID" with %.*s:"DFID"\n",
- mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(child)),
- lname.ln_namelen, lname.ln_name,
- PFID(mdd_object_fid(newparent)));
- /* update to the new parent fid */
- linkea_entry_pack(ldata.ld_lee, &lname,
- mdd_object_fid(newparent));
- if (declare)
- rc = mdd_declare_links_add(env, child, handle, &ldata);
- else
- rc = mdd_links_write(env, child, &ldata, handle);
- break;
- }
+ rc = mdd_rename_sanity_check(env, spobj, spattr, tpobj, tpattr, sobj,
+ attr, NULL, NULL);
RETURN(rc);
}
-static int mdd_linkea_declare_update_child(const struct lu_env *env,
- struct mdd_object *parent,
- struct mdd_object *newparent,
- struct mdd_object *child,
- const char *name, int namelen,
- struct thandle *handle)
+typedef int (*mdd_dir_stripe_cb)(const struct lu_env *env, /* per-stripe callback of mdd_dir_iterate_stripes() */
+ struct mdd_object *obj, /* object whose stripes are iterated */
+ struct mdd_object *stripe, /* stripe found from its FID in the LMV */
+ const struct lu_buf *lmv_buf, /* passed through from the iterator */
+ const struct lu_buf *lmu_buf, /* passed through from the iterator */
+ int index, /* position of the stripe in lmv_stripe_fids */
+ struct thandle *handle); /* a non-zero return stops the iteration */
+/*
+ * mdd_dir_stripe_cb which declares the deletion of one stripe from its master
+ * object obj: the dotdot entry of the stripe, the "FID:index" entry in obj and
+ * one reference drop on obj. Stripes whose index is below the final stripe
+ * count stored in lmu_buf are kept, nothing is declared for them and 0 is
+ * returned. lmv_buf is not used here.
+ */
+static int mdd_dir_declare_delete_stripe(const struct lu_env *env,
+ struct mdd_object *obj,
+ struct mdd_object *stripe,
+ const struct lu_buf *lmv_buf,
+ const struct lu_buf *lmu_buf,
+ int index,
+ struct thandle *handle)
{
- return mdd_linkea_update_child_internal(env, parent, newparent,
- child, name,
- namelen, handle, true);
-}
+ struct mdd_thread_info *info = mdd_env_info(env);
+ char *stripe_name = info->mti_name; /* mti_name is reused to build the entry name */
+ struct lmv_user_md *lmu = lmu_buf->lb_buf;
+ int rc;
-static int mdd_linkea_update_child(const struct lu_env *env,
- struct mdd_object *parent,
- struct mdd_object *newparent,
- struct mdd_object *child,
- const char *name, int namelen,
- struct thandle *handle)
-{
- return mdd_linkea_update_child_internal(env, parent, newparent,
- child, name,
- namelen, handle, false);
+ if (index < le32_to_cpu(lmu->lum_stripe_count)) /* this stripe is kept */
+ return 0;
+
+ rc = mdo_declare_index_delete(env, stripe, dotdot, handle);
+ if (rc)
+ return rc;
+
+ snprintf(stripe_name, sizeof(info->mti_name), DFID":%d",
+ PFID(mdd_object_fid(stripe)), index);
+
+ rc = mdo_declare_index_delete(env, obj, stripe_name, handle);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_ref_del(env, obj, handle);
+
+ return rc;
}
-static int mdd_update_linkea_internal(const struct lu_env *env,
- struct mdd_object *mdd_pobj,
- struct mdd_object *mdd_sobj,
- struct mdd_object *mdd_tobj,
- const struct lu_name *child_name,
- struct linkea_data *ldata,
- struct thandle *handle,
- int declare)
+/*
+ * delete stripe from its master object namespace
+ *
+ * mdd_dir_stripe_cb for a remote master object obj (asserted). Stripes whose
+ * index is below the final stripe count stored in lmu_buf are left alone and
+ * 0 is returned. For the others the dotdot entry is deleted from the stripe,
+ * the "FID:index" entry is deleted from obj and one reference is dropped on
+ * obj, all while the write lock of the stripe is held. Returns 0 or the first
+ * error of these operations.
+ */
+static int mdd_dir_delete_stripe(const struct lu_env *env,
+ struct mdd_object *obj,
+ struct mdd_object *stripe,
+ const struct lu_buf *lmv_buf,
+ const struct lu_buf *lmu_buf,
+ int index,
+ struct thandle *handle)
{
- struct mdd_thread_info *info = mdd_env_info(env);
- int count;
- int rc = 0;
- ENTRY;
-
- LASSERT(ldata->ld_buf != NULL);
- LASSERT(ldata->ld_leh != NULL);
+ struct mdd_thread_info *info = mdd_env_info(env);
+ char *stripe_name = info->mti_name; /* mti_name is reused to build the entry name */
+ struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
+ struct lmv_user_md *lmu = lmu_buf->lb_buf;
+ __u32 del_offset = le32_to_cpu(lmu->lum_stripe_count); /* first stripe index to delete */
+ int rc;
- /* If it is mulitple links file, we need update the name entry for
- * all parent */
- ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
- for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
- struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
- struct mdd_object *pobj;
- struct lu_name lname;
- struct lu_fid fid;
-
- linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
- &lname, &fid);
- pobj = mdd_object_find(env, mdd, &fid);
- if (IS_ERR(pobj)) {
- CWARN("%s: cannot find obj "DFID": rc = %ld\n",
- mdd2obd_dev(mdd)->obd_name, PFID(&fid),
- PTR_ERR(pobj));
- continue;
- }
+ ENTRY;
- if (!mdd_object_exists(pobj)) {
- CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
- mdd2obd_dev(mdd)->obd_name, PFID(&fid));
- goto next_put;
- }
+ /* local dir will delete via LOD */
+ LASSERT(mdd_object_remote(obj));
+ LASSERT(del_offset < le32_to_cpu(lmv->lmv_stripe_count));
- if (pobj == mdd_pobj &&
- lname.ln_namelen == child_name->ln_namelen &&
- strncmp(lname.ln_name, child_name->ln_name,
- lname.ln_namelen) == 0) {
- CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n",
- mdd2obd_dev(mdd)->obd_name, child_name->ln_name,
- PFID(&fid));
- goto next_put;
- }
+ if (index < del_offset) /* this stripe is kept */
+ RETURN(0);
- CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n",
- mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(pobj)),
- PNAME(&lname), PFID(mdd_object_fid(mdd_tobj)));
+ mdd_write_lock(env, stripe, MOR_SRC_CHILD);
+ rc = __mdd_index_delete_only(env, stripe, dotdot, handle);
+ if (rc)
+ GOTO(out, rc);
- if (declare) {
- /* Remove source name from source directory */
- /* Insert new fid with target name into target dir */
- rc = mdo_declare_index_delete(env, pobj, lname.ln_name,
- handle);
- if (rc != 0)
- GOTO(next_put, rc);
+ snprintf(stripe_name, sizeof(info->mti_name), DFID":%d",
+ PFID(mdd_object_fid(stripe)), index);
- rc = mdo_declare_index_insert(env, pobj,
- mdd_object_fid(mdd_tobj),
- mdd_object_type(mdd_tobj),
- lname.ln_name, handle);
- if (rc != 0)
- GOTO(next_put, rc);
+ rc = __mdd_index_delete_only(env, obj, stripe_name, handle);
+ if (rc)
+ GOTO(out, rc);
- rc = mdo_declare_ref_add(env, mdd_tobj, handle);
- if (rc)
- GOTO(next_put, rc);
+ rc = mdo_ref_del(env, obj, handle);
+ GOTO(out, rc);
+out:
+ mdd_write_unlock(env, stripe); /* every path past the lock leaves through here */
- rc = mdo_declare_ref_del(env, mdd_sobj, handle);
- if (rc)
- GOTO(next_put, rc);
- } else {
- char *tmp_name = info->mti_key;
-
- if (lname.ln_namelen >= sizeof(info->mti_key)) {
- /* lnamelen is too big(> NAME_MAX + 16),
- * something wrong about this linkea, let's
- * skip it */
- CWARN("%s: the name %.*s is too long under "
- DFID"\n", mdd2obd_dev(mdd)->obd_name,
- lname.ln_namelen, lname.ln_name,
- PFID(&fid));
- goto next_put;
- }
+ return rc;
+}
- /* Note: lname might be without \0 at the end, see
- * linkea_entry_unpack(), let's add extra \0 by
- * snprintf */
- snprintf(tmp_name, sizeof(info->mti_key), "%.*s",
- lname.ln_namelen, lname.ln_name);
- lname.ln_name = tmp_name;
-
- /* Let's check if this linkEA still valid, before
- * it might be packed into the RPC buffer. */
- rc = mdd_lookup(env, &pobj->mod_obj, &lname,
- &info->mti_fid, NULL);
- if (rc < 0 || !lu_fid_eq(&info->mti_fid,
- mdd_object_fid(mdd_sobj)))
- GOTO(next_put, rc == -ENOENT ? 0 : rc);
-
- rc = __mdd_index_delete(env, pobj, tmp_name, 0, handle);
- if (rc != 0)
- GOTO(next_put, rc);
+/*
+ * iterate stripes of striped directory on remote MDT, local striped directory
+ * is accessed via LOD.
+ *
+ * The stripe FIDs are read from the lmv_mds_md_v1 in lmv_buf (which must not
+ * be NULL), each stripe object is looked up and handed to cb together with
+ * its index, lmv_buf, lmu_buf and handle. The reference on the stripe is
+ * dropped right after cb returns. Iteration stops at the first lookup failure
+ * or non-zero cb result, which is returned; otherwise 0 is returned.
+ */
+static int mdd_dir_iterate_stripes(const struct lu_env *env,
+ struct mdd_object *obj,
+ const struct lu_buf *lmv_buf,
+ const struct lu_buf *lmu_buf,
+ struct thandle *handle,
+ mdd_dir_stripe_cb cb)
+{
+ struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
+ struct lu_fid *fid = &mdd_env_info(env)->mti_fid2; /* mti_fid2 holds the CPU-endian stripe FID */
+ struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
+ struct mdd_object *stripe;
+ int i;
+ int rc;
- rc = __mdd_index_insert(env, pobj,
- mdd_object_fid(mdd_tobj),
- mdd_object_type(mdd_tobj),
- tmp_name, handle);
- if (rc != 0)
- GOTO(next_put, rc);
+ ENTRY;
- mdd_write_lock(env, mdd_tobj, MOR_SRC_CHILD);
- rc = mdo_ref_add(env, mdd_tobj, handle);
- mdd_write_unlock(env, mdd_tobj);
- if (rc)
- GOTO(next_put, rc);
+ LASSERT(lmv);
- mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
- mdo_ref_del(env, mdd_sobj, handle);
- mdd_write_unlock(env, mdd_sobj);
- }
-next_put:
- mdd_object_put(env, pobj);
- if (rc != 0)
- break;
+ for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
+ fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
+ stripe = mdd_object_find(env, mdd, fid);
+ if (IS_ERR(stripe))
+ RETURN(PTR_ERR(stripe));
- ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
- ldata->ld_reclen);
+ rc = cb(env, obj, stripe, lmv_buf, lmu_buf, i, handle);
+ mdd_object_put(env, stripe); /* drop the lookup reference before checking rc */
+ if (rc)
+ RETURN(rc);
}
- RETURN(rc);
+ RETURN(0);
}
-static int mdd_migrate_xattrs(const struct lu_env *env,
- struct mdd_object *mdd_sobj,
- struct mdd_object *mdd_tobj)
+typedef int (*mdd_xattr_cb)(const struct lu_env *env, /* per-xattr callback of mdd_iterate_xattrs() */
+ struct mdd_object *obj, /* the iterator passes its tobj here */
+ const struct lu_buf *buf, /* xattr value read from the source object */
+ const char *name, /* xattr name */
+ int fl, struct thandle *handle); /* the iterator passes fl == 0 */
+
+/* iterate xattrs, but ignore LMA, LMV, and LINKEA if 'skip_linkea' is set. */
+static int mdd_iterate_xattrs(const struct lu_env *env,
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ bool skip_linkea,
+ struct thandle *handle,
+ mdd_xattr_cb cb)
{
- struct mdd_thread_info *info = mdd_env_info(env);
- struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
- char *xname;
- struct thandle *handle;
- struct lu_buf xbuf;
- int xlen;
- int rem;
- int xsize;
- int list_xsize;
- struct lu_buf list_xbuf;
- int rc;
+ struct mdd_thread_info *info = mdd_env_info(env);
+ char *xname;
+ struct lu_buf list_xbuf;
+ struct lu_buf xbuf = { NULL };
+ int list_xsize;
+ int xlen;
+ int rem;
+ int xsize;
+ int rc;
+
+ ENTRY;
/* retrieve xattr list from the old object */
- list_xsize = mdo_xattr_list(env, mdd_sobj, &LU_BUF_NULL);
+ list_xsize = mdo_xattr_list(env, sobj, &LU_BUF_NULL);
if (list_xsize == -ENODATA)
- return 0;
+ RETURN(0);
if (list_xsize < 0)
- return list_xsize;
+ RETURN(list_xsize);
lu_buf_check_and_alloc(&info->mti_big_buf, list_xsize);
if (info->mti_big_buf.lb_buf == NULL)
- return -ENOMEM;
+ RETURN(-ENOMEM);
list_xbuf.lb_buf = info->mti_big_buf.lb_buf;
list_xbuf.lb_len = list_xsize;
- rc = mdo_xattr_list(env, mdd_sobj, &list_xbuf);
+ rc = mdo_xattr_list(env, sobj, &list_xbuf);
if (rc < 0)
- return rc;
+ RETURN(rc);
+
+ rem = rc;
rc = 0;
- rem = list_xsize;
xname = list_xbuf.lb_buf;
while (rem > 0) {
xlen = strnlen(xname, rem - 1) + 1;
strcmp(XATTR_NAME_LMV, xname) == 0)
goto next;
- /* For directory, if there are default layout, migrate here */
- if (strcmp(XATTR_NAME_LOV, xname) == 0 &&
- !S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu)))
+ if (skip_linkea &&
+ strcmp(XATTR_NAME_LINK, xname) == 0)
goto next;
- xsize = mdo_xattr_get(env, mdd_sobj, &LU_BUF_NULL, xname);
+ xsize = mdo_xattr_get(env, sobj, &LU_BUF_NULL, xname);
if (xsize == -ENODATA)
goto next;
if (xsize < 0)
- GOTO(out, rc);
+ GOTO(out, rc = xsize);
- lu_buf_check_and_alloc(&info->mti_link_buf, xsize);
- if (info->mti_link_buf.lb_buf == NULL)
+ lu_buf_check_and_alloc(&xbuf, xsize);
+ if (xbuf.lb_buf == NULL)
GOTO(out, rc = -ENOMEM);
- xbuf.lb_len = xsize;
- xbuf.lb_buf = info->mti_link_buf.lb_buf;
- rc = mdo_xattr_get(env, mdd_sobj, &xbuf, xname);
+ rc = mdo_xattr_get(env, sobj, &xbuf, xname);
if (rc == -ENODATA)
goto next;
if (rc < 0)
GOTO(out, rc);
- handle = mdd_trans_create(env, mdd);
- if (IS_ERR(handle))
- GOTO(out, rc = PTR_ERR(handle));
-
- rc = mdo_declare_xattr_set(env, mdd_tobj, &xbuf, xname, 0,
- handle);
- if (rc != 0)
- GOTO(stop_trans, rc);
- /* Note: this transaction is part of migration, and it is not
- * the last step of migration, so we set th_local = 1 to avoid
- * update last rcvd for this transaction */
- handle->th_local = 1;
- rc = mdd_trans_start(env, mdd, handle);
- if (rc != 0)
- GOTO(stop_trans, rc);
-
-again:
- rc = mdo_xattr_set(env, mdd_tobj, &xbuf, xname, 0, handle);
- if (rc == -EEXIST)
- GOTO(stop_trans, rc = 0);
-
+repeat:
+ rc = cb(env, tobj, &xbuf, xname, 0, handle);
if (unlikely(rc == -ENOSPC &&
strcmp(xname, XATTR_NAME_LINK) == 0)) {
rc = linkea_overflow_shrink(
(struct linkea_data *)(xbuf.lb_buf));
if (likely(rc > 0)) {
xbuf.lb_len = rc;
- goto again;
+ goto repeat;
}
}
- if (rc != 0)
- GOTO(stop_trans, rc);
-stop_trans:
- rc = mdd_trans_stop(env, mdd, rc, handle);
- if (rc != 0)
+ if (rc)
GOTO(out, rc);
next:
+ xname += xlen;
rem -= xlen;
- memmove(xname, xname + xlen, rem);
}
+
out:
- return rc;
+ lu_buf_free(&xbuf);
+ RETURN(rc);
}
-static int mdd_declare_migrate_create(const struct lu_env *env,
- struct mdd_object *mdd_pobj,
- struct mdd_object *mdd_sobj,
- struct mdd_object *mdd_tobj,
- struct md_op_spec *spec,
- struct lu_attr *la,
- union lmv_mds_md *mgr_ea,
- struct linkea_data *ldata,
- struct thandle *handle)
+typedef int (*mdd_linkea_cb)(const struct lu_env *env, /* signature shared by mdd_declare_update_link() and mdd_update_link() */
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ const struct lu_name *sname, /* NOTE(review): the two functions above name this tname - confirm which is meant */
+ const struct lu_fid *sfid, /* NOTE(review): the two functions above name this tpfid - confirm which is meant */
+ const struct lu_name *lname, /* presumably the name of one link record - verify against the caller */
+ const struct lu_fid *fid, /* presumably the parent FID of that link record - verify against the caller */
+ void *opaque,
+ struct thandle *handle);
+
+/*
+ * Declare the update of one link of sobj so that it will refer to tobj.
+ *
+ * The link is the entry lname under the parent identified by fid. The link
+ * tname under tpfid is skipped and 0 is returned for it. For any other link
+ * this declares the deletion of the lname entry from the parent and the
+ * insertion of an entry with the same name pointing to the FID of tobj, then
+ * a reference add on tobj and a reference drop on sobj.
+ *
+ * The name comparison is bounded by ln_namelen, as in mdd_update_link():
+ * lname presumably comes from an unpacked linkEA record, whose name is not
+ * guaranteed to be NUL-terminated, so an unbounded strcmp() could read past
+ * the end of the name.
+ *
+ * Returns 0 on success, the error of mdd_object_find() if the parent cannot
+ * be found, or the first error returned by a declaration.
+ */
+static int mdd_declare_update_link(const struct lu_env *env,
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ const struct lu_name *tname,
+ const struct lu_fid *tpfid,
+ const struct lu_name *lname,
+ const struct lu_fid *fid,
+ void *unused,
+ struct thandle *handle)
{
- struct lu_attr *la_flag = MDD_ENV_VAR(env, la_for_fix);
- const struct lu_buf *buf;
- int rc;
- int mgr_easize;
-
- rc = mdd_declare_create_object_internal(env, mdd_pobj, mdd_tobj, la,
- handle, spec, NULL);
- if (rc != 0)
- return rc;
-
- rc = mdd_declare_object_initialize(env, mdd_pobj, mdd_tobj, la,
- handle);
- if (rc != 0)
- return rc;
+ struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
+ struct mdd_object *pobj;
+ int rc;
- if (S_ISLNK(la->la_mode)) {
- const char *target_name = spec->u.sp_symname;
- int sym_len = strlen(target_name);
- const struct lu_buf *buf;
+ /* ignore tobj */
+ if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
+ !strncmp(tname->ln_name, lname->ln_name, lname->ln_namelen))
+ return 0;
- buf = mdd_buf_get_const(env, target_name, sym_len);
- rc = dt_declare_record_write(env, mdd_object_child(mdd_tobj),
- buf, 0, handle);
- if (rc != 0)
- return rc;
- } else if (S_ISDIR(la->la_mode) && ldata != NULL) {
- rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata);
- if (rc != 0)
- return rc;
- }
+ pobj = mdd_object_find(env, mdd, fid);
+ if (IS_ERR(pobj))
+ return PTR_ERR(pobj);
- if (spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
- buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
- spec->u.sp_ea.eadatalen);
- rc = mdo_declare_xattr_set(env, mdd_tobj, buf, XATTR_NAME_LOV,
- 0, handle);
- if (rc)
- return rc;
- }
- mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
- buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
- rc = mdo_declare_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV,
- 0, handle);
+ rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
+ if (!rc)
+ rc = mdo_declare_index_insert(env, pobj, mdo2fid(tobj),
+ mdd_object_type(sobj),
+ lname->ln_name, handle);
+ mdd_object_put(env, pobj);
if (rc)
return rc;
- la_flag->la_valid = LA_FLAGS;
- la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
- rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
+ rc = mdo_declare_ref_add(env, tobj, handle);
+ if (rc)
+ return rc;
+ rc = mdo_declare_ref_del(env, sobj, handle);
return rc;
}
-static int mdd_migrate_create(const struct lu_env *env,
- struct mdd_object *mdd_pobj,
- struct mdd_object *mdd_sobj,
- struct mdd_object *mdd_tobj,
- const struct lu_name *lname,
- struct lu_attr *la)
+static int mdd_update_link(const struct lu_env *env,
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ const struct lu_name *tname,
+ const struct lu_fid *tpfid,
+ const struct lu_name *lname,
+ const struct lu_fid *fid,
+ void *unused,
+ struct thandle *handle)
{
- struct mdd_thread_info *info = mdd_env_info(env);
- struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
- struct md_op_spec *spec = &info->mti_spec;
- struct lu_buf lmm_buf = { NULL };
- struct lu_buf link_buf = { NULL };
- struct lu_buf mgr_buf;
- struct thandle *handle;
- struct lmv_mds_md_v1 *mgr_ea;
- struct lu_attr *la_flag = MDD_ENV_VAR(env, la_for_fix);
- struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
- int mgr_easize;
- struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
- int rc;
- ENTRY;
-
- /* prepare spec for create */
- memset(spec, 0, sizeof(*spec));
- spec->sp_cr_lookup = 0;
- spec->sp_feat = &dt_directory_features;
- if (S_ISLNK(la->la_mode)) {
- const struct lu_buf *buf;
+ struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
+ struct mdd_object *pobj;
+ int rc;
- buf = lu_buf_check_and_alloc(
- &mdd_env_info(env)->mti_big_buf,
- la->la_size + 1);
- link_buf = *buf;
- link_buf.lb_len = la->la_size + 1;
- memset(link_buf.lb_buf, 0, link_buf.lb_len);
- rc = mdd_readlink(env, &mdd_sobj->mod_obj, &link_buf);
- if (rc <= 0) {
- rc = rc != 0 ? rc : -EFAULT;
- CERROR("%s: "DFID" readlink failed: rc = %d\n",
- mdd2obd_dev(mdd)->obd_name,
- PFID(mdd_object_fid(mdd_sobj)), rc);
- RETURN(rc);
- }
- spec->u.sp_symname = link_buf.lb_buf;
- } else if (S_ISREG(la->la_mode)) {
- /* retrieve lov of the old object */
- rc = mdd_get_lov_ea(env, mdd_sobj, &lmm_buf);
- if (rc != 0 && rc != -ENODATA)
- RETURN(rc);
- if (lmm_buf.lb_buf != NULL && lmm_buf.lb_len != 0) {
- spec->u.sp_ea.eadata = lmm_buf.lb_buf;
- spec->u.sp_ea.eadatalen = lmm_buf.lb_len;
- spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
- }
- } else if (S_ISDIR(la->la_mode)) {
- rc = mdd_links_read_with_rec(env, mdd_sobj, ldata);
- if (rc == -ENODATA) {
- /* ignore the non-linkEA error */
- ldata = NULL;
- rc = 0;
- }
- if (rc < 0)
- RETURN(rc);
- }
+ ENTRY;
- mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
- lu_buf_check_and_alloc(&info->mti_xattr_buf, mgr_easize);
- mgr_buf.lb_buf = info->mti_xattr_buf.lb_buf;
- mgr_buf.lb_len = mgr_easize;
- mgr_ea = mgr_buf.lb_buf;
- memset(mgr_ea, 0, sizeof(*mgr_ea));
- mgr_ea->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
- mgr_ea->lmv_stripe_count = cpu_to_le32(2);
- mgr_ea->lmv_master_mdt_index = mdd_seq_site(mdd)->ss_node_id;
- mgr_ea->lmv_hash_type = cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
- fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[0], mdd_object_fid(mdd_sobj));
- fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[1], mdd_object_fid(mdd_tobj));
-
- mdd_object_make_hint(env, mdd_pobj, mdd_tobj, la, spec, hint);
+ LASSERT(lu_name_is_valid(lname));
- handle = mdd_trans_create(env, mdd);
- if (IS_ERR(handle))
- GOTO(out_free, rc = PTR_ERR(handle));
+ /* ignore tobj */
+ if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
+ !strncmp(tname->ln_name, lname->ln_name, lname->ln_namelen))
+ RETURN(0);
- /* Note: this transaction is part of migration, and it is not
- * the last step of migration, so we set th_local = 1 to avoid
- * update last rcvd for this transaction */
- handle->th_local = 1;
- rc = mdd_declare_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj, spec,
- la, mgr_buf.lb_buf, ldata, handle);
- if (rc != 0)
- GOTO(stop_trans, rc);
+ CDEBUG(D_INFO, "update "DFID"/"DNAME":"DFID"\n",
+ PFID(fid), PNAME(lname), PFID(mdo2fid(tobj)));
- rc = mdd_trans_start(env, mdd, handle);
- if (rc != 0)
- GOTO(stop_trans, rc);
+ pobj = mdd_object_find(env, mdd, fid);
+ if (IS_ERR(pobj)) {
+ CWARN("%s: cannot find obj "DFID": %ld\n",
+ mdd2obd_dev(mdd)->obd_name, PFID(fid), PTR_ERR(pobj));
+ RETURN(PTR_ERR(pobj));
+ }
- /* don't set nlink from the original object */
- la->la_valid &= ~LA_NLINK;
+ if (!mdd_object_exists(pobj)) {
+ CDEBUG(D_INFO, DFID" doesn't exist\n", PFID(fid));
+ mdd_object_put(env, pobj);
+ RETURN(-ENOENT);
+ }
- /* create the target object */
- rc = mdd_create_object(env, mdd_pobj, mdd_tobj, la, spec, NULL, NULL,
- hint, handle);
- if (rc != 0)
- GOTO(stop_trans, rc);
+ mdd_write_lock(env, pobj, MOR_TGT_PARENT);
+ rc = __mdd_index_delete_only(env, pobj, lname->ln_name, handle);
+ if (!rc)
+ rc = __mdd_index_insert_only(env, pobj, mdo2fid(tobj),
+ mdd_object_type(sobj),
+ lname->ln_name, handle);
+ mdd_write_unlock(env, pobj);
+ mdd_object_put(env, pobj);
+ if (rc)
+ RETURN(rc);
- if (S_ISDIR(la->la_mode) && ldata != NULL) {
- rc = mdd_links_write(env, mdd_tobj, ldata, handle);
- if (rc != 0)
- GOTO(stop_trans, rc);
- }
+ mdd_write_lock(env, tobj, MOR_TGT_CHILD);
+ rc = mdo_ref_add(env, tobj, handle);
+ mdd_write_unlock(env, tobj);
+ if (rc)
+ RETURN(rc);
- /* Set MIGRATE EA on the source inode, so once the migration needs
- * to be re-done during failover, the re-do process can locate the
- * target object which is already being created. */
- rc = mdo_xattr_set(env, mdd_sobj, &mgr_buf, XATTR_NAME_LMV, 0, handle);
- if (rc != 0)
- GOTO(stop_trans, rc);
+ mdd_write_lock(env, sobj, MOR_SRC_CHILD);
+ rc = mdo_ref_del(env, sobj, handle);
+ mdd_write_unlock(env, sobj);
- /* Set immutable flag, so any modification is disabled until
- * the migration is done. Once the migration is interrupted,
- * if the resume process find the migrating object has both
- * IMMUTALBE flag and MIGRATE EA, it need to clear IMMUTABLE
- * flag and approve the migration */
- la_flag->la_valid = LA_FLAGS;
- la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
- rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
-stop_trans:
- if (handle != NULL)
- rc = mdd_trans_stop(env, mdd, rc, handle);
-out_free:
- if (lmm_buf.lb_buf != NULL)
- OBD_FREE(lmm_buf.lb_buf, lmm_buf.lb_len);
RETURN(rc);
}
-static int mdd_migrate_entries(const struct lu_env *env,
- struct mdd_object *mdd_sobj,
- struct mdd_object *mdd_tobj)
+static inline int mdd_fld_lookup(const struct lu_env *env,
+ struct mdd_device *mdd,
+ const struct lu_fid *fid,
+ __u32 *mdt_index)
{
- struct dt_object *next = mdd_object_child(mdd_sobj);
- struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
- struct dt_object *dt_tobj = mdd_object_child(mdd_tobj);
- struct thandle *handle;
- struct dt_it *it;
- const struct dt_it_ops *iops;
- int result;
- struct lu_dirent *ent;
- int rc;
- ENTRY;
+ struct lu_seq_range *range = &mdd_env_info(env)->mti_range;
+ struct seq_server_site *ss;
+ int rc;
- OBD_ALLOC(ent, NAME_MAX + sizeof(*ent) + 1);
- if (ent == NULL)
- RETURN(-ENOMEM);
+ ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
- if (!dt_try_as_dir(env, next))
- GOTO(out_ent, rc = -ENOTDIR);
- /*
- * iterate directories
- */
- iops = &next->do_index_ops->dio_it;
- it = iops->init(env, next, LUDA_FID | LUDA_TYPE);
- if (IS_ERR(it))
- GOTO(out_ent, rc = PTR_ERR(it));
+ range->lsr_flags = LU_SEQ_RANGE_MDT;
+ rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
+ if (rc)
+ return rc;
- rc = iops->load(env, it, 0);
- if (rc == 0)
- rc = iops->next(env, it);
- else if (rc > 0)
- rc = 0;
- /*
- * At this point and across for-loop:
- *
- * rc == 0 -> ok, proceed.
- * rc > 0 -> end of directory.
- * rc < 0 -> error.
- */
- do {
- struct mdd_object *child;
- char *name = mdd_env_info(env)->mti_key;
- int len;
- int is_dir;
- bool target_exist = false;
-
- len = iops->key_size(env, it);
- if (len == 0)
- goto next;
+ *mdt_index = range->lsr_index;
- result = iops->rec(env, it, (struct dt_rec *)ent,
- LUDA_FID | LUDA_TYPE);
- if (result == -ESTALE)
- goto next;
- if (result != 0) {
- rc = result;
- goto out;
- }
+ return 0;
+}
- fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
+static int mdd_is_link_on_source_mdt(const struct lu_env *env,
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ const struct lu_name *tname,
+ const struct lu_fid *tpfid,
+ const struct lu_name *lname,
+ const struct lu_fid *fid,
+ void *opaque,
+ struct thandle *handle)
+{
+ struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
+ __u32 source_mdt_index = *(__u32 *)opaque;
+ __u32 link_mdt_index;
+ int rc;
- /* Insert new fid with target name into target dir */
- if ((ent->lde_namelen == 1 && ent->lde_name[0] == '.') ||
- (ent->lde_namelen == 2 && ent->lde_name[0] == '.' &&
- ent->lde_name[1] == '.'))
- goto next;
+ ENTRY;
- child = mdd_object_find(env, mdd, &ent->lde_fid);
- if (IS_ERR(child))
- GOTO(out, rc = PTR_ERR(child));
+ /* ignore tobj */
+ if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
+ !strcmp(tname->ln_name, lname->ln_name))
+ return 0;
- /* child may not exist, but lu_object_attr will assert this,
- * get type from loh_attr directly */
- is_dir = S_ISDIR(child->mod_obj.mo_lu.lo_header->loh_attr);
-
- mdd_write_lock(env, child, MOR_SRC_CHILD);
-
- snprintf(name, ent->lde_namelen + 1, "%s", ent->lde_name);
-
- /* Check whether the name has been inserted to the target */
- if (dt_try_as_dir(env, dt_tobj)) {
- struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
+ rc = mdd_fld_lookup(env, mdd, fid, &link_mdt_index);
+ if (rc)
+ RETURN(rc);
- rc = dt_lookup(env, dt_tobj, (struct dt_rec *)fid,
- (struct dt_key *)name);
- if (unlikely(rc == 0))
- target_exist = true;
- }
+ RETURN(link_mdt_index == source_mdt_index);
+}
- handle = mdd_trans_create(env, mdd);
- if (IS_ERR(handle))
- GOTO(out_put, rc = PTR_ERR(handle));
-
- /* Note: this transaction is part of migration, and it is not
- * the last step of migration, so we set th_local = 1 to avoid
- * updating last rcvd for this transaction */
- handle->th_local = 1;
- if (likely(!target_exist)) {
- rc = mdo_declare_index_insert(env, mdd_tobj,
- &ent->lde_fid,
- child->mod_obj.mo_lu.lo_header->loh_attr,
- name, handle);
- if (rc != 0)
- GOTO(out_put, rc);
+static int mdd_iterate_linkea(const struct lu_env *env,
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ const struct lu_name *tname,
+ const struct lu_fid *tpfid,
+ struct linkea_data *ldata,
+ void *opaque,
+ struct thandle *handle,
+ mdd_linkea_cb cb)
+{
+ struct mdd_thread_info *info = mdd_env_info(env);
+ char *filename = info->mti_name;
+ struct lu_name lname;
+ struct lu_fid fid;
+ int rc = 0;
- if (is_dir) {
- rc = mdo_declare_ref_add(env, mdd_tobj, handle);
- if (rc != 0)
- GOTO(out_put, rc);
- }
- }
+ if (!ldata->ld_buf)
+ return 0;
- rc = mdo_declare_index_delete(env, mdd_sobj, name, handle);
- if (rc != 0)
- GOTO(out_put, rc);
+ for (linkea_first_entry(ldata); ldata->ld_lee && !rc;
+ linkea_next_entry(ldata)) {
+ linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &lname,
+ &fid);
- if (is_dir) {
- rc = mdo_declare_ref_del(env, mdd_sobj, handle);
- if (rc != 0)
- GOTO(out_put, rc);
+ /* Note: lname might miss \0 at the end */
+ snprintf(filename, sizeof(info->mti_name), "%.*s",
+ lname.ln_namelen, lname.ln_name);
+ lname.ln_name = filename;
- /* Update .. for child */
- rc = mdo_declare_index_delete(env, child, dotdot,
- handle);
- if (rc != 0)
- GOTO(out_put, rc);
+ CDEBUG(D_INFO, DFID"/"DNAME"\n", PFID(&fid), PNAME(&lname));
- rc = mdo_declare_index_insert(env, child,
- mdd_object_fid(mdd_tobj),
- S_IFDIR, dotdot, handle);
- if (rc != 0)
- GOTO(out_put, rc);
- }
+ rc = cb(env, sobj, tobj, tname, tpfid, &lname, &fid, opaque,
+ handle);
+ }
- rc = mdd_linkea_declare_update_child(env, mdd_sobj,mdd_tobj,
- child, name,
- strlen(name),
- handle);
- if (rc != 0)
- GOTO(out_put, rc);
+ return rc;
+}
- rc = mdd_trans_start(env, mdd, handle);
- if (rc != 0) {
- CERROR("%s: transaction start failed: rc = %d\n",
- mdd2obd_dev(mdd)->obd_name, rc);
- GOTO(out_put, rc);
- }
+/**
+ * Prepare linkea, and check whether file needs migrate: if source still has
+ * link on source MDT, no need to migrate, just update namespace on source and
+ * target parents.
+ *
+ * \retval 0 do migrate
+ * \retval 1 don't migrate
+ * \retval -errno on failure
+ */
+static int migrate_linkea_prepare(const struct lu_env *env,
+ struct mdd_device *mdd,
+ struct mdd_object *spobj,
+ struct mdd_object *tpobj,
+ struct mdd_object *sobj,
+ const struct lu_name *lname,
+ const struct lu_attr *attr,
+ struct linkea_data *ldata)
+{
+ __u32 source_mdt_index;
+ int rc;
- if (likely(!target_exist)) {
- rc = __mdd_index_insert(env, mdd_tobj, &ent->lde_fid,
- child->mod_obj.mo_lu.lo_header->loh_attr, name,
- handle);
- if (rc != 0)
- GOTO(out_put, rc);
- }
+ ENTRY;
- rc = __mdd_index_delete(env, mdd_sobj, name, is_dir, handle);
- if (rc != 0)
- GOTO(out_put, rc);
+ memset(ldata, 0, sizeof(*ldata));
+ rc = mdd_linkea_prepare(env, sobj, mdo2fid(spobj), lname,
+ mdo2fid(tpobj), lname, 1, 0, ldata);
+ if (rc)
+ RETURN(rc);
- if (is_dir) {
- rc = __mdd_index_delete_only(env, child, dotdot,
- handle);
- if (rc != 0)
- GOTO(out_put, rc);
+ /*
+ * Then it will check if the file should be migrated. If the file has
+ * multiple links, we only need to migrate the file if all of its entries
+ * have been migrated to the remote MDT.
+ */
+ if (S_ISDIR(attr->la_mode) || attr->la_nlink < 2)
+ RETURN(0);
- rc = __mdd_index_insert_only(env, child,
- mdd_object_fid(mdd_tobj), S_IFDIR,
- dotdot, handle);
- if (rc != 0)
- GOTO(out_put, rc);
- }
+ /* If there are still links locally, don't migrate this file */
+ LASSERT(ldata->ld_leh != NULL);
- rc = mdd_linkea_update_child(env, mdd_sobj, mdd_tobj,
- child, name,
- strlen(name), handle);
+ /*
+ * If linkEA has overflowed, it means there are some unknown name entries
+ * under unknown parents, which will prevent the migration.
+ */
+ if (unlikely(ldata->ld_leh->leh_overflow_time))
+ RETURN(-EOVERFLOW);
-out_put:
- mdd_write_unlock(env, child);
- mdd_object_put(env, child);
- rc = mdd_trans_stop(env, mdd, rc, handle);
- if (rc != 0)
- GOTO(out, rc);
-next:
- result = iops->next(env, it);
- if (OBD_FAIL_CHECK(OBD_FAIL_MIGRATE_ENTRIES))
- GOTO(out, rc = -EINTR);
+ rc = mdd_fld_lookup(env, mdd, mdo2fid(sobj), &source_mdt_index);
+ if (rc)
+ RETURN(rc);
- if (result == -ESTALE)
- goto next;
- } while (result == 0);
-out:
- iops->put(env, it);
- iops->fini(env, it);
-out_ent:
- OBD_FREE(ent, NAME_MAX + sizeof(*ent) + 1);
+ rc = mdd_iterate_linkea(env, sobj, NULL, lname, mdo2fid(tpobj), ldata,
+ &source_mdt_index, NULL,
+ mdd_is_link_on_source_mdt);
RETURN(rc);
}
-static int mdd_declare_update_linkea(const struct lu_env *env,
- struct mdd_object *mdd_pobj,
- struct mdd_object *mdd_sobj,
- struct mdd_object *mdd_tobj,
- const struct lu_name *child_name,
- struct linkea_data *ldata,
- struct thandle *handle)
+static int mdd_dir_declare_layout_delete(const struct lu_env *env,
+ struct mdd_object *obj,
+ const struct lu_buf *lmv_buf,
+ const struct lu_buf *lmu_buf,
+ struct thandle *handle)
{
- return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
- child_name, ldata, handle, 1);
+ int rc;
+
+ if (!lmv_buf->lb_buf)
+ rc = mdo_declare_index_delete(env, obj, dotdot, handle);
+ else if (mdd_object_remote(obj))
+ rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, lmu_buf, handle,
+ mdd_dir_declare_delete_stripe);
+ else
+ rc = mdo_declare_xattr_set(env, obj, lmu_buf,
+ XATTR_NAME_LMV".del", 0, handle);
+
+ return rc;
}
-static int mdd_update_linkea(const struct lu_env *env,
- struct mdd_object *mdd_pobj,
- struct mdd_object *mdd_sobj,
- struct mdd_object *mdd_tobj,
- const struct lu_name *child_name,
- struct linkea_data *ldata,
- struct thandle *handle)
+static int mdd_dir_layout_delete(const struct lu_env *env,
+ struct mdd_object *obj,
+ const struct lu_buf *lmv_buf,
+ const struct lu_buf *lmu_buf,
+ struct thandle *handle)
{
- return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
- child_name, ldata, handle, 0);
+ int rc;
+
+ ENTRY;
+
+ mdd_write_lock(env, obj, MOR_SRC_PARENT);
+ if (!lmv_buf->lb_buf)
+ /* normal dir */
+ rc = __mdd_index_delete_only(env, obj, dotdot, handle);
+ else if (mdd_object_remote(obj))
+ /* striped, but remote */
+ rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, lmu_buf, handle,
+ mdd_dir_delete_stripe);
+ else
+ rc = mdo_xattr_set(env, obj, lmu_buf, XATTR_NAME_LMV".del", 0,
+ handle);
+ mdd_write_unlock(env, obj);
+
+ RETURN(rc);
}
-static int mdd_declare_migrate_update_name(const struct lu_env *env,
- struct mdd_object *mdd_pobj,
- struct mdd_object *mdd_sobj,
- struct mdd_object *mdd_tobj,
- const struct lu_name *lname,
- struct lu_attr *la,
- struct lu_attr *parent_la,
- struct linkea_data *ldata,
- struct thandle *handle)
+static int mdd_declare_migrate_create(const struct lu_env *env,
+ struct mdd_object *tpobj,
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ const struct lu_name *lname,
+ struct lu_attr *attr,
+ struct lu_buf *sbuf,
+ struct linkea_data *ldata,
+ struct md_op_spec *spec,
+ struct dt_allocation_hint *hint,
+ struct thandle *handle)
{
- struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
- struct lu_attr *la_flag = MDD_ENV_VAR(env, tattr);
+ struct mdd_thread_info *info = mdd_env_info(env);
+ struct lmv_mds_md_v1 *lmv = sbuf->lb_buf;
int rc;
- /* Revert IMMUTABLE flag */
- la_flag->la_valid = LA_FLAGS;
- la_flag->la_flags = la->la_flags & ~LUSTRE_IMMUTABLE_FL;
- rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
- if (rc != 0)
- return rc;
-
- /* delete entry from source dir */
- rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name, handle);
- if (rc != 0)
- return rc;
+ if (S_ISDIR(attr->la_mode)) {
+ struct lu_buf lmu_buf = { NULL };
- if (ldata->ld_buf != NULL) {
- rc = mdd_declare_update_linkea(env, mdd_pobj, mdd_sobj,
- mdd_tobj, lname, ldata, handle);
- if (rc != 0)
- return rc;
- }
+ if (lmv) {
+ struct lmv_user_md *lmu = (typeof(lmu))info->mti_key;
- if (S_ISREG(mdd_object_type(mdd_sobj))) {
- rc = mdo_declare_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
- handle);
- if (rc != 0)
- return rc;
+ lmu->lum_stripe_count = 0;
+ lmu_buf.lb_buf = lmu;
+ lmu_buf.lb_len = sizeof(*lmu);
+ }
- handle->th_complex = 1;
- rc = mdo_declare_xattr_set(env, mdd_tobj, NULL,
- XATTR_NAME_FID,
- LU_XATTR_REPLACE, handle);
- if (rc < 0)
+ rc = mdd_dir_declare_layout_delete(env, sobj, sbuf, &lmu_buf,
+ handle);
+ if (rc)
return rc;
- }
- if (S_ISDIR(mdd_object_type(mdd_sobj))) {
- rc = mdo_declare_ref_del(env, mdd_pobj, handle);
- if (rc != 0)
- return rc;
+ if (lmv) {
+ rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LMV,
+ handle);
+ if (rc)
+ return rc;
+ }
}
- /* new name */
- rc = mdo_declare_index_insert(env, mdd_pobj, mdo2fid(mdd_tobj),
- mdd_object_type(mdd_tobj),
- lname->ln_name, handle);
- if (rc != 0)
- return rc;
-
- rc = mdd_declare_links_add(env, mdd_tobj, handle, NULL);
- if (rc != 0)
+ rc = mdd_declare_create(env, mdo2mdd(&tpobj->mod_obj), tpobj, tobj,
+ lname, attr, handle, spec, ldata, NULL, NULL,
+ hint);
+ if (rc)
return rc;
- if (S_ISDIR(mdd_object_type(mdd_sobj))) {
- rc = mdo_declare_ref_add(env, mdd_pobj, handle);
- if (rc != 0)
+ if (S_ISDIR(attr->la_mode) && mdd_dir_is_empty(env, sobj) != 0) {
+ if (!lmv) {
+ /*
+ * if sobj is not striped, fake a 1-stripe LMV, which
+ * will be used to generate a compound LMV for tobj.
+ */
+ LASSERT(sizeof(info->mti_key) >
+ lmv_mds_md_size(1, LMV_MAGIC_V1));
+ lmv = (typeof(lmv))info->mti_key;
+ memset(lmv, 0, sizeof(*lmv));
+ lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
+ lmv->lmv_stripe_count = cpu_to_le32(1);
+ fid_le_to_cpu(&lmv->lmv_stripe_fids[0], mdo2fid(sobj));
+ sbuf->lb_buf = lmv;
+ sbuf->lb_len = lmv_mds_md_size(1, LMV_MAGIC_V1);
+
+ rc = mdo_declare_xattr_set(env, tobj, sbuf,
+ XATTR_NAME_LMV".add", 0,
+ handle);
+ sbuf->lb_buf = NULL;
+ sbuf->lb_len = 0;
+ } else {
+ rc = mdo_declare_xattr_set(env, tobj, sbuf,
+ XATTR_NAME_LMV".add", 0,
+ handle);
+ }
+ if (rc)
return rc;
}
- /* delete old object */
- rc = mdo_declare_ref_del(env, mdd_sobj, handle);
- if (rc != 0)
+ /*
+ * tobj mode will be used in lod_declare_xattr_set(), but it's not
+ * created yet, copy from sobj.
+ */
+ tobj->mod_obj.mo_lu.lo_header->loh_attr &= ~S_IFMT;
+ tobj->mod_obj.mo_lu.lo_header->loh_attr |=
+ sobj->mod_obj.mo_lu.lo_header->loh_attr & S_IFMT;
+
+ rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle,
+ mdo_declare_xattr_set);
+ if (rc)
return rc;
- if (S_ISDIR(mdd_object_type(mdd_sobj))) {
- /* delete old object */
- rc = mdo_declare_ref_del(env, mdd_sobj, handle);
- if (rc != 0)
+ if (S_ISREG(attr->la_mode)) {
+ handle->th_complex = 1;
+
+ rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
+ if (rc)
return rc;
- /* set nlink to 0 */
- rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
- if (rc != 0)
+
+ /*
+ * target is not initialized because its LOV is copied from
+ * source in mdd_migrate_create(), declare via sobj.
+ */
+ rc = mdo_declare_xattr_set(env, sobj, NULL, XATTR_NAME_FID, 0,
+ handle);
+ if (rc)
return rc;
}
- rc = mdd_declare_finish_unlink(env, mdd_sobj, handle);
- if (rc)
- return rc;
-
- rc = mdo_declare_attr_set(env, mdd_pobj, parent_la, handle);
- if (rc != 0)
- return rc;
+ if (!S_ISDIR(attr->la_mode)) {
+ rc = mdd_iterate_linkea(env, sobj, tobj, lname, mdo2fid(tpobj),
+ ldata, NULL, handle,
+ mdd_declare_update_link);
+ if (rc)
+ return rc;
- rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, lname, NULL,
- handle);
+ if (lmv) {
+ rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LMV,
+ handle);
+ if (rc)
+ return rc;
+ }
+ }
return rc;
}
-static int mdd_migrate_update_name(const struct lu_env *env,
- struct mdd_object *mdd_pobj,
- struct mdd_object *mdd_sobj,
- struct mdd_object *mdd_tobj,
- const struct lu_name *lname,
- struct md_attr *ma)
+/**
+ * Create target, migrate xattrs and update links.
+ *
+ * Create target according to \a spec, and then migrate xattrs, if it's
+ * directory, migrate source stripes to target, else update fid to target
+ * for links.
+ *
+ * \param[in] env execution environment
+ * \param[in] tpobj target parent object
+ * \param[in] sobj source object
+ * \param[in] tobj target object
+ * \param[in] lname file name
+ * \param[in] attr source attributes
+ * \param[in] sbuf source LMV buf
+ * \param[in] ldata source linkea
+ * \param[in] spec migrate create spec
+ * \param[in] hint target creation hint
+ * \param[in] handle transaction handle
+ *
+ * \retval 0 on success
+ * \retval -errno on failure
+ **/
+static int mdd_migrate_create(const struct lu_env *env,
+ struct mdd_object *tpobj,
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ const struct lu_name *lname,
+ struct lu_attr *attr,
+ const struct lu_buf *sbuf,
+ struct linkea_data *ldata,
+ struct md_op_spec *spec,
+ struct dt_allocation_hint *hint,
+ struct thandle *handle)
{
- struct lu_attr *p_la = MDD_ENV_VAR(env, la_for_fix);
- struct lu_attr *so_attr = MDD_ENV_VAR(env, cattr);
- struct lu_attr *la_flag = MDD_ENV_VAR(env, tattr);
- struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
- struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
- struct thandle *handle;
- int is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
- const char *name = lname->ln_name;
- int rc;
+ int rc;
+
ENTRY;
- /* update time for parent */
- LASSERT(ma->ma_attr.la_valid & LA_CTIME);
- p_la->la_ctime = p_la->la_mtime = ma->ma_attr.la_ctime;
- p_la->la_valid = LA_CTIME;
+ /*
+ * directory will migrate sobj stripes to tobj:
+ * 1. delete stripes from sobj.
+ * 2. add stripes to tobj, see lod_dir_declare_layout_add().
+ * 3. create/attach stripes for tobj, see lod_xattr_set_lmv().
+ */
+ if (S_ISDIR(attr->la_mode)) {
+ struct lu_buf lmu_buf = { NULL };
- rc = mdd_la_get(env, mdd_sobj, so_attr);
- if (rc != 0)
- RETURN(rc);
+ if (sbuf->lb_buf) {
+ struct mdd_thread_info *info = mdd_env_info(env);
+ struct lmv_user_md *lmu = (typeof(lmu))info->mti_key;
- ldata->ld_buf = NULL;
- rc = mdd_links_read(env, mdd_sobj, ldata);
- if (rc != 0 && rc != -ENOENT && rc != -ENODATA)
- RETURN(rc);
+ lmu->lum_stripe_count = 0;
+ lmu_buf.lb_buf = lmu;
+ lmu_buf.lb_len = sizeof(*lmu);
+ }
- handle = mdd_trans_create(env, mdd);
- if (IS_ERR(handle))
- RETURN(PTR_ERR(handle));
+ rc = mdd_dir_layout_delete(env, sobj, sbuf, &lmu_buf, handle);
+ if (rc)
+ RETURN(rc);
- rc = mdd_declare_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj,
- lname, so_attr, p_la, ldata,
- handle);
- if (rc != 0) {
- /* If the migration can not be fit in one transaction, just
- * leave it in the original MDT */
- if (rc == -E2BIG)
- GOTO(stop_trans, rc = 0);
- else
- GOTO(stop_trans, rc);
+ /*
+ * delete LMV so that later when destroying sobj it won't delete
+ * stripes again.
+ */
+ if (sbuf->lb_buf) {
+ mdd_write_lock(env, sobj, MOR_SRC_CHILD);
+ rc = mdo_xattr_del(env, sobj, XATTR_NAME_LMV, handle);
+ mdd_write_unlock(env, sobj);
+ if (rc)
+ RETURN(rc);
+ }
}
- CDEBUG(D_INFO, "%s: update "DFID"/"DFID" with %s:"DFID"\n",
- mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(mdd_pobj)),
- PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
- PFID(mdd_object_fid(mdd_tobj)));
+ /* don't set nlink from sobj */
+ attr->la_valid &= ~LA_NLINK;
- rc = mdd_trans_start(env, mdd, handle);
- if (rc != 0)
- GOTO(stop_trans, rc);
-
- /* Revert IMMUTABLE flag */
- la_flag->la_valid = LA_FLAGS;
- la_flag->la_flags = so_attr->la_flags & ~LUSTRE_IMMUTABLE_FL;
- rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
- if (rc != 0)
- GOTO(stop_trans, rc);
-
- /* Remove source name from source directory */
- rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
- if (rc != 0)
- GOTO(stop_trans, rc);
+ rc = mdd_create_object(env, tpobj, tobj, attr, spec, NULL, NULL, hint,
+ handle);
+ if (rc)
+ RETURN(rc);
- if (ldata->ld_buf != NULL) {
- rc = mdd_update_linkea(env, mdd_pobj, mdd_sobj, mdd_tobj,
- lname, ldata, handle);
- if (rc != 0)
- GOTO(stop_trans, rc);
+ mdd_write_lock(env, tobj, MOR_TGT_CHILD);
+ rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle, mdo_xattr_set);
+ mdd_write_unlock(env, tobj);
+ if (rc)
+ RETURN(rc);
- /* linkea update might decrease the source object
- * nlink, let's get the attr again after ref_del */
- rc = mdd_la_get(env, mdd_sobj, so_attr);
- if (rc != 0)
- GOTO(stop_trans, rc);
- }
+ if (S_ISREG(attr->la_mode)) {
+ /* delete LOV to avoid deleting OST objs when destroying sobj */
+ mdd_write_lock(env, sobj, MOR_SRC_CHILD);
+ rc = mdo_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
+ mdd_write_unlock(env, sobj);
+ if (rc)
+ RETURN(rc);
- if (S_ISREG(so_attr->la_mode)) {
- if (so_attr->la_nlink == 1) {
- rc = mdo_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
- handle);
- if (rc != 0 && rc != -ENODATA)
- GOTO(stop_trans, rc);
-
- rc = mdo_xattr_set(env, mdd_tobj, NULL,
- XATTR_NAME_FID,
- LU_XATTR_REPLACE, handle);
- if (rc < 0)
- GOTO(stop_trans, rc);
- }
+ /* for regular file, update OST objects XATTR_NAME_FID */
+ rc = mdo_xattr_set(env, tobj, NULL, XATTR_NAME_FID, 0, handle);
+ if (rc)
+ RETURN(rc);
}
- /* Insert new fid with target name into target dir */
- rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(mdd_tobj),
- mdd_object_type(mdd_tobj), name, handle);
- if (rc != 0)
- GOTO(stop_trans, rc);
-
- mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
+ if (!S_ISDIR(attr->la_mode))
+ rc = mdd_iterate_linkea(env, sobj, tobj, lname, mdo2fid(tpobj),
+ ldata, NULL, handle, mdd_update_link);
- mdd_sobj->mod_flags |= DEAD_OBJ;
- rc = mdd_mark_orphan_object(env, mdd_sobj, handle, false);
- if (rc != 0)
- GOTO(out_unlock, rc);
+ RETURN(rc);
+}
- rc = mdd_orphan_insert(env, mdd_sobj, handle);
- if (rc != 0)
- GOTO(out_unlock, rc);
+static int mdd_declare_migrate_update(const struct lu_env *env,
+ struct mdd_object *spobj,
+ struct mdd_object *tpobj,
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ const struct lu_name *lname,
+ struct lu_attr *attr,
+ struct lu_attr *spattr,
+ struct lu_attr *tpattr,
+ struct linkea_data *ldata,
+ bool do_create,
+ bool do_destroy,
+ struct md_attr *ma,
+ struct thandle *handle)
+{
+ struct mdd_thread_info *info = mdd_env_info(env);
+ const struct lu_fid *fid = mdo2fid(do_create ? tobj : sobj);
+ struct lu_attr *la = &info->mti_la_for_fix;
+ int rc;
- mdo_ref_del(env, mdd_sobj, handle);
- if (is_dir)
- mdo_ref_del(env, mdd_sobj, handle);
+ rc = mdo_declare_index_delete(env, spobj, lname->ln_name, handle);
+ if (rc)
+ return rc;
- /* Get the attr again after ref_del */
- rc = mdd_la_get(env, mdd_sobj, so_attr);
- if (rc != 0)
- GOTO(out_unlock, rc);
+ if (S_ISDIR(attr->la_mode)) {
+ rc = mdo_declare_ref_del(env, spobj, handle);
+ if (rc)
+ return rc;
+ }
- ma->ma_attr = *so_attr;
- ma->ma_valid |= MA_INODE;
+ rc = mdo_declare_index_insert(env, tpobj, fid, mdd_object_type(sobj),
+ lname->ln_name, handle);
+ if (rc)
+ return rc;
- rc = mdd_attr_set_internal(env, mdd_pobj, p_la, handle, 0);
- if (rc != 0)
- GOTO(out_unlock, rc);
+ rc = mdd_declare_links_add(env, do_create ? tobj : sobj, handle, ldata);
+ if (rc)
+ return rc;
- rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, mdd_tobj,
- mdo2fid(mdd_pobj), mdo2fid(mdd_sobj),
- mdo2fid(mdd_pobj), lname, lname, handle);
- if (rc != 0) {
- CWARN("%s: changelog for migrate %s "DFID
- "under "DFID" failed: rc = %d\n",
- mdd2obd_dev(mdd)->obd_name, lname->ln_name,
- PFID(mdd_object_fid(mdd_sobj)),
- PFID(mdd_object_fid(mdd_pobj)), rc);
- /* Sigh, there are no easy way to migrate back the object, so
- * let's reset the result to 0 for now XXX */
- rc = 0;
+ if (S_ISDIR(attr->la_mode)) {
+ rc = mdo_declare_ref_add(env, tpobj, handle);
+ if (rc)
+ return rc;
}
-out_unlock:
- mdd_write_unlock(env, mdd_sobj);
-stop_trans:
- rc = mdd_trans_stop(env, mdd, rc, handle);
-
- RETURN(rc);
-}
-
-static int mdd_fld_lookup(const struct lu_env *env, struct mdd_device *mdd,
- const struct lu_fid *fid, __u32 *mdt_index)
-{
- struct lu_seq_range *range = &mdd_env_info(env)->mti_range;
- struct seq_server_site *ss;
- int rc;
+ la->la_valid = LA_CTIME | LA_MTIME;
+ rc = mdo_declare_attr_set(env, spobj, la, handle);
+ if (rc)
+ return rc;
- ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
+ if (tpobj != spobj) {
+ rc = mdo_declare_attr_set(env, tpobj, la, handle);
+ if (rc)
+ return rc;
+ }
- range->lsr_flags = LU_SEQ_RANGE_MDT;
- rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
- if (rc != 0)
- return rc;
+ if (do_create && do_destroy) {
+ rc = mdo_declare_ref_del(env, sobj, handle);
+ if (rc)
+ return rc;
- *mdt_index = range->lsr_index;
+ rc = mdo_declare_destroy(env, sobj, handle);
+ if (rc)
+ return rc;
+ }
- return 0;
+ return rc;
}
+
/**
- * Check whether we should migrate the file/dir
- * return val
- * < 0 permission check failed or other error.
- * = 0 the file can be migrated.
- * > 0 the file does not need to be migrated, mostly
- * for multiple link file
+ * migrate dirent from \a spobj to \a tpobj, and destroy \a sobj
**/
-static int mdd_migrate_sanity_check(const struct lu_env *env,
- struct mdd_object *pobj,
- const struct lu_attr *pattr,
- struct mdd_object *sobj,
- struct lu_attr *sattr)
+static int mdd_migrate_update(const struct lu_env *env,
+ struct mdd_object *spobj,
+ struct mdd_object *tpobj,
+ struct mdd_object *sobj,
+ struct mdd_object *tobj,
+ const struct lu_name *lname,
+ struct lu_attr *attr,
+ struct lu_attr *spattr,
+ struct lu_attr *tpattr,
+ struct linkea_data *ldata,
+ bool do_create,
+ bool do_destroy,
+ struct md_attr *ma,
+ struct thandle *handle)
{
- struct mdd_thread_info *info = mdd_env_info(env);
- struct linkea_data *ldata = &info->mti_link_data;
- struct mdd_device *mdd = mdo2mdd(&pobj->mod_obj);
- int mgr_easize;
- struct lu_buf *mgr_buf;
- int count;
- int rc;
- __u64 mdt_index;
+ struct mdd_thread_info *info = mdd_env_info(env);
+ const struct lu_fid *fid = mdo2fid(do_create ? tobj : sobj);
+ struct lu_attr *la = &info->mti_la_for_fix;
+ int rc;
+
ENTRY;
- mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
- mgr_buf = lu_buf_check_and_alloc(&info->mti_big_buf, mgr_easize);
- if (mgr_buf->lb_buf == NULL)
- RETURN(-ENOMEM);
+ CDEBUG(D_INFO, "update %s "DFID"/"DFID" to "DFID"/"DFID"\n",
+ lname->ln_name, PFID(mdo2fid(spobj)),
+ PFID(mdo2fid(sobj)), PFID(mdo2fid(tpobj)),
+ PFID(fid));
- rc = mdo_xattr_get(env, sobj, mgr_buf, XATTR_NAME_LMV);
- if (rc > 0) {
- union lmv_mds_md *lmm = mgr_buf->lb_buf;
-
- /* If the object has migrateEA, it means IMMUTE flag
- * is being set by previous migration process, so it
- * needs to override the IMMUTE flag, otherwise the
- * following sanity check will fail */
- if (le32_to_cpu(lmm->lmv_md_v1.lmv_hash_type) &
- LMV_HASH_FLAG_MIGRATION) {
- struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
-
- sattr->la_flags &= ~LUSTRE_IMMUTABLE_FL;
- CDEBUG(D_HA, "%s: "DFID" override IMMUTE FLAG\n",
- mdd2obd_dev(mdd)->obd_name,
- PFID(mdd_object_fid(sobj)));
- }
- }
+ rc = __mdd_index_delete(env, spobj, lname->ln_name,
+ S_ISDIR(attr->la_mode), handle);
+ if (rc)
+ RETURN(rc);
- rc = mdd_rename_sanity_check(env, pobj, pattr, pobj, pattr,
- sobj, sattr, NULL, NULL);
- if (rc != 0)
+ rc = __mdd_index_insert(env, tpobj, fid, mdd_object_type(sobj),
+ lname->ln_name, handle);
+ if (rc)
RETURN(rc);
- /* Then it will check if the file should be migrated. If the file
- * has mulitple links, we only need migrate the file if all of its
- * entries has been migrated to the remote MDT */
- if (!S_ISREG(sattr->la_mode) || sattr->la_nlink < 2)
- RETURN(0);
+ rc = mdd_links_write(env, do_create ? tobj : sobj, ldata, handle);
+ if (rc)
+ RETURN(rc);
- rc = mdd_links_read(env, sobj, ldata);
- if (rc != 0) {
- /* For multiple links files, if there are no linkEA data at all,
- * means the file might be created before linkEA is enabled, and
- * all of its links should not be migrated yet, otherwise it
- * should have some linkEA there */
- if (rc == -ENOENT || rc == -ENODATA)
- RETURN(1);
+ la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
+ la->la_valid = LA_CTIME | LA_MTIME;
+ mdd_write_lock(env, spobj, MOR_SRC_PARENT);
+ rc = mdd_update_time(env, spobj, spattr, la, handle);
+ mdd_write_unlock(env, spobj);
+ if (rc)
RETURN(rc);
+
+ if (tpobj != spobj) {
+ la->la_valid = LA_CTIME | LA_MTIME;
+ mdd_write_lock(env, tpobj, MOR_TGT_PARENT);
+ rc = mdd_update_time(env, tpobj, tpattr, la, handle);
+ mdd_write_unlock(env, tpobj);
+ if (rc)
+ RETURN(rc);
}
- mdt_index = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site->ss_node_id;
- /* If there are still links locally, then the file will not be
- * migrated. */
- LASSERT(ldata->ld_leh != NULL);
+ /*
+ * there are three situations we shouldn't destroy source:
+ * 1. if source is not dir, and it happens to be located on the same MDT
+ * as target parent.
+ * 2. if source is not dir, and has link on the same MDT where source is
+ * located.
+ * 3. if source is dir, and it's a normal, non-empty dir.
+ *
+ * the first two situations equal !do_create, and the 3rd is equal to
+ * !do_destroy, so the below condition is actually
+ * !(!do_create || !do_destroy).
+ *
+ * NB, if user has opened source dir before migration, he will get
+ * -ENOENT error when closing it later, because source is likely to be
+ * remote, which can't be moved to orphan list, but apart from this error
+ * message, this won't cause any inconsistency or trouble.
+ */
+ if (do_create && do_destroy) {
+ mdd_write_lock(env, sobj, MOR_SRC_CHILD);
+ mdo_ref_del(env, sobj, handle);
+ rc = mdo_destroy(env, sobj, handle);
+ mdd_write_unlock(env, sobj);
+ }
- /* If the linkEA is overflow, then means there are some unknown name
- * entries under unknown parents, that will prevent the migration. */
- if (unlikely(ldata->ld_leh->leh_overflow_time))
- RETURN(1);
+ RETURN(rc);
+}
- ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
- for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
- struct lu_name lname;
- struct lu_fid fid;
- __u32 parent_mdt_index;
+/**
+ * Migrate directory or file.
+ *
+ * migrate source to target in following steps:
+ * 1. create target, append source stripes after target's if it's directory,
+ * migrate xattrs and update fid of source links.
+ * 2. update namespace: migrate dirent from source parent to target parent,
+ * update file linkea, and destroy source if it's not needed any more.
+ *
+ * If the parent has an LMV xattr, the source and target parent stripes are
+ * located by hashing \a lname; otherwise \a md_pobj is used for both. Both
+ * parent objects are referenced here and released under the 'out' label.
+ *
+ * \param[in] env execution environment
+ * \param[in] md_pobj parent master object
+ * \param[in] md_sobj source object
+ * \param[in] lname file name
+ * \param[in] md_tobj target object
+ * \param[in] spec target creation spec
+ * \param[in] ma used to update \a pobj mtime and ctime
+ *
+ * \retval 0 on success
+ * \retval -EALREADY source directory LMV already has
+ * LMV_HASH_FLAG_MIGRATION set and the requested
+ * layout matches it
+ * \retval -EPERM source directory LMV already has
+ * LMV_HASH_FLAG_MIGRATION set but the requested
+ * layout differs
+ * \retval -errno on failure
+ */
+static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
+ struct md_object *md_sobj, const struct lu_name *lname,
+ struct md_object *md_tobj, struct md_op_spec *spec,
+ struct md_attr *ma)
+{
+ struct mdd_device *mdd = mdo2mdd(md_pobj);
+ struct mdd_thread_info *info = mdd_env_info(env);
+ struct mdd_object *pobj = md2mdd_obj(md_pobj);
+ struct mdd_object *sobj = md2mdd_obj(md_sobj);
+ struct mdd_object *tobj = md2mdd_obj(md_tobj);
+ struct mdd_object *spobj = NULL;
+ struct mdd_object *tpobj = NULL;
+ struct lu_attr *spattr = &info->mti_pattr;
+ struct lu_attr *tpattr = &info->mti_tpattr;
+ struct lu_attr *attr = &info->mti_cattr;
+ struct linkea_data *ldata = &info->mti_link_data;
+ struct dt_allocation_hint *hint = &info->mti_hint;
+ struct lu_fid *fid = &info->mti_fid2;
+ struct lu_buf pbuf = { NULL };
+ struct lu_buf sbuf = { NULL };
+ struct lmv_mds_md_v1 *plmv;
+ struct thandle *handle;
+ bool do_create = true;
+ bool do_destroy = true;
+ int rc;
+ ENTRY;
- linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
- &lname, &fid);
- ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
- ldata->ld_reclen);
+ rc = mdd_la_get(env, sobj, attr);
+ if (rc)
+ RETURN(rc);
- rc = mdd_fld_lookup(env, mdd, &fid, &parent_mdt_index);
- if (rc != 0)
- RETURN(rc);
+ /* locate source and target stripe on pobj, which are the real parent */
+ rc = mdd_stripe_get(env, pobj, &pbuf, XATTR_NAME_LMV);
+ if (rc < 0 && rc != -ENODATA)
+ RETURN(rc);
+
+ plmv = pbuf.lb_buf;
+ if (plmv) {
+ __u32 hash_type = le32_to_cpu(plmv->lmv_hash_type);
+ __u32 count = le32_to_cpu(plmv->lmv_stripe_count);
+ int index;
- /* Migrate the object only if none of its parents are on the
- * current MDT. */
- if (parent_mdt_index != mdt_index)
- continue;
+ /* locate target parent stripe */
+ if (hash_type & LMV_HASH_FLAG_MIGRATION) {
+ /*
+ * fail check here to make sure top dir migration
+ * succeed.
+ */
+ if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
+ GOTO(out, rc = -EIO);
+ hash_type &= ~LMV_HASH_FLAG_MIGRATION;
+ count = le32_to_cpu(plmv->lmv_migrate_offset);
+ }
+ index = lmv_name_to_stripe_index(hash_type, count,
+ lname->ln_name,
+ lname->ln_namelen);
+ if (index < 0)
+ GOTO(out, rc = index);
+
+ fid_le_to_cpu(fid, &plmv->lmv_stripe_fids[index]);
+ tpobj = mdd_object_find(env, mdd, fid);
+ if (IS_ERR(tpobj))
+ GOTO(out, rc = PTR_ERR(tpobj));
+
+ /* locate source parent stripe */
+ if (le32_to_cpu(plmv->lmv_hash_type) &
+ LMV_HASH_FLAG_MIGRATION) {
+ hash_type = le32_to_cpu(plmv->lmv_migrate_hash);
+ count = le32_to_cpu(plmv->lmv_stripe_count) -
+ le32_to_cpu(plmv->lmv_migrate_offset);
+
+ index = lmv_name_to_stripe_index(hash_type, count,
+ lname->ln_name,
+ lname->ln_namelen);
+ /*
+ * tpobj is released under 'out'; putting it here as
+ * well would drop the reference twice.
+ */
+ if (index < 0)
+ GOTO(out, rc = index);
- CDEBUG(D_INFO, DFID"still has local entry %.*s "DFID"\n",
- PFID(mdd_object_fid(sobj)), lname.ln_namelen,
- lname.ln_name, PFID(&fid));
- rc = 1;
- break;
+ index += le32_to_cpu(plmv->lmv_migrate_offset);
+ fid_le_to_cpu(fid, &plmv->lmv_stripe_fids[index]);
+ spobj = mdd_object_find(env, mdd, fid);
+ /*
+ * 'out' skips an ERR_PTR spobj and puts tpobj, so no
+ * explicit put of tpobj here either.
+ */
+ if (IS_ERR(spobj))
+ GOTO(out, rc = PTR_ERR(spobj));
+ } else {
+ spobj = tpobj;
+ mdd_object_get(spobj);
+ }
+ } else {
+ tpobj = pobj;
+ spobj = pobj;
+ mdd_object_get(tpobj);
+ mdd_object_get(spobj);
}
- RETURN(rc);
-}
+ rc = mdd_la_get(env, spobj, spattr);
+ if (rc)
+ GOTO(out, rc);
-static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
- struct md_object *sobj, const struct lu_name *lname,
- struct md_object *tobj, struct md_attr *ma)
-{
- struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
- struct mdd_device *mdd = mdo2mdd(pobj);
- struct mdd_object *mdd_sobj = md2mdd_obj(sobj);
- struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
- struct lu_attr *so_attr = MDD_ENV_VAR(env, cattr);
- struct lu_attr *pattr = MDD_ENV_VAR(env, pattr);
- bool created = false;
- int rc;
+ rc = mdd_la_get(env, tpobj, tpattr);
+ if (rc)
+ GOTO(out, rc);
- ENTRY;
/* If the file will being migrated, it will check whether
* the file is being opened by someone else right now */
- mdd_read_lock(env, mdd_sobj, MOR_SRC_CHILD);
- if (mdd_sobj->mod_count > 0) {
- CDEBUG(D_OTHER,
- "%s: "DFID"%s is already opened count %d: rc = %d\n",
- mdd2obd_dev(mdd)->obd_name,
- PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
- mdd_sobj->mod_count, -EBUSY);
- mdd_read_unlock(env, mdd_sobj);
- GOTO(put, rc = -EBUSY);
- }
- mdd_read_unlock(env, mdd_sobj);
+ if (S_ISDIR(attr->la_mode)) {
+ struct lmv_user_md_v1 *lmu = spec->u.sp_ea.eadata;
- rc = mdd_la_get(env, mdd_sobj, so_attr);
- if (rc != 0)
- GOTO(put, rc);
+ LASSERT(lmu);
- rc = mdd_la_get(env, mdd_pobj, pattr);
- if (rc != 0)
- GOTO(put, rc);
+ /*
+ * if user use default value '0' for stripe_count, we need to
+ * adjust it to '1' to create a 1-stripe directory.
+ */
+ if (lmu->lum_stripe_count == 0) {
+ /* eadata is from request, don't alter it */
+ info->mti_lmu = *lmu;
+ info->mti_lmu.lum_stripe_count = cpu_to_le32(1);
+ spec->u.sp_ea.eadata = &info->mti_lmu;
+ lmu = spec->u.sp_ea.eadata;
+ }
- rc = mdd_migrate_sanity_check(env, mdd_pobj, pattr, mdd_sobj, so_attr);
- if (rc != 0) {
- if (rc > 0)
- rc = 0;
- GOTO(put, rc);
+ rc = mdd_stripe_get(env, sobj, &sbuf, XATTR_NAME_LMV);
+ if (rc == -ENODATA) {
+ if (mdd_dir_is_empty(env, sobj) == 0) {
+ /*
+ * if sobj is empty, and target is not striped,
+ * create target as a normal directory.
+ */
+ if (le32_to_cpu(lmu->lum_stripe_count) == 1) {
+ info->mti_lmu = *lmu;
+ info->mti_lmu.lum_stripe_count = 0;
+ spec->u.sp_ea.eadata = &info->mti_lmu;
+ lmu = spec->u.sp_ea.eadata;
+ }
+ } else {
+ /*
+ * sobj is not striped dir, if it's not empty,
+ * it will be migrated to be a stripe of target,
+ * don't destroy it after migration.
+ */
+ do_destroy = false;
+ }
+ } else if (rc) {
+ GOTO(out, rc);
+ } else {
+ struct lmv_mds_md_v1 *lmv = sbuf.lb_buf;
+
+ if (le32_to_cpu(lmv->lmv_hash_type) &
+ LMV_HASH_FLAG_MIGRATION) {
+ __u32 lum_stripe_count = lmu->lum_stripe_count;
+ __u32 lmv_hash_type = lmv->lmv_hash_type &
+ cpu_to_le32(LMV_HASH_TYPE_MASK);
+
+ if (!lum_stripe_count)
+ lum_stripe_count = cpu_to_le32(1);
+
+ /* TODO: check specific MDTs */
+ if (lmv->lmv_migrate_offset !=
+ lum_stripe_count ||
+ lmv->lmv_master_mdt_index !=
+ lmu->lum_stripe_offset ||
+ (lmv_hash_type != 0 &&
+ lmv_hash_type != lmu->lum_hash_type)) {
+ CERROR("%s: \'"DNAME"\' migration was "
+ "interrupted, run \'lfs migrate "
+ "-m %d -c %d -H %d "DNAME"\' to "
+ "finish migration.\n",
+ mdd2obd_dev(mdd)->obd_name,
+ PNAME(lname),
+ le32_to_cpu(
+ lmv->lmv_master_mdt_index),
+ le32_to_cpu(
+ lmv->lmv_migrate_offset),
+ le32_to_cpu(lmv_hash_type),
+ PNAME(lname));
+ GOTO(out, rc = -EPERM);
+ }
+ GOTO(out, rc = -EALREADY);
+ }
+ }
+ } else if (!mdd_object_remote(tpobj)) {
+ /*
+ * if source is already on MDT where target parent is located,
+ * no need to create, just update namespace.
+ */
+ do_create = false;
+ } else if (S_ISLNK(attr->la_mode)) {
+ lu_buf_check_and_alloc(&sbuf, attr->la_size + 1);
+ if (!sbuf.lb_buf)
+ GOTO(out, rc = -ENOMEM);
+ rc = mdd_readlink(env, &sobj->mod_obj, &sbuf);
+ if (rc <= 0) {
+ rc = rc ?: -EFAULT;
+ CERROR("%s: "DFID" readlink failed: rc = %d\n",
+ mdd2obd_dev(mdd)->obd_name,
+ PFID(mdo2fid(sobj)), rc);
+ GOTO(out, rc);
+ }
+ spec->u.sp_symname = sbuf.lb_buf;
+ } else if (S_ISREG(attr->la_mode)) {
+ spec->sp_cr_flags |= MDS_OPEN_DELAY_CREATE;
+ spec->sp_cr_flags &= ~MDS_OPEN_HAS_EA;
}
- /* Sigh, it is impossible to finish all of migration in a single
- * transaction, for example migrating big directory entries to the
- * new MDT, it needs insert all of name entries of children in the
- * new directory.
- *
- * So migration will be done in multiple steps and transactions.
- *
- * 1. create an orphan object on the remote MDT in one transaction.
- * 2. migrate extend attributes to the new target file/directory.
- * 3. For directory, migrate the entries to the new MDT and update
- * linkEA of each children. Because we can not migrate all entries
- * in a single transaction, so the migrating directory will become
- * a striped directory during migration, so once the process is
- * interrupted, the directory is still accessible. (During lookup,
- * client will locate the name by searching both original and target
- * object).
- * 4. Finally, update the name/FID to point to the new file/directory
- * in a separate transaction.
+ /*
+ * if sobj has link on the same MDT, no need to create, just update
+ * namespace, and it will be a remote file on target parent, which is
+ * similar to rename.
*/
+ rc = migrate_linkea_prepare(env, mdd, spobj, tpobj, sobj, lname, attr,
+ ldata);
+ if (rc > 0)
+ do_create = false;
+ else if (rc)
+ GOTO(out, rc);
- /* step 1: Check whether the orphan object has been created, and create
- * orphan object on the remote MDT if needed */
- if (!mdd_object_exists(mdd_tobj)) {
- rc = mdd_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
- lname, so_attr);
- if (rc != 0)
- GOTO(put, rc);
- created = true;
+ rc = mdd_migrate_sanity_check(env, mdd, spobj, tpobj, sobj, tobj,
+ spattr, tpattr, attr);
+ if (rc)
+ GOTO(out, rc);
+
+ mdd_object_make_hint(env, NULL, tobj, attr, spec, hint);
+
+ handle = mdd_trans_create(env, mdd);
+ if (IS_ERR(handle))
+ GOTO(out, rc = PTR_ERR(handle));
+
+ if (do_create) {
+ rc = mdd_declare_migrate_create(env, tpobj, sobj, tobj, lname,
+ attr, &sbuf, ldata, spec, hint,
+ handle);
+ if (rc)
+ GOTO(stop_trans, rc);
}
- LASSERT(mdd_object_exists(mdd_tobj));
- /* step 2: migrate xattr */
- rc = mdd_migrate_xattrs(env, mdd_sobj, mdd_tobj);
- if (rc != 0)
- GOTO(put, rc);
+ rc = mdd_declare_migrate_update(env, spobj, tpobj, sobj, tobj, lname,
+ attr, spattr, tpattr, ldata, do_create,
+ do_destroy, ma, handle);
+ if (rc)
+ GOTO(stop_trans, rc);
- /* step 3: migrate name entries to the orphan object */
- if (S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu))) {
- rc = mdd_migrate_entries(env, mdd_sobj, mdd_tobj);
- if (rc != 0)
- GOTO(put, rc);
- if (unlikely(OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_NET_REP,
- OBD_FAIL_MDS_REINT_NET_REP)))
- GOTO(put, rc = 0);
- } else {
- OBD_FAIL_TIMEOUT(OBD_FAIL_MIGRATE_DELAY, cfs_fail_val);
+ rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, lname, NULL,
+ handle);
+ if (rc)
+ GOTO(stop_trans, rc);
+
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
+ GOTO(stop_trans, rc);
+
+ if (do_create) {
+ rc = mdd_migrate_create(env, tpobj, sobj, tobj, lname, attr,
+ &sbuf, ldata, spec, hint, handle);
+ if (rc)
+ GOTO(stop_trans, rc);
}
- LASSERT(mdd_object_exists(mdd_tobj));
- /* step 4: update name entry to the new object */
- rc = mdd_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj, lname,
- ma);
- if (rc != 0)
- GOTO(put, rc);
+ rc = mdd_migrate_update(env, spobj, tpobj, sobj, tobj, lname, attr,
+ spattr, tpattr, ldata, do_create, do_destroy,
+ ma, handle);
+ if (rc)
+ GOTO(stop_trans, rc);
- /* newly created target was not locked, don't cache its attributes */
- if (created)
- mdd_invalidate(env, tobj);
-put:
- RETURN(rc);
+ rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, tobj,
+ mdo2fid(spobj), mdo2fid(sobj),
+ mdo2fid(tpobj), lname, lname, handle);
+ if (rc)
+ GOTO(stop_trans, rc);
+
+ EXIT;
+stop_trans:
+ rc = mdd_trans_stop(env, mdd, rc, handle);
+out:
+ /* single release point for both parent stripe references */
+ if (spobj && !IS_ERR(spobj))
+ mdd_object_put(env, spobj);
+ if (tpobj && !IS_ERR(tpobj))
+ mdd_object_put(env, tpobj);
+ lu_buf_free(&sbuf);
+ lu_buf_free(&pbuf);
+ return rc;
}
const struct md_dir_operations mdd_dir_ops = {
* then mti_ent::lde_name will be mti_key. */
struct lu_dirent mti_ent;
char mti_key[NAME_MAX + 16];
+ char mti_name[NAME_MAX + 1];
struct lu_buf mti_buf[4];
struct lu_buf mti_big_buf; /* biggish persistent buf */
struct lu_buf mti_link_buf; /* buf for link ea */
struct lu_buf mti_xattr_buf;
struct obdo mti_oa;
+ struct lmv_user_md mti_lmu;
struct dt_allocation_hint mti_hint;
struct dt_object_format mti_dof;
struct linkea_data mti_link_data;
struct thandle *handle,
const struct md_op_spec *spec,
struct dt_allocation_hint *hint);
-int mdd_get_lov_ea(const struct lu_env *env, struct mdd_object *obj,
- struct lu_buf *lmm_buf);
+int mdd_stripe_get(const struct lu_env *env, struct mdd_object *obj,
+ struct lu_buf *lmm_buf, const char *name);
/* mdd_trans.c */
void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
if (la_copy->la_valid & LA_SIZE) {
struct lu_buf *lov_buf = mdd_buf_get(env, NULL, 0);
- rc = mdd_get_lov_ea(env, mdd_obj, lov_buf);
+ rc = mdd_stripe_get(env, mdd_obj, lov_buf, XATTR_NAME_LOV);
if (rc) {
rc = 0;
} else {
/* get EA of victim file */
memset(buf_vic, 0, sizeof(*buf_vic));
- rc = mdd_get_lov_ea(env, vic, buf_vic);
+ rc = mdd_stripe_get(env, vic, buf_vic, XATTR_NAME_LOV);
if (rc < 0) {
if (rc == -ENODATA)
rc = 0;
/* save EA of target file for restore */
memset(buf, 0, sizeof(*buf));
- rc = mdd_get_lov_ea(env, obj, buf);
+ rc = mdd_stripe_get(env, obj, buf, XATTR_NAME_LOV);
if (rc < 0)
GOTO(out, rc);
/* get EA of mirrored file */
memset(buf_save, 0, sizeof(*buf));
- rc = mdd_get_lov_ea(env, obj, buf_save);
+ rc = mdd_stripe_get(env, obj, buf_save, XATTR_NAME_LOV);
if (rc < 0)
GOTO(out, rc);
}
/*
- * read lov EA of an object
- * return the lov EA in an allocated lu_buf
+ * read lov/lmv EA of an object
+ * return the lov/lmv EA in an allocated lu_buf
+ *
+ * The xattr \a name is first read into mti_big_buf of the env's
+ * mdd_thread_info, which is doubled in size for as long as mdo_xattr_get()
+ * returns -ERANGE, and is then copied into \a lmm_buf, allocated here with
+ * exactly the xattr size.
+ *
+ * Returns 0 on success, -ENODATA if mdo_xattr_get() returns 0, -ENOMEM on
+ * allocation failure, or the negative errno from mdo_xattr_get().
 */
-int mdd_get_lov_ea(const struct lu_env *env, struct mdd_object *obj,
- struct lu_buf *lmm_buf)
+int mdd_stripe_get(const struct lu_env *env, struct mdd_object *obj,
+ struct lu_buf *lmm_buf, const char *name)
{
- struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
- int rc, bufsize;
+ struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
+ int rc;
+
ENTRY;
-repeat:
- rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_LOV);
+ if (buf->lb_buf == NULL) {
+ buf = lu_buf_check_and_alloc(buf, 4096); /* initial scratch size */
+ if (buf->lb_buf == NULL)
+ RETURN(-ENOMEM);
+ }
+repeat:
+ rc = mdo_xattr_get(env, obj, buf, name);
if (rc == -ERANGE) {
/* mti_big_buf is allocated but is too small
* we need to increase it */
buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf,
buf->lb_len * 2);
if (buf->lb_buf == NULL)
- GOTO(out, rc = -ENOMEM);
+ RETURN(-ENOMEM);
goto repeat;
- }
-
- if (rc < 0)
+ } else if (rc < 0) {
RETURN(rc);
-
- if (rc == 0)
+ } else if (rc == 0) {
RETURN(-ENODATA);
-
- bufsize = rc;
- if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
- /* mti_big_buf was not allocated, so we have to
- * allocate it based on the ea size */
- buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf,
- bufsize);
- if (buf->lb_buf == NULL)
- GOTO(out, rc = -ENOMEM);
- goto repeat;
}
- lu_buf_alloc(lmm_buf, bufsize);
+ lu_buf_alloc(lmm_buf, rc); /* rc > 0 here: used as the xattr size */
if (lmm_buf->lb_buf == NULL)
- GOTO(out, rc = -ENOMEM);
+ RETURN(-ENOMEM);
- memcpy(lmm_buf->lb_buf, buf->lb_buf, bufsize);
- rc = 0;
- EXIT;
+ /*
+ * we don't use lmm_buf directly, because we don't know xattr size, so
+ * by using mti_big_buf we can avoid calling mdo_xattr_get() twice.
+ */
+ memcpy(lmm_buf->lb_buf, buf->lb_buf, rc);
-out:
- if (rc < 0)
- lu_buf_free(lmm_buf);
- return rc;
+ RETURN(0);
}
static int mdd_xattr_hsm_replace(const struct lu_env *env,
mdd_write_lock(env, fst_o, MOR_TGT_CHILD);
mdd_write_lock(env, snd_o, MOR_TGT_CHILD);
- rc = mdd_get_lov_ea(env, fst_o, fst_buf);
+ rc = mdd_stripe_get(env, fst_o, fst_buf, XATTR_NAME_LOV);
if (rc < 0 && rc != -ENODATA)
GOTO(stop, rc);
- rc = mdd_get_lov_ea(env, snd_o, snd_buf);
+ rc = mdd_stripe_get(env, snd_o, snd_buf, XATTR_NAME_LOV);
if (rc < 0 && rc != -ENODATA)
GOTO(stop, rc);
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
- rc = mdd_get_lov_ea(env, obj, buf);
+ rc = mdd_stripe_get(env, obj, buf, XATTR_NAME_LOV);
if (rc < 0) {
if (rc == -ENODATA)
rc = -EINVAL;
{
struct mdt_device *mdt = info->mti_mdt;
struct lu_name *lname = &info->mti_name;
- char *name = NULL;
+ char *filename = info->mti_filename;
struct mdt_object *parent;
u32 mode;
int rc = 0;
LASSERT(!info->mti_cross_ref);
- OBD_ALLOC(name, NAME_MAX + 1);
- if (name == NULL)
- return -ENOMEM;
- lname->ln_name = name;
-
/*
* We may want to allow this to mount a completely separate
* fileset from the MDT in the future, but keeping it to
break;
}
- strncpy(name, s1, lname->ln_namelen);
- name[lname->ln_namelen] = '\0';
+ strncpy(filename, s1, lname->ln_namelen);
+ filename[lname->ln_namelen] = '\0';
+ lname->ln_name = filename;
parent = mdt_object_find(info->mti_env, mdt, fid);
if (IS_ERR(parent)) {
}
}
- OBD_FREE(name, NAME_MAX + 1);
-
return rc;
}
return -EINVAL;
}
+ LASSERT(buf->lb_buf);
+
rc = mo_xattr_get(info->mti_env, next, buf, name);
if (rc > 0) {
}
/* this should sync the whole device */
-static int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt)
+int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt)
{
struct dt_device *dt = mdt->mdt_bottom;
int rc;
mo_acl:1,
mo_cos:1,
mo_evict_tgt_nids:1,
- mo_dom_read_open:1;
+ mo_dom_read_open:1,
+ mo_migrate_hsm_allowed:1;
unsigned int mo_dom_lock;
} mdt_opts;
/* mdt state flags */
/* Ops object filename */
struct lu_name mti_name;
+ char mti_filename[NAME_MAX + 1];
/* per-thread values, can be re-used, may be vmalloc'd */
void *mti_big_lmm;
void *mti_big_acl;
int mti_big_aclsize;
/* should be enough to fit lustre_mdt_attrs */
char mti_xattr_buf[128];
- struct ldlm_enqueue_info mti_einfo;
+ struct ldlm_enqueue_info mti_einfo[2];
/* einfo used by mdt_remote_object_lock_try() */
struct ldlm_enqueue_info mti_remote_einfo;
struct tg_reply_data *mti_reply_data;
void mdt_reconstruct_open(struct mdt_thread_info *, struct mdt_lock_handle *);
int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj,
struct md_layout_change *spec);
+int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt);
struct lu_buf *mdt_buf(const struct lu_env *env, void *area, ssize_t len);
const struct lu_buf *mdt_buf_const(const struct lu_env *env,
if (rc < 0)
RETURN(rc);
+ spec->no_create = !!req_is_replay(mdt_info_req(info));
+
+ rc = mdt_dlmreq_unpack(info);
+
+ RETURN(rc);
+}
+
+static int mdt_migrate_unpack(struct mdt_thread_info *info)
+{
+ struct lu_ucred *uc = mdt_ucred(info);
+ struct mdt_rec_rename *rec;
+ struct lu_attr *attr = &info->mti_attr.ma_attr;
+ struct mdt_reint_record *rr = &info->mti_rr;
+ struct req_capsule *pill = info->mti_pill;
+ struct md_op_spec *spec = &info->mti_spec;
+ int rc;
+
+ ENTRY;
+
+ CLASSERT(sizeof(*rec) == sizeof(struct mdt_rec_reint));
+ rec = req_capsule_client_get(pill, &RMF_REC_REINT);
+ if (rec == NULL)
+ RETURN(-EFAULT);
+
+ /* This prior initialization is needed for old_init_ucred_reint() */
+ uc->uc_fsuid = rec->rn_fsuid;
+ uc->uc_fsgid = rec->rn_fsgid;
+ uc->uc_cap = rec->rn_cap;
+ uc->uc_suppgids[0] = rec->rn_suppgid1;
+ uc->uc_suppgids[1] = rec->rn_suppgid2;
+
+ attr->la_uid = rec->rn_fsuid;
+ attr->la_gid = rec->rn_fsgid;
+ rr->rr_fid1 = &rec->rn_fid1;
+ rr->rr_fid2 = &rec->rn_fid2;
+ attr->la_ctime = rec->rn_time;
+ attr->la_mtime = rec->rn_time;
+ /* rename_tgt contains the mode already */
+ attr->la_mode = rec->rn_mode;
+ attr->la_valid = LA_UID | LA_GID | LA_CTIME | LA_MTIME | LA_MODE;
+
+ rc = mdt_name_unpack(pill, &RMF_NAME, &rr->rr_name, 0);
+ if (rc < 0)
+ RETURN(rc);
+
if (rec->rn_bias & MDS_CLOSE_MIGRATE) {
- req_capsule_extend(info->mti_pill, &RQF_MDS_REINT_MIGRATE);
rc = mdt_close_handle_unpack(info);
if (rc)
RETURN(rc);
[REINT_OPEN] = mdt_open_unpack,
[REINT_SETXATTR] = mdt_setxattr_unpack,
[REINT_RMENTRY] = mdt_rmentry_unpack,
- [REINT_MIGRATE] = mdt_rename_unpack,
+ [REINT_MIGRATE] = mdt_migrate_unpack,
[REINT_RESYNC] = mdt_resync_unpack,
};
}
LPROC_SEQ_FOPS(mdt_dom_read_open);
+/* Print the mo_migrate_hsm_allowed option of the MDT as "0" or "1". */
+static int mdt_migrate_hsm_allowed_seq_show(struct seq_file *m, void *data)
+{
+ struct obd_device *obd = m->private;
+ struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+ unsigned int allowed;
+
+ allowed = mdt->mdt_opts.mo_migrate_hsm_allowed ? 1 : 0;
+ seq_printf(m, "%u\n", allowed);
+
+ return 0;
+}
+
+/*
+ * Parse a boolean written by the user and store it in the
+ * mo_migrate_hsm_allowed option of the MDT behind this seq_file.
+ *
+ * Returns \a count when the value was accepted, otherwise the error
+ * returned by kstrtobool_from_user().
+ */
+static ssize_t
+mdt_migrate_hsm_allowed_seq_write(struct file *file, const char __user *buffer,
+ size_t count, loff_t *off)
+{
+ struct seq_file *seq = file->private_data;
+ struct obd_device *obd = seq->private;
+ struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+ bool allowed;
+ int err;
+
+ err = kstrtobool_from_user(buffer, count, &allowed);
+ if (err != 0)
+ return err;
+
+ mdt->mdt_opts.mo_migrate_hsm_allowed = allowed;
+
+ return count;
+}
+LPROC_SEQ_FOPS(mdt_migrate_hsm_allowed);
+
LPROC_SEQ_FOPS_RO_TYPE(mdt, recovery_status);
LPROC_SEQ_FOPS_RO_TYPE(mdt, num_exports);
LPROC_SEQ_FOPS_RO_TYPE(mdt, target_instance);
.fops = &mdt_sync_count_fops },
{ .name = "dom_lock",
.fops = &mdt_dom_lock_fops },
- { .name = "dom_read_open",
- .fops = &mdt_dom_read_open_fops },
+ { .name = "migrate_hsm_allowed",
+ .fops = &mdt_migrate_hsm_allowed_fops },
{ NULL }
};
*/
if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
struct mdt_lock_handle *lhc;
- struct ldlm_enqueue_info *einfo = &info->mti_einfo;
+ struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
bool cos_incompat;
rc = mdt_object_striped(info, child);
int do_vbr = ma->ma_attr.la_valid &
(LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
__u64 lockpart = MDS_INODELOCK_UPDATE;
- struct ldlm_enqueue_info *einfo = &info->mti_einfo;
+ struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
bool cos_incompat;
int rc;
ENTRY;
struct mdt_object *mc;
struct mdt_lock_handle *parent_lh;
struct mdt_lock_handle *child_lh;
- struct ldlm_enqueue_info *einfo = &info->mti_einfo;
+ struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
__u64 lock_ibits;
bool cos_incompat = false, discard = false;
int no_name = 0;
EXIT;
}
-/* Update object linkEA */
-struct mdt_lock_list {
- struct mdt_object *mll_obj;
- struct mdt_lock_handle mll_lh;
- struct list_head mll_list;
+/*
+ * Find the object for \a fid and run mdt_version_get_check() on it with
+ * index \a idx.
+ *
+ * Returns the object on success. If the lookup fails its ERR_PTR is
+ * returned as is; if the version check fails the object is put and the
+ * check's error is returned as an ERR_PTR.
+ */
+static struct mdt_object *mdt_object_find_check(struct mdt_thread_info *info,
+ const struct lu_fid *fid,
+ int idx)
+{
+ struct mdt_object *obj;
+ int rc;
+
+ ENTRY;
+
+ obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
+ if (IS_ERR(obj))
+ RETURN(obj);
+
+ /* check early, the real version will be saved after locking */
+ rc = mdt_version_get_check(info, obj, idx);
+ if (rc != 0)
+ GOTO(put, rc);
+
+ RETURN(obj);
+
+put:
+ mdt_object_put(info->mti_env, obj);
+ return ERR_PTR(rc);
+}
+
+/*
+ * Revoke the LOOKUP lock of \a obj in case it is a remote object on its
+ * parent \a pobj; whether it really is remote is not checked here.
+ *
+ * An EX lock with the LOOKUP inodebit is taken (through
+ * mdt_remote_object_lock() if \a pobj is remote, through mdt_fid_lock()
+ * otherwise) and dropped again right away.
+ *
+ * Returns 0 on success, or the lock result if it is not ELDLM_OK.
+ */
+static int mdt_revoke_remote_lookup_lock(struct mdt_thread_info *info,
+ struct mdt_object *pobj,
+ struct mdt_object *obj)
+{
+ struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
+ int rc;
+
+ mdt_lock_handle_init(lh);
+ mdt_lock_reg_init(lh, LCK_EX);
+
+ if (!mdt_object_remote(pobj)) {
+ __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB |
+ LDLM_FL_COS_INCOMPAT;
+ union ldlm_policy_data *policy = &info->mti_policy;
+ struct ldlm_res_id *res = &info->mti_res_id;
+
+ fid_build_reg_res_name(mdt_object_fid(obj), res);
+ memset(policy, 0, sizeof(*policy));
+ policy->l_inodebits.bits = MDS_INODELOCK_LOOKUP;
+ rc = mdt_fid_lock(info->mti_mdt->mdt_namespace, &lh->mlh_reg_lh,
+ LCK_EX, policy, res, dlmflags, NULL);
+ } else {
+ rc = mdt_remote_object_lock(info, pobj, mdt_object_fid(obj),
+ &lh->mlh_rreg_lh, LCK_EX,
+ MDS_INODELOCK_LOOKUP, false);
+ }
+
+ if (rc != ELDLM_OK)
+ return rc;
+
+ /*
+ * TODO, currently we don't save this lock because there is no place to
+ * hold this lock handle, but to avoid race we need to save this lock.
+ */
+ mdt_object_unlock(info, NULL, lh, 1);
+
+ return 0;
+}
+
+/*
+ * an operation may take locks of linkea parents, or of directory stripes;
+ * they are grouped in different lists, one mdt_sub_lock per locked object.
+ */
+struct mdt_sub_lock {
+ struct mdt_object *msl_obj; /* object the lock was taken on */
+ struct mdt_lock_handle msl_lh; /* lock handle for msl_obj */
+ struct list_head msl_linkage; /* linkage into the lock list */
};
static void mdt_unlock_list(struct mdt_thread_info *info,
- struct list_head *list, int rc)
+ struct list_head *list, int decref)
{
- struct mdt_lock_list *mll;
- struct mdt_lock_list *mll2;
+ struct mdt_sub_lock *msl;
+ struct mdt_sub_lock *tmp; /* next entry, msl is freed in the loop */
- list_for_each_entry_safe(mll, mll2, list, mll_list) {
- mdt_object_unlock_put(info, mll->mll_obj, &mll->mll_lh, rc);
- list_del(&mll->mll_list);
- OBD_FREE_PTR(mll);
+ list_for_each_entry_safe(msl, tmp, list, msl_linkage) {
+ mdt_object_unlock_put(info, msl->msl_obj, &msl->msl_lh, decref);
+ list_del(&msl->msl_linkage); /* unlink before freeing the entry */
+ OBD_FREE_PTR(msl);
}
}
-static int mdt_lock_objects_in_linkea(struct mdt_thread_info *info,
- struct mdt_object *obj,
- struct mdt_object *pobj,
- struct list_head *lock_list)
+/*
+ * lock parents of links, and also check whether total locks don't exceed
+ * RS_MAX_LOCKS.
+ *
+ * \retval 0 on success, and locks can be saved in ptlrpc_reply_stat
+ * \retval 1 on success, but total lock count may exceed RS_MAX_LOCKS
+ * \retval -ev negative errno upon error
+ */
+static int mdt_lock_links(struct mdt_thread_info *info,
+ struct mdt_object *pobj,
+ const struct md_attr *ma,
+ struct mdt_object *obj,
+ struct list_head *link_locks)
{
- struct lu_buf *buf = &info->mti_big_buf;
- struct linkea_data ldata = { NULL };
- int count;
- int retry_count;
- int rc;
+ struct mdt_device *mdt = info->mti_mdt;
+ struct lu_buf *buf = &info->mti_big_buf;
+ struct lu_name *lname = &info->mti_name;
+ struct linkea_data ldata = { NULL };
+ bool blocked = false;
+ int retries = 5;
+ int local_lnkp_cnt = 0;
+ int rc;
+
ENTRY;
if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
RETURN(0);
- buf = lu_buf_check_and_alloc(buf, PATH_MAX);
+ buf = lu_buf_check_and_alloc(buf, MAX_LINKEA_SIZE);
if (buf->lb_buf == NULL)
RETURN(-ENOMEM);
ldata.ld_buf = buf;
rc = mdt_links_read(info, obj, &ldata);
- if (rc != 0) {
+ if (rc) {
if (rc == -ENOENT || rc == -ENODATA)
rc = 0;
RETURN(rc);
}
- /* ignore the migrating parent(@pobj) */
- retry_count = ldata.ld_leh->leh_reccount - 1;
-
-again:
- LASSERT(ldata.ld_leh != NULL);
- ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
- for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
- struct mdt_device *mdt = info->mti_mdt;
- struct mdt_object *mdt_pobj;
- struct mdt_lock_list *mll;
- struct lu_name name;
- struct lu_fid fid;
+repeat:
+ for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
+ linkea_next_entry(&ldata)) {
+ struct mdt_object *lnkp;
+ struct mdt_sub_lock *msl;
+ struct lu_fid fid;
__u64 ibits;
- linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
- &name, &fid);
- mdt_pobj = mdt_object_find(info->mti_env, mdt, &fid);
- if (IS_ERR(mdt_pobj)) {
- CWARN("%s: cannot find obj "DFID": rc = %ld\n",
- mdt_obd_name(mdt), PFID(&fid), PTR_ERR(mdt_pobj));
- goto next;
+ linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
+ &fid);
+
+ /* check if it's also linked to parent */
+ if (lu_fid_eq(mdt_object_fid(pobj), &fid)) {
+ CDEBUG(D_INFO, "skip parent "DFID", reovke "DNAME"\n",
+ PFID(&fid), PNAME(lname));
+ /* in case link is remote object, revoke LOOKUP lock */
+ rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
+ continue;
}
- if (!mdt_object_exists(mdt_pobj)) {
- CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
- mdt_obd_name(mdt), PFID(&fid));
- mdt_object_put(info->mti_env, mdt_pobj);
- goto next;
+ lnkp = NULL;
+
+ /* check if it's linked to a stripe of parent */
+ if (ma->ma_valid & MA_LMV) {
+ struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
+ struct lu_fid *stripe_fid = &info->mti_tmp_fid1;
+ int j = 0;
+
+ for (; j < le32_to_cpu(lmv->lmv_stripe_count); j++) {
+ fid_le_to_cpu(stripe_fid,
+ &lmv->lmv_stripe_fids[j]);
+ if (lu_fid_eq(stripe_fid, &fid)) {
+ CDEBUG(D_INFO, "skip stripe "DFID
+ ", reovke "DNAME"\n",
+ PFID(&fid), PNAME(lname));
+ lnkp = mdt_object_find(info->mti_env,
+ mdt, &fid);
+ if (IS_ERR(lnkp))
+ GOTO(out, rc = PTR_ERR(lnkp));
+ break;
+ }
+ }
+
+ if (lnkp) {
+ rc = mdt_revoke_remote_lookup_lock(info, lnkp,
+ obj);
+ mdt_object_put(info->mti_env, lnkp);
+ continue;
+ }
}
- /* Check if the object already exists in the list */
- list_for_each_entry(mll, lock_list, mll_list) {
- if (mll->mll_obj == mdt_pobj) {
- mdt_object_put(info->mti_env, mdt_pobj);
- goto next;
+ /* Check if it's already locked */
+ list_for_each_entry(msl, link_locks, msl_linkage) {
+ if (lu_fid_eq(mdt_object_fid(msl->msl_obj), &fid)) {
+ CDEBUG(D_INFO,
+ DFID" was locked, revoke "DNAME"\n",
+ PFID(&fid), PNAME(lname));
+ lnkp = msl->msl_obj;
+ break;
}
}
- if (mdt_pobj == pobj) {
- CDEBUG(D_INFO, "%s: skipping parent obj "DFID"\n",
- mdt_obd_name(mdt), PFID(&fid));
- mdt_object_put(info->mti_env, mdt_pobj);
- goto next;
+ if (lnkp) {
+ rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
+ continue;
}
- OBD_ALLOC_PTR(mll);
- if (mll == NULL) {
- mdt_object_put(info->mti_env, mdt_pobj);
- GOTO(out, rc = -ENOMEM);
+ CDEBUG(D_INFO, "lock "DFID":"DNAME"\n",
+ PFID(&fid), PNAME(lname));
+
+ lnkp = mdt_object_find(info->mti_env, mdt, &fid);
+ if (IS_ERR(lnkp)) {
+ CWARN("%s: cannot find obj "DFID": %ld\n",
+ mdt_obd_name(mdt), PFID(&fid), PTR_ERR(lnkp));
+ continue;
+ }
+
+ if (!mdt_object_exists(lnkp)) {
+ CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
+ PFID(&fid), PNAME(lname));
+ mdt_object_put(info->mti_env, lnkp);
+ continue;
}
- /* Since this needs to lock all of objects in linkea, to avoid
- * deadlocks, because it does not follow parent-child order as
- * other MDT operation, let's use try_lock here and if the lock
- * cannot be gotten because of conflicting locks, then drop all
- * current locks, send an AST to the client, and start again. */
- mdt_lock_pdo_init(&mll->mll_lh, LCK_PW, &name);
+ if (!mdt_object_remote(lnkp))
+ local_lnkp_cnt++;
+
+ OBD_ALLOC_PTR(msl);
+ if (msl == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ /*
+ * we can't follow parent-child lock order like other MD
+ * operations, use lock_try here to avoid deadlock, if the lock
+ * cannot be taken, drop all locks taken, revoke the blocked
+ * one, and continue processing the remaining entries, and in
+ * the end of the loop restart from beginning.
+ */
+ mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
ibits = 0;
- rc = mdt_object_lock_try(info, mdt_pobj, &mll->mll_lh, &ibits,
+ rc = mdt_object_lock_try(info, lnkp, &msl->msl_lh, &ibits,
MDS_INODELOCK_UPDATE, true);
if (!(ibits & MDS_INODELOCK_UPDATE)) {
- mdt_unlock_list(info, lock_list, 0);
+ blocked = true;
- CDEBUG(D_INFO, "%s: busy lock on "DFID" %s retry %d\n",
- mdt_obd_name(mdt), PFID(&fid), name.ln_name,
- retry_count);
+ CDEBUG(D_INFO, "busy lock on "DFID" "DNAME" retry %d\n",
+ PFID(&fid), PNAME(lname), retries);
- if (retry_count == 0) {
- mdt_object_put(info->mti_env, mdt_pobj);
- OBD_FREE_PTR(mll);
- GOTO(out, rc = -EBUSY);
- }
+ mdt_unlock_list(info, link_locks, 1);
- mdt_lock_pdo_init(&mll->mll_lh, LCK_PW, &name);
- rc = mdt_object_lock(info, mdt_pobj, &mll->mll_lh,
+ mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
+ rc = mdt_object_lock(info, lnkp, &msl->msl_lh,
MDS_INODELOCK_UPDATE);
- if (rc != 0) {
- mdt_object_put(info->mti_env, mdt_pobj);
- OBD_FREE_PTR(mll);
+ if (rc) {
+ mdt_object_put(info->mti_env, lnkp);
+ OBD_FREE_PTR(msl);
GOTO(out, rc);
}
- if (mdt_object_remote(mdt_pobj)) {
+ if (mdt_object_remote(lnkp)) {
struct ldlm_lock *lock;
- /* For remote object, Set lock to cb_atomic,
+ /*
+ * for remote object, set lock cb_atomic,
* so lock can be released in blocking_ast()
- * immediately, then the next try_lock will
- * have better chance to succeds */
- lock =
- ldlm_handle2lock(&mll->mll_lh.mlh_rreg_lh);
+ * immediately, then the next lock_try will
+ * have better chance of success.
+ */
+ lock = ldlm_handle2lock(
+ &msl->msl_lh.mlh_rreg_lh);
LASSERT(lock != NULL);
lock_res_and_lock(lock);
ldlm_set_atomic_cb(lock);
unlock_res_and_lock(lock);
LDLM_LOCK_PUT(lock);
}
- mdt_object_unlock_put(info, mdt_pobj, &mll->mll_lh, rc);
- OBD_FREE_PTR(mll);
- retry_count--;
- goto again;
+
+ mdt_object_unlock_put(info, lnkp, &msl->msl_lh, 1);
+ OBD_FREE_PTR(msl);
+ continue;
+ }
+
+ INIT_LIST_HEAD(&msl->msl_linkage);
+ msl->msl_obj = lnkp;
+ list_add_tail(&msl->msl_linkage, link_locks);
+
+ rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
+ }
+
+ if (blocked) {
+ rc = -EBUSY;
+ if (--retries > 0) {
+ mdt_unlock_list(info, link_locks, rc);
+ blocked = false;
+ local_lnkp_cnt = 0;
+ goto repeat;
}
- rc = 0;
- INIT_LIST_HEAD(&mll->mll_list);
- mll->mll_obj = mdt_pobj;
- list_add_tail(&mll->mll_list, lock_list);
-next:
- ldata.ld_lee = (struct link_ea_entry *)((char *)ldata.ld_lee +
- ldata.ld_reclen);
}
+
+ EXIT;
out:
- if (rc != 0)
- mdt_unlock_list(info, lock_list, rc);
- RETURN(rc);
+ if (rc)
+ mdt_unlock_list(info, link_locks, rc);
+ else if (local_lnkp_cnt > RS_MAX_LOCKS - 6)
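+ /*
+ * 6 lock slots are reserved for the other objects of this
+ * request: parent may have 3 local objects: master object and
+ * 2 stripes (if it's being migrated too); source may have 2
+ * local objects: master and 1 stripe; target has 1 local object.
+ * Return 1 so the caller knows the total lock count may exceed
+ * RS_MAX_LOCKS.
+ */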
+ rc = 1;
+ return rc;
}
-/* migrate files from one MDT to another MDT */
-static int mdt_reint_migrate_internal(struct mdt_thread_info *info,
- struct mdt_lock_handle *lhc)
+static int mdt_lock_remote_slaves(struct mdt_thread_info *info,
+ struct mdt_object *obj,
+ const struct md_attr *ma,
+ struct list_head *slave_locks)
{
- struct mdt_reint_record *rr = &info->mti_rr;
- struct md_attr *ma = &info->mti_attr;
- struct mdt_object *msrcdir;
- struct mdt_object *mold;
- struct mdt_object *mnew = NULL;
- struct mdt_lock_handle *lh_dirp;
- struct mdt_lock_handle *lh_childp;
- struct mdt_lock_handle *lh_tgtp = NULL;
- struct lu_fid *old_fid = &info->mti_tmp_fid1;
- struct list_head lock_list;
- __u64 lock_ibits;
- struct ldlm_lock *lease = NULL;
- bool lock_open_sem = false;
- int rc;
- ENTRY;
+ struct mdt_device *mdt = info->mti_mdt;
+ const struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
+ struct lu_fid *fid = &info->mti_tmp_fid1;
+ struct mdt_object *slave;
+ struct mdt_sub_lock *msl;
+ int i;
+ int rc;
- CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
- PNAME(&rr->rr_name), PFID(rr->rr_fid2));
+ ENTRY;
- /* 1: lock the source dir. */
- msrcdir = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
- if (IS_ERR(msrcdir)) {
- CDEBUG(D_OTHER, "%s: cannot find source dir "DFID" : rc = %d\n",
- mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
- (int)PTR_ERR(msrcdir));
- RETURN(PTR_ERR(msrcdir));
- }
+ LASSERT(mdt_object_remote(obj));
+ LASSERT(ma->ma_valid & MA_LMV);
+ LASSERT(lmv);
- lh_dirp = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_pdo_init(lh_dirp, LCK_PW, &rr->rr_name);
- rc = mdt_reint_object_lock(info, msrcdir, lh_dirp, MDS_INODELOCK_UPDATE,
- true);
- if (rc)
- GOTO(out_put_parent, rc);
+ if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
+ RETURN(-EINVAL);
- if (!mdt_object_remote(msrcdir)) {
- rc = mdt_version_get_check_save(info, msrcdir, 0);
- if (rc)
- GOTO(out_unlock_parent, rc);
- }
+ if (le32_to_cpu(lmv->lmv_stripe_count) < 1)
+ RETURN(0);
- /* 2: sanity check and find the object to be migrated. */
- fid_zero(old_fid);
- rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
- if (rc != 0)
- GOTO(out_unlock_parent, rc);
+ for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
+ fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
- if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
- GOTO(out_unlock_parent, rc = -EINVAL);
+ slave = mdt_object_find(info->mti_env, mdt, fid);
+ if (IS_ERR(slave))
+ GOTO(out, rc = PTR_ERR(slave));
- if (!fid_is_md_operative(old_fid))
- GOTO(out_unlock_parent, rc = -EPERM);
+ OBD_ALLOC_PTR(msl);
+ if (!msl) {
+ mdt_object_put(info->mti_env, slave);
+ GOTO(out, rc = -ENOMEM);
+ }
- if (lu_fid_eq(old_fid, &info->mti_mdt->mdt_md_root_fid))
- GOTO(out_unlock_parent, rc = -EPERM);
+ mdt_lock_reg_init(&msl->msl_lh, LCK_EX);
+ rc = mdt_reint_object_lock(info, slave, &msl->msl_lh,
+ MDS_INODELOCK_UPDATE, true);
+ if (rc) {
+ OBD_FREE_PTR(msl);
+ mdt_object_put(info->mti_env, slave);
+ GOTO(out, rc);
+ }
- mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
- if (IS_ERR(mold))
- GOTO(out_unlock_parent, rc = PTR_ERR(mold));
+ INIT_LIST_HEAD(&msl->msl_linkage);
+ msl->msl_obj = slave;
+ list_add_tail(&msl->msl_linkage, slave_locks);
- if (!mdt_object_exists(mold)) {
- LU_OBJECT_DEBUG(D_INODE, info->mti_env,
- &mold->mot_obj,
- "object does not exist");
- GOTO(out_put_child, rc = -ENOENT);
}
+ EXIT;
- if (mdt_object_remote(mold)) {
- CDEBUG(D_OTHER, "%s: source "DFID" is on the remote MDT\n",
- mdt_obd_name(info->mti_mdt), PFID(old_fid));
- GOTO(out_put_child, rc = -EREMOTE);
- }
+out:
+ if (rc)
+ mdt_unlock_list(info, slave_locks, rc);
+ return rc;
+}
- if (S_ISREG(lu_object_attr(&mold->mot_obj)) &&
- !mdt_object_remote(msrcdir)) {
- CDEBUG(D_OTHER, "%s: parent "DFID" is still on the same"
- " MDT, which should be migrated first:"
- " rc = %d\n", mdt_obd_name(info->mti_mdt),
- PFID(mdt_object_fid(msrcdir)), -EPERM);
- GOTO(out_put_child, rc = -EPERM);
+static inline void mdt_migrate_object_unlock(struct mdt_thread_info *info,
+ struct mdt_object *obj,
+ struct mdt_lock_handle *lh,
+ struct ldlm_enqueue_info *einfo,
+ struct list_head *slave_locks,
+ int decref)
+{
+ if (mdt_object_remote(obj)) {
+ mdt_unlock_list(info, slave_locks, decref);
+ mdt_object_unlock(info, obj, lh, decref);
+ } else {
+ mdt_reint_striped_unlock(info, obj, lh, einfo, decref);
}
+}
- rc = mdt_remote_permission(info);
- if (rc != 0)
- GOTO(out_put_child, rc);
+/* lock parent and its stripes */
+static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
+ struct mdt_object *obj,
+ const struct md_attr *ma,
+ struct mdt_lock_handle *lh,
+ struct ldlm_enqueue_info *einfo,
+ struct list_head *slave_locks)
+{
+ int rc;
- /* 3: iterate the linkea of the object and lock all of the objects */
- INIT_LIST_HEAD(&lock_list);
- rc = mdt_lock_objects_in_linkea(info, mold, msrcdir, &lock_list);
- if (rc != 0)
- GOTO(out_put_child, rc);
+ if (mdt_object_remote(obj)) {
+ rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
+ &lh->mlh_rreg_lh, LCK_PW,
+ MDS_INODELOCK_UPDATE, false);
+ if (rc != ELDLM_OK)
+ return rc;
- if (info->mti_spec.sp_migrate_close) {
- struct close_data *data;
- struct mdt_body *repbody;
- bool lease_broken = false;
+ /*
+ * if obj is remote and striped, lock its stripes explicitly
+ * because it's not striped in LOD layer on this MDT.
+ */
+ if (ma->ma_valid & MA_LMV) {
+ rc = mdt_lock_remote_slaves(info, obj, ma, slave_locks);
+ if (rc)
+ mdt_object_unlock(info, obj, lh, rc);
+ }
+ } else {
+ rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
+ einfo, true);
+ }
- if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
- RCL_CLIENT) ||
- !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
- RCL_CLIENT))
- GOTO(out_lease, rc = -EPROTO);
+ return rc;
+}
- data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
- if (data == NULL)
- GOTO(out_lease, rc = -EPROTO);
+/*
+ * In migration, the object may be remote, and we need to take a full lock on
+ * it and, if it's a directory, on its stripes as well. Besides, the object may
+ * be a remote object to its parent; in that case revoke its LOOKUP lock on the
+ * MDT where its parent is located.
+ */
+static int mdt_migrate_object_lock(struct mdt_thread_info *info,
+ struct mdt_object *pobj,
+ struct mdt_object *obj,
+ struct mdt_lock_handle *lh,
+ struct ldlm_enqueue_info *einfo,
+ struct list_head *slave_locks)
+{
+ int rc;
- lease = ldlm_handle2lock(&data->cd_handle);
- if (lease == NULL)
- GOTO(out_lease, rc = -ESTALE);
+ if (mdt_object_remote(obj)) {
+ /* don't bother to check if pobj and obj are on the same MDT. */
+ rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
+ if (rc)
+ return rc;
- /* try to hold open_sem so that nobody else can open the file */
- if (!down_write_trylock(&mold->mot_open_sem)) {
- ldlm_lock_cancel(lease);
- GOTO(out_lease, rc = -EBUSY);
- }
+ rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
+ &lh->mlh_rreg_lh, LCK_EX,
+ MDS_INODELOCK_FULL, false);
+ if (rc != ELDLM_OK)
+ return rc;
- lock_open_sem = true;
- /* Check if the lease open lease has already canceled */
- lock_res_and_lock(lease);
- lease_broken = ldlm_is_cancel(lease);
- unlock_res_and_lock(lease);
+ /*
+ * if obj is remote and striped, lock its stripes explicitly
+ * because it's not striped in LOD layer on this MDT.
+ */
+ if (S_ISDIR(lu_object_attr(&obj->mot_obj))) {
+ struct md_attr *ma = &info->mti_attr;
- LDLM_DEBUG(lease, DFID " lease broken? %d",
- PFID(mdt_object_fid(mold)), lease_broken);
+ ma->ma_lmv = info->mti_big_lmm;
+ ma->ma_lmv_size = info->mti_big_lmmsize;
+ ma->ma_valid = 0;
+ rc = mdt_stripe_get(info, obj, ma, XATTR_NAME_LMV);
+ if (rc) {
+ mdt_object_unlock(info, obj, lh, rc);
+ return rc;
+ }
- /* Cancel server side lease. Client side counterpart should
- * have been cancelled. It's okay to cancel it now as we've
- * held mot_open_sem. */
- ldlm_lock_cancel(lease);
+ if (ma->ma_valid & MA_LMV) {
+ rc = mdt_lock_remote_slaves(info, obj, ma,
+ slave_locks);
+ if (rc)
+ mdt_object_unlock(info, obj, lh, rc);
+ }
+ }
+ } else {
+ if (mdt_object_remote(pobj)) {
+ rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
+ if (rc)
+ return rc;
+ }
- if (lease_broken)
- GOTO(out_lease, rc = -EAGAIN);
-out_lease:
- rc = mdt_close_internal(info, mdt_info_req(info), NULL);
- repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
- repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
- if (rc != 0)
- GOTO(out_unlock_list, rc);
+ rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_FULL,
+ einfo, true);
}
- /* 4: lock of the object migrated object */
- lh_childp = &info->mti_lh[MDT_LH_OLD];
- mdt_lock_reg_init(lh_childp, LCK_EX);
- lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
- MDS_INODELOCK_LAYOUT;
- if (mdt_object_remote(msrcdir)) {
- /* Enqueue lookup lock from the parent MDT */
- rc = mdt_remote_object_lock(info, msrcdir, mdt_object_fid(mold),
- &lh_childp->mlh_rreg_lh,
- lh_childp->mlh_rreg_mode,
- MDS_INODELOCK_LOOKUP, false);
- if (rc != ELDLM_OK)
- GOTO(out_unlock_list, rc);
+ return rc;
+}
- lock_ibits &= ~MDS_INODELOCK_LOOKUP;
- }
+/*
+ * Look up source by name. If parent is a striped directory, we need to find
+ * the corresponding stripe where source is located, and then look up there.
+ *
+ * Besides, if parent is migrating too, and the file is not in the source
+ * stripe but is already in the target stripe, this should be a redo of
+ * 'lfs migrate' on the client side, and -EALREADY is returned.
+ */
+static int mdt_migrate_lookup(struct mdt_thread_info *info,
+ struct mdt_object *pobj,
+ const struct md_attr *ma,
+ const struct lu_name *lname,
+ struct mdt_object **spobj,
+ struct mdt_object **sobj)
+{
+ const struct lu_env *env = info->mti_env;
+ struct lu_fid *fid = &info->mti_tmp_fid1;
+ struct mdt_object *stripe;
+ int rc;
- rc = mdt_reint_object_lock(info, mold, lh_childp, lock_ibits, true);
- if (rc != 0)
- GOTO(out_unlock_child, rc);
+ if (ma->ma_valid & MA_LMV) {
+ /* if parent is striped, lookup on corresponding stripe */
+ struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
+ __u32 hash_type = le32_to_cpu(lmv->lmv_hash_type);
+ __u32 stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
+ bool is_migrating = le32_to_cpu(lmv->lmv_hash_type) &
+ LMV_HASH_FLAG_MIGRATION;
+
+ if (is_migrating) {
+ hash_type = le32_to_cpu(lmv->lmv_migrate_hash);
+ stripe_count -= le32_to_cpu(lmv->lmv_migrate_offset);
+ }
- /* Migration is incompatible with HSM. */
- ma->ma_need = MA_HSM;
- ma->ma_valid = 0;
- rc = mdt_attr_get_complex(info, mold, ma);
- if (rc != 0)
- GOTO(out_unlock_child, rc);
-
- if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0) {
- rc = -ENOSYS;
- CDEBUG(D_OTHER,
- "%s: cannot migrate HSM archived file "DFID": rc = %d\n",
- mdt_obd_name(info->mti_mdt), PFID(old_fid), rc);
- GOTO(out_unlock_child, rc);
- }
+ rc = lmv_name_to_stripe_index(hash_type, stripe_count,
+ lname->ln_name,
+ lname->ln_namelen);
+ if (rc < 0)
+ return rc;
- ma->ma_need = MA_LMV;
- ma->ma_valid = 0;
- ma->ma_lmv = (union lmv_mds_md *)info->mti_xattr_buf;
- ma->ma_lmv_size = sizeof(info->mti_xattr_buf);
- rc = mdt_stripe_get(info, mold, ma, XATTR_NAME_LMV);
- if (rc != 0)
- GOTO(out_unlock_child, rc);
-
- if ((ma->ma_valid & MA_LMV)) {
- struct lmv_mds_md_v1 *lmm1;
-
- lmv_le_to_cpu(ma->ma_lmv, ma->ma_lmv);
- lmm1 = &ma->ma_lmv->lmv_md_v1;
- if (!(lmm1->lmv_hash_type & LMV_HASH_FLAG_MIGRATION)) {
- CDEBUG(D_OTHER, "%s: can not migrate striped dir "DFID
- ": rc = %d\n", mdt_obd_name(info->mti_mdt),
- PFID(mdt_object_fid(mold)), -EPERM);
- GOTO(out_unlock_child, rc = -EPERM);
- }
+ if (le32_to_cpu(lmv->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
+ rc += le32_to_cpu(lmv->lmv_migrate_offset);
+
+ fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
+
+ stripe = mdt_object_find(env, info->mti_mdt, fid);
+ if (IS_ERR(stripe))
+ return PTR_ERR(stripe);
+
+ fid_zero(fid);
+ rc = mdo_lookup(env, mdt_object_child(stripe), lname, fid,
+ &info->mti_spec);
+ if (rc == -ENOENT && is_migrating) {
+ /*
+ * if parent is migrating, and lookup child failed on
+ * source stripe, lookup again on target stripe, if it
+ * exists, it means previous migration was interrupted,
+ * and current file was migrated already.
+ */
+ mdt_object_put(env, stripe);
+
+ hash_type = le32_to_cpu(lmv->lmv_hash_type);
+ stripe_count = le32_to_cpu(lmv->lmv_migrate_offset);
+
+ rc = lmv_name_to_stripe_index(hash_type, stripe_count,
+ lname->ln_name,
+ lname->ln_namelen);
+ if (rc < 0)
+ return rc;
- if (!fid_is_sane(&lmm1->lmv_stripe_fids[1]))
- GOTO(out_unlock_child, rc = -EINVAL);
+ fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
- mnew = mdt_object_find(info->mti_env, info->mti_mdt,
- &lmm1->lmv_stripe_fids[1]);
- if (IS_ERR(mnew))
- GOTO(out_unlock_child, rc = PTR_ERR(mnew));
-
- if (!mdt_object_remote(mnew)) {
- CDEBUG(D_OTHER,
- "%s: "DFID" being migrated is on this MDT:"
- " rc = %d\n", mdt_obd_name(info->mti_mdt),
- PFID(rr->rr_fid2), -EPERM);
- GOTO(out_put_new, rc = -EPERM);
- }
+ stripe = mdt_object_find(env, info->mti_mdt, fid);
+ if (IS_ERR(stripe))
+ return PTR_ERR(stripe);
- lh_tgtp = &info->mti_lh[MDT_LH_CHILD];
- mdt_lock_reg_init(lh_tgtp, LCK_EX);
- rc = mdt_remote_object_lock(info, mnew,
- mdt_object_fid(mnew),
- &lh_tgtp->mlh_rreg_lh,
- lh_tgtp->mlh_rreg_mode,
- MDS_INODELOCK_UPDATE, false);
- if (rc != 0) {
- lh_tgtp = NULL;
- GOTO(out_put_new, rc);
+ fid_zero(fid);
+ rc = mdo_lookup(env, mdt_object_child(stripe), lname,
+ fid, &info->mti_spec);
+ mdt_object_put(env, stripe);
+ return rc ?: -EALREADY;
+ } else if (rc) {
+ mdt_object_put(env, stripe);
+ return rc;
}
} else {
- mnew = mdt_object_find(info->mti_env, info->mti_mdt,
- rr->rr_fid2);
- if (IS_ERR(mnew))
- GOTO(out_unlock_child, rc = PTR_ERR(mnew));
- if (!mdt_object_remote(mnew)) {
- CDEBUG(D_OTHER, "%s: Migration "DFID" is on this MDT:"
- " rc = %d\n", mdt_obd_name(info->mti_mdt),
- PFID(rr->rr_fid2), -EXDEV);
- GOTO(out_put_new, rc = -EXDEV);
- }
+ fid_zero(fid);
+ rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
+ &info->mti_spec);
+ if (rc)
+ return rc;
+
+ stripe = pobj;
+ mdt_object_get(env, stripe);
}
- /* 5: migrate it */
- mdt_reint_init_ma(info, ma);
+ *spobj = stripe;
- mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
- OBD_FAIL_MDS_REINT_RENAME_WRITE);
+ *sobj = mdt_object_find(env, info->mti_mdt, fid);
+ if (IS_ERR(*sobj)) {
+ mdt_object_put(env, stripe);
+ rc = PTR_ERR(*sobj);
+ *spobj = NULL;
+ *sobj = NULL;
+ }
- rc = mdo_migrate(info->mti_env, mdt_object_child(msrcdir),
- mdt_object_child(mold), &rr->rr_name,
- mdt_object_child(mnew), ma);
- if (rc != 0)
- GOTO(out_unlock_new, rc);
+ return rc;
+}
-out_unlock_new:
- if (lh_tgtp != NULL)
- mdt_object_unlock(info, mnew, lh_tgtp, rc);
-out_put_new:
- if (mnew)
- mdt_object_put(info->mti_env, mnew);
-out_unlock_child:
- mdt_object_unlock(info, mold, lh_childp, rc);
-out_unlock_list:
- /* we don't really modify linkea objects, so we can safely decref these
- * locks, and this can avoid saving them as COS locks, which may prevent
- * subsequent migrate. */
- mdt_unlock_list(info, &lock_list, 1);
- if (lease != NULL) {
- ldlm_reprocess_all(lease->l_resource);
- LDLM_LOCK_PUT(lease);
+/* end lease and close file for regular file */
+static int mdd_migrate_close(struct mdt_thread_info *info,
+ struct mdt_object *obj)
+{
+ struct close_data *data;
+ struct mdt_body *repbody;
+ struct ldlm_lock *lease;
+ int rc;
+ int rc2;
+
+ rc = -EPROTO;
+ if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
+ RCL_CLIENT) ||
+ !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
+ RCL_CLIENT))
+ goto close;
+
+ data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
+ if (!data)
+ goto close;
+
+ rc = -ESTALE;
+ lease = ldlm_handle2lock(&data->cd_handle);
+ if (!lease)
+ goto close;
+
+ /* check if the lease was already canceled */
+ lock_res_and_lock(lease);
+ rc = ldlm_is_cancel(lease);
+ unlock_res_and_lock(lease);
+
+ if (rc) {
+ rc = -EAGAIN;
+ LDLM_DEBUG(lease, DFID" lease broken",
+ PFID(mdt_object_fid(obj)));
}
- if (lock_open_sem)
- up_write(&mold->mot_open_sem);
-out_put_child:
- mdt_object_put(info->mti_env, mold);
-out_unlock_parent:
- mdt_object_unlock(info, msrcdir, lh_dirp, rc);
-out_put_parent:
- mdt_object_put(info->mti_env, msrcdir);
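+ /*
+ * Cancel the server side lease; the client side counterpart should have
+ * been cancelled already. It's okay to cancel it now because the caller
+ * normally holds mot_open_sem (the caller also closes without the
+ * semaphore when its trylock fails, but it then fails with -EBUSY).
+ */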
+ ldlm_lock_cancel(lease);
+ ldlm_reprocess_all(lease->l_resource);
+ LDLM_LOCK_PUT(lease);
- RETURN(rc);
+close:
+ rc2 = mdt_close_internal(info, mdt_info_req(info), NULL);
+ repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+ repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
+
+ return rc ?: rc2;
}
-static struct mdt_object *mdt_object_find_check(struct mdt_thread_info *info,
- const struct lu_fid *fid,
- int idx)
+/*
+ * migrate file in below steps:
+ * 1. lock parent and its stripes
+ * 2. lookup source by name
+ * 3. lock parents of source links if source is not directory
+ * 4. reject if source is in HSM
+ * 5. take source open_sem and close file if source is regular file
+ * 6. lock source and its stripes if it's directory
+ * 7. lock target so subsequent change to it can trigger COS
+ * 8. migrate file
+ * 9. unlock above locks
+ * 10. sync device if link locks may be too many to save with the request
+ */
+static int mdt_reint_migrate_internal(struct mdt_thread_info *info)
{
- struct mdt_object *dir;
+ const struct lu_env *env = info->mti_env;
+ struct mdt_device *mdt = info->mti_mdt;
+ struct mdt_reint_record *rr = &info->mti_rr;
+ struct md_attr *ma = &info->mti_attr;
+ struct ldlm_enqueue_info *peinfo = &info->mti_einfo[0];
+ struct ldlm_enqueue_info *seinfo = &info->mti_einfo[1];
+ struct mdt_object *pobj;
+ struct mdt_object *spobj = NULL;
+ struct mdt_object *sobj = NULL;
+ struct mdt_object *tobj;
+ struct mdt_lock_handle *lhp;
+ struct mdt_lock_handle *lhs;
+ struct mdt_lock_handle *lht;
+ LIST_HEAD(parent_slave_locks);
+ LIST_HEAD(child_slave_locks);
+ LIST_HEAD(link_locks);
+ bool open_sem_locked = false;
+ bool do_sync = false;
int rc;
ENTRY;
- dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
- if (IS_ERR(dir))
- RETURN(dir);
+ CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
+ PNAME(&rr->rr_name), PFID(rr->rr_fid2));
- /* check early, the real version will be saved after locking */
- rc = mdt_version_get_check(info, dir, idx);
+ /* don't allow migrate . or .. */
+ if (lu_name_is_dot_or_dotdot(&rr->rr_name))
+ RETURN(-EBUSY);
+
+ rc = mdt_remote_permission(info);
if (rc)
- GOTO(out_put, rc);
+ RETURN(rc);
- RETURN(dir);
-out_put:
- mdt_object_put(info->mti_env, dir);
- return ERR_PTR(rc);
+ /* pobj is master object of parent */
+ pobj = mdt_object_find_check(info, rr->rr_fid1, 0);
+ if (IS_ERR(pobj))
+ RETURN(PTR_ERR(pobj));
+
+ if (unlikely(!info->mti_big_lmm)) {
+ info->mti_big_lmmsize = lmv_mds_md_size(64, LMV_MAGIC);
+ OBD_ALLOC(info->mti_big_lmm, info->mti_big_lmmsize);
+ if (!info->mti_big_lmm)
+ GOTO(put_parent, rc = -ENOMEM);
+ }
+
+ ma->ma_lmv = info->mti_big_lmm;
+ ma->ma_lmv_size = info->mti_big_lmmsize;
+ ma->ma_valid = 0;
+ rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
+ if (rc)
+ GOTO(put_parent, rc);
+
+ /* lock parent object */
+ lhp = &info->mti_lh[MDT_LH_PARENT];
+ mdt_lock_reg_init(lhp, LCK_PW);
+ rc = mdt_migrate_parent_lock(info, pobj, ma, lhp, peinfo,
+ &parent_slave_locks);
+ if (rc)
+ GOTO(put_parent, rc);
+
+ /*
+ * spobj is the stripe of pobj that the name maps to if pobj is a striped
+ * directory (otherwise it is pobj itself); it is the real parent. There
+ * is no need to lock it, because we've taken full lock of pobj.
+ */
+ rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &sobj);
+ if (rc)
+ GOTO(unlock_parent, rc);
+
+ /* lock parents of source links, and revoke LOOKUP lock of links */
+ rc = mdt_lock_links(info, pobj, ma, sobj, &link_locks);
+ if (rc < 0)
+ GOTO(put_source, rc);
+
+ /*
+ * RS_MAX_LOCKS is the limit on the number of locks that can be saved
+ * along with one request. If the total lock count may exceed this
+ * limit (mdt_lock_links() returned 1), we will drop all locks after
+ * migration, and synchronize the device at the end.
+ */
+ do_sync = rc;
+
+ /* unless HSM migration is allowed, reject a source with HSM flags set */
+ if (!mdt->mdt_opts.mo_migrate_hsm_allowed) {
+ ma->ma_need = MA_HSM;
+ ma->ma_valid = 0;
+ rc = mdt_attr_get_complex(info, sobj, ma);
+ if (rc)
+ GOTO(unlock_links, rc);
+
+ if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
+ GOTO(unlock_links, rc = -EOPNOTSUPP);
+ }
+
+ /* end lease and close file for regular file */
+ if (info->mti_spec.sp_migrate_close) {
+ /* try to hold open_sem so that nobody else can open the file */
+ if (!down_write_trylock(&sobj->mot_open_sem)) {
+ /* close anyway */
+ mdd_migrate_close(info, sobj);
+ GOTO(unlock_links, rc = -EBUSY);
+ } else {
+ open_sem_locked = true;
+ rc = mdd_migrate_close(info, sobj);
+ if (rc)
+ GOTO(unlock_open_sem, rc);
+ }
+ }
+
+ /* lock source */
+ lhs = &info->mti_lh[MDT_LH_OLD];
+ mdt_lock_reg_init(lhs, LCK_EX);
+ rc = mdt_migrate_object_lock(info, spobj, sobj, lhs, seinfo,
+ &child_slave_locks);
+ if (rc)
+ GOTO(unlock_open_sem, rc);
+
+ /* lock target */
+ tobj = mdt_object_find(env, mdt, rr->rr_fid2);
+ if (IS_ERR(tobj))
+ GOTO(unlock_source, rc = PTR_ERR(tobj));
+
+ lht = &info->mti_lh[MDT_LH_NEW];
+ mdt_lock_reg_init(lht, LCK_EX);
+ rc = mdt_reint_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, true);
+ if (rc)
+ GOTO(put_target, rc);
+
+ /* Don't do lookup sanity check. We know name doesn't exist. */
+ info->mti_spec.sp_cr_lookup = 0;
+ info->mti_spec.sp_feat = &dt_directory_features;
+
+ rc = mdo_migrate(env, mdt_object_child(pobj),
+ mdt_object_child(sobj), &rr->rr_name,
+ mdt_object_child(tobj), &info->mti_spec, ma);
+ EXIT;
+
+ mdt_object_unlock(info, tobj, lht, rc);
+put_target:
+ mdt_object_put(env, tobj);
+unlock_source:
+ mdt_migrate_object_unlock(info, sobj, lhs, seinfo,
+ &child_slave_locks, rc);
+unlock_open_sem:
+ if (open_sem_locked)
+ up_write(&sobj->mot_open_sem);
+unlock_links:
+ mdt_unlock_list(info, &link_locks, rc);
+put_source:
+ mdt_object_put(env, sobj);
+ mdt_object_put(env, spobj);
+unlock_parent:
+ mdt_migrate_object_unlock(info, pobj, lhp, peinfo,
+ &parent_slave_locks, rc);
+put_parent:
+ mdt_object_put(env, pobj);
+
+ if (!rc && do_sync)
+ mdt_device_sync(env, mdt);
+
+ return rc;
}
static int mdt_object_lock_save(struct mdt_thread_info *info,
if (!req_is_replay(req)) {
rc = mdt_rename_lock(info, &rename_lh);
if (rc != 0) {
- CERROR("%s: can't lock FS for rename: rc = %d\n",
+ CERROR("%s: can't lock FS for rename: rc = %d\n",
mdt_obd_name(info->mti_mdt), rc);
RETURN(rc);
}
if (rename)
rc = mdt_reint_rename_internal(info, lhc);
else
- rc = mdt_reint_migrate_internal(info, lhc);
+ rc = mdt_reint_migrate_internal(info);
if (lustre_handle_is_used(&rename_lh))
mdt_rename_unlock(&rename_lh);
osd_trans_declare_op(env, oh, OSD_OT_INSERT,
osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1);
+ /* will help to find FID->ino mapping at dt_insert() */
+ rc = osd_idc_find_and_init(env, osd_obj2dev(osd_dt_obj(dt)),
+ osd_dt_obj(dt));
+ if (rc != 0)
+ RETURN(rc);
+
if (!attr)
RETURN(0);
if (rc != 0)
RETURN(rc);
- /* will help to find FID->ino mapping at dt_insert() */
- rc = osd_idc_find_and_init(env, osd_obj2dev(osd_dt_obj(dt)),
- osd_dt_obj(dt));
-
RETURN(rc);
}
idc = osd_idc_find(env, osd, fid);
if (unlikely(idc == NULL)) {
- /*
- * this dt_insert() wasn't declared properly, so
- * FID is missing in OI cache. we better do not
- * lookup FID in FLDB/OI and don't risk to deadlock,
- * but in some special cases (lfsck testing, etc)
- * it's much simpler than fixing a caller
- */
- CERROR("%s: "DFID" wasn't declared for insert\n",
- osd_name(osd), PFID(fid));
- dump_stack();
idc = osd_idc_find_or_init(env, osd, fid);
- if (IS_ERR(idc))
+ if (IS_ERR(idc)) {
+ /*
+ * this dt_insert() wasn't declared properly, so
+ * FID is missing in OI cache. We'd better not
+ * look up FID in FLDB/OI and risk a deadlock,
+ * but in some special cases (lfsck testing, etc)
+ * it's much simpler than fixing a caller.
+ *
+ * Normally this error would be reported right after
+ * the first find, but migrate may attach source
+ * stripes to target, which doesn't create stripes.
+ */
+ CERROR("%s: "DFID" wasn't declared for insert\n",
+ osd_name(osd), PFID(fid));
+ dump_stack();
RETURN(PTR_ERR(idc));
+ }
}
if (idc->oic_remote) {
* lookup FID in FLDB/OI and don't risk to deadlock,
* but in some special cases (lfsck testing, etc)
* it's much simpler than fixing a caller */
- CERROR("%s: "DFID" wasn't declared for insert\n",
- osd_name(osd), PFID(fid));
idc = osd_idc_find_or_init(env, osd, fid);
- if (IS_ERR(idc))
+ if (IS_ERR(idc)) {
+ CERROR("%s: "DFID" wasn't declared for insert\n",
+ osd_name(osd), PFID(fid));
RETURN(PTR_ERR(idc));
+ }
}
CLASSERT(sizeof(zde->lzd_reg) == 8);
return rc;
rc = -nvlist_remove(obj->oo_sa_xattr, name, DATA_TYPE_BYTE_ARRAY);
- if (rc == 0)
+ if (rc)
+ return rc;
+
+ /*
+ * Only migration deletes the LMV xattr, and the deletion needs to be
+ * done immediately: LMV is used in deleting sub stripes, so if this
+ * were delayed, destroying the master object later would delete the
+ * sub stripes again.
+ */
+ if (!strcmp(name, XATTR_NAME_LMV))
+ rc = __osd_sa_xattr_update(env, obj, oh);
+ else
rc = __osd_sa_xattr_schedule_update(env, obj, oh);
return rc;
}
(long long)(int)offsetof(struct lmv_mds_md_v1, lmv_layout_version));
LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_layout_version) == 4, "found %lld\n",
(long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_layout_version));
- LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_padding1) == 20, "found %lld\n",
- (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_padding1));
- LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding1) == 4, "found %lld\n",
- (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding1));
- LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_padding2) == 24, "found %lld\n",
+ LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_migrate_offset) == 20, "found %lld\n",
+ (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_migrate_offset));
+ LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_migrate_offset) == 4, "found %lld\n",
+ (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_migrate_offset));
+ LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_migrate_hash) == 24, "found %lld\n",
+ (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_migrate_hash));
+ LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_migrate_hash) == 4, "found %lld\n",
+ (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_migrate_hash));
+ LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_padding2) == 28, "found %lld\n",
(long long)(int)offsetof(struct lmv_mds_md_v1, lmv_padding2));
- LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding2) == 8, "found %lld\n",
+ LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding2) == 4, "found %lld\n",
(long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding2));
LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_padding3) == 32, "found %lld\n",
(long long)(int)offsetof(struct lmv_mds_md_v1, lmv_padding3));
RACERDIRS=${RACERDIRS:-"$DIR $DIR2"}
echo RACERDIRS=$RACERDIRS
+#LU-4684
+RACER_ENABLE_MIGRATION=false
+
if ((MDSCOUNT > 1 &&
$(lustre_version_code $SINGLEMDS) >= $(version_code 2.8.0))); then
RACER_ENABLE_REMOTE_DIRS=${RACER_ENABLE_REMOTE_DIRS:-true}
run_test 110f "remove remote directory: drop slave rep"
test_110g () {
- [[ $(lustre_version_code $SINGLEMDS) -ge $(version_code 2.6.57) ]] ||
- { skip "Need MDS version at least 2.6.57"; return 0; }
+ [[ $(lustre_version_code $SINGLEMDS) -ge $(version_code 2.11.0) ]] ||
+ { skip "Need MDS version at least 2.11.0"; return 0; }
[ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
- local remote_dir=$DIR/$tdir/remote_dir
- local mdtidx=1
-
- mkdir -p $remote_dir
-
- createmany -o $remote_dir/f 100
- #define OBD_FAIL_MIGRATE_NET_REP 0x1800
- do_facet mds$mdtidx lctl set_param fail_loc=0x1800
- $LFS migrate -m $mdtidx $remote_dir || error "migrate failed"
- do_facet mds$mdtidx lctl set_param fail_loc=0x0
+ mkdir -p $DIR/$tdir
+ touch $DIR/$tdir/$tfile
- for file in $(find $remote_dir); do
- mdt_index=$($LFS getstripe -m $file)
- [ $mdt_index == $mdtidx ] ||
- error "$file is not on MDT${mdtidx}"
- done
+ # OBD_FAIL_MDS_REINT_NET_REP 0x119
+ do_facet mds1 $LCTL set_param fail_loc=0x119
+ $LFS migrate -m 1 $DIR/$tdir &
+ migrate_pid=$!
+ sleep 5
+ do_facet mds1 $LCTL set_param fail_loc=0
+ wait $migrate_pid
+
+ local mdt_index
+ mdt_index=$($LFS getstripe -m $DIR/$tdir)
+ [ $mdt_index == 1 ] || error "$tdir is not on MDT1"
+ mdt_index=$($LFS getstripe -m $DIR/$tdir/$tfile)
+ [ $mdt_index == 1 ] || error "$tfile is not on MDT1"
rm -rf $DIR/$tdir || error "rmdir failed"
}
cancel_lru_locks mdc
if [ $MDSCOUNT -ge 2 ]; then
- $LFS migrate -m 1 $DIR/$tdir/guard 2>/dev/null ||
- error "(3.1) Migrate failure"
+ $LFS migrate -m 1 $DIR/$tdir/guard 2>/dev/null &&
+ error "(3.1) Migrate should fail"
echo "The object with linkEA overflow should NOT be migrated"
local newfid=$($LFS path2fid $DIR/$tdir/guard/f0)
unlinkmany $DIR/$tdir/foo/ttttttttttt 100 || error "(4) Fail to unlink"
if [ $MDSCOUNT -ge 2 ]; then
- $LFS migrate -m 1 $DIR/$tdir/guard 2>/dev/null ||
- error "(5.1) Migrate failure"
+ $LFS migrate -m 1 $DIR/$tdir/guard 2>/dev/null &&
+ error "(5.1) Migrate should fail"
# The overflow timestamp is still there, so migration will fail.
local newfid=$($LFS path2fid $DIR/$tdir/guard/f0)
test_230a() {
[ $PARALLEL == "yes" ] && skip "skip parallel run"
[ $MDSCOUNT -lt 2 ] && skip_env "needs >= 2 MDTs"
+ [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.11.52) ] &&
+ skip "Need MDS version at least 2.11.52"
local MDTIDX=1
test_230b() {
[ $PARALLEL == "yes" ] && skip "skip parallel run"
[ $MDSCOUNT -lt 2 ] && skip_env "needs >= 2 MDTs"
+ [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.11.52) ] &&
+ skip "Need MDS version at least 2.11.52"
local MDTIDX=1
local mdt_index
run_test 230b "migrate directory"
test_230c() {
- [ $MDSCOUNT -lt 2 ] && skip_env "needs >= 2 MDTs"
[ $PARALLEL == "yes" ] && skip "skip parallel run"
+ [ $MDSCOUNT -lt 2 ] && skip_env "needs >= 2 MDTs"
remote_mds_nodsh && skip "remote MDS with nodsh"
+ [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.11.52) ] &&
+ skip "Need MDS version at least 2.11.52"
local MDTIDX=1
local mdt_index
createmany -o $migrate_dir/f 10 ||
error "create files under ${migrate_dir} failed"
- #failed after migrating 5 entries
+ # fail after migrating top dir, and this will fail only once, so one
+ # sub file migration will fail, others succeed.
#OBD_FAIL_MIGRATE_ENTRIES 0x1801
- do_facet mds1 lctl set_param fail_loc=0x20001801
- do_facet mds1 lctl set_param fail_val=5
+ do_facet mds1 lctl set_param fail_loc=0x1801
local t=$(ls $migrate_dir | wc -l)
$LFS migrate --mdt-index $MDTIDX $migrate_dir &&
- error "migrate should fail after 5 entries"
-
- mkdir $migrate_dir/dir &&
- error "mkdir succeeds under migrating directory"
- touch $migrate_dir/file &&
- error "touch file succeeds under migrating directory"
+ error "migrate should fail"
+
+ # adding a new dir/file should succeed
+ mkdir $migrate_dir/dir ||
+ error "mkdir failed under migrating directory"
+ touch $migrate_dir/file ||
+ error "touch file failed under migrating directory"
+ # adding a file with an existing name should fail
+ $OPENFILE -f O_CREAT:O_EXCL $migrate_dir/f1 &&
+ error "open(O_CREAT|O_EXCL) f1 should fail"
+ $MULTIOP $migrate_dir/f1 m &&
+ error "create f1 should fail"
+ $MULTIOP $migrate_dir/f3 m &&
+ error "create f3 should fail"
local u=$(ls $migrate_dir | wc -l)
+ u=$((u - 2))
[ "$u" == "$t" ] || error "$u != $t during migration"
for file in $(find $migrate_dir); do
stat $file || error "stat $file failed"
done
- do_facet mds1 lctl set_param fail_loc=0
- do_facet mds1 lctl set_param fail_val=0
+ # resuming migration with different options should fail
+ $LFS migrate -m 0 $migrate_dir &&
+ error "migrate -m 0 $migrate_dir should fail"
+
+ $LFS migrate -m $MDTIDX -c 2 $migrate_dir &&
+ error "migrate -c 2 $migrate_dir should fail"
+ # resuming migration with the original options should succeed
$LFS migrate -m $MDTIDX $migrate_dir ||
- error "migrate open files should failed with open files"
+ error "migrate $migrate_dir failed"
echo "Finish migration, then checking.."
for file in $(find $migrate_dir); do
rm -rf $DIR/$tdir || error "rm dir failed after migration"
}
-run_test 230c "check directory accessiblity if migration is failed"
+run_test 230c "check directory accessiblity if migration failed"
test_230d() {
[ $PARALLEL == "yes" ] && skip "skip parallel run"
[ $MDSCOUNT -lt 2 ] && skip_env "needs >= 2 MDTs"
+ [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.11.52) ] &&
+ skip "Need MDS version at least 2.11.52"
- local MDTIDX=1
- local mdt_index
local migrate_dir=$DIR/$tdir/migrate_dir
+ local old_index
+ local new_index
+ local old_count
+ local new_count
+ local new_hash
+ local mdt_index
local i
local j
+ old_index=$((RANDOM % MDSCOUNT))
+ old_count=$((MDSCOUNT - old_index))
+ new_index=$((RANDOM % MDSCOUNT))
+ new_count=$((MDSCOUNT - new_index))
+ new_hash="all_char"
+
+ [ $old_count -gt 1 ] && old_count=$((old_count - RANDOM % old_count))
+ [ $new_count -gt 1 ] && new_count=$((new_count - RANDOM % new_count))
+
test_mkdir $DIR/$tdir
- test_mkdir -i0 -c1 $migrate_dir
+ test_mkdir -i $old_index -c $old_count $migrate_dir
for ((i=0; i<100; i++)); do
test_mkdir -i0 -c1 $migrate_dir/dir_${i}
error "create files under remote dir failed $i"
done
- $LFS migrate -m $MDTIDX $migrate_dir ||
+ echo -n "Migrate from MDT$old_index "
+ [ $old_count -gt 1 ] && echo -n "... MDT$((old_index + old_count - 1)) "
+ echo -n "to MDT$new_index"
+ [ $new_count -gt 1 ] && echo -n " ... MDT$((new_index + new_count - 1))"
+ echo
+
+ echo "$LFS migrate -m$new_index -c$new_count -H $new_hash $migrate_dir"
+ $LFS migrate -m $new_index -c $new_count -H $new_hash $migrate_dir ||
error "migrate remote dir error"
echo "Finish migration, then checking.."
for file in $(find $migrate_dir); do
mdt_index=$($LFS getstripe -m $file)
- [ $mdt_index == $MDTIDX ] ||
- error "$file is not on MDT${MDTIDX}"
+ if [ $mdt_index -lt $new_index ] ||
+ [ $mdt_index -gt $((new_index + new_count - 1)) ]; then
+ error "$file is on MDT$mdt_index"
+ fi
done
rm -rf $DIR/$tdir || error "rm dir failed after migration"
test_230e() {
[ $PARALLEL == "yes" ] && skip "skip parallel run"
[ $MDSCOUNT -lt 2 ] && skip_env "needs >= 2 MDTs"
+ [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.11.52) ] &&
+ skip "Need MDS version at least 2.11.52"
local i
local j
test_230f() {
[ $PARALLEL == "yes" ] && skip "skip parallel run"
[ $MDSCOUNT -lt 2 ] && skip_env "needs >= 2 MDTs"
+ [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.11.52) ] &&
+ skip "Need MDS version at least 2.11.52"
local a_fid
local ln_fid
test_230g() {
[ $PARALLEL == "yes" ] && skip "skip parallel run"
[ $MDSCOUNT -lt 2 ] && skip_env "needs >= 2 MDTs"
+ [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.11.52) ] &&
+ skip "Need MDS version at least 2.11.52"
mkdir -p $DIR/$tdir/migrate_dir
test_230h() {
[ $PARALLEL == "yes" ] && skip "skip parallel run"
[ $MDSCOUNT -lt 2 ] && skip_env "needs >= 2 MDTs"
- [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.64) ] &&
- skip "Need MDS version at least 2.7.64"
+ [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.11.52) ] &&
+ skip "Need MDS version at least 2.11.52"
local mdt_index
$LFS migrate -m1 $DIR/$tdir/.. &&
error "migrating mountpoint2 should fail"
- $LFS migrate -m1 $DIR/$tdir/migrate_dir/.. ||
- error "migrating $tdir fail"
-
- mdt_index=$($LFS getstripe -m $DIR/$tdir)
- [ $mdt_index == 1 ] || error "$mdt_index != 1 after migration"
-
- mdt_index=$($LFS getstripe -m $DIR/$tdir/migrate_dir)
- [ $mdt_index == 1 ] || error "$mdt_index != 1 after migration"
+ # like mv, migrating ".." should fail
+ $LFS migrate -m1 $DIR/$tdir/migrate_dir/.. &&
+ error "migrating $tdir/migrate_dir/.. should fail"
+ true
}
run_test 230h "migrate .. and root"
test_230i() {
[ $PARALLEL == "yes" ] && skip "skip parallel run"
[ $MDSCOUNT -lt 2 ] && skip_env "needs >= 2 MDTs"
+ [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.11.52) ] &&
+ skip "Need MDS version at least 2.11.52"
mkdir -p $DIR/$tdir/migrate_dir
#define OBD_FAIL_LARGE_STRIPE 0x1703
$LCTL set_param fail_loc=0x1703
- $LFS setdirstripe -i 0 -c512 $DIR/$tdir/striped_dir ||
+ $LFS setdirstripe -i 0 -c192 $DIR/$tdir/striped_dir ||
error "set striped dir error"
$LCTL set_param fail_loc=0
rc = 0;
if (sem_init) {
rc = sem_init(path, d, NULL, data, dent);
- if (rc < 0 && ret == 0)
+ if (rc < 0 && ret == 0) {
ret = rc;
+ break;
+ }
}
if (sem_fini && rc == 0)
sem_fini(path, d, NULL, data, dent);
sync();
retry = true;
goto migrate;
+ } else if (errno == EALREADY) {
+ if (param->fp_verbose & VERBOSE_DETAIL)
+ fprintf(stdout,
+ "%s was migrated to MDT%d already\n",
+ path, lmu->lum_stripe_offset);
+ ret = 0;
+ } else {
+ ret = -errno;
+ fprintf(stderr, "%s migrate failed: %s (%d)\n",
+ path, strerror(-ret), ret);
+ goto out;
}
- ret = -errno;
- fprintf(stderr, "%s migrate failed: %s (%d)\n",
- path, strerror(-ret), ret);
- goto out;
} else if (param->fp_verbose & VERBOSE_DETAIL) {
fprintf(stdout, "migrate %s to MDT%d stripe count %d\n",
path, lmu->lum_stripe_offset, lmu->lum_stripe_count);
CHECK_MEMBER(lmv_mds_md_v1, lmv_master_mdt_index);
CHECK_MEMBER(lmv_mds_md_v1, lmv_hash_type);
CHECK_MEMBER(lmv_mds_md_v1, lmv_layout_version);
- CHECK_MEMBER(lmv_mds_md_v1, lmv_padding1);
+ CHECK_MEMBER(lmv_mds_md_v1, lmv_migrate_offset);
+ CHECK_MEMBER(lmv_mds_md_v1, lmv_migrate_hash);
CHECK_MEMBER(lmv_mds_md_v1, lmv_padding2);
CHECK_MEMBER(lmv_mds_md_v1, lmv_padding3);
CHECK_MEMBER(lmv_mds_md_v1, lmv_pool_name[LOV_MAXPOOLNAME]);
(long long)(int)offsetof(struct lmv_mds_md_v1, lmv_layout_version));
LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_layout_version) == 4, "found %lld\n",
(long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_layout_version));
- LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_padding1) == 20, "found %lld\n",
- (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_padding1));
- LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding1) == 4, "found %lld\n",
- (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding1));
- LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_padding2) == 24, "found %lld\n",
+ LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_migrate_offset) == 20, "found %lld\n",
+ (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_migrate_offset));
+ LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_migrate_offset) == 4, "found %lld\n",
+ (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_migrate_offset));
+ LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_migrate_hash) == 24, "found %lld\n",
+ (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_migrate_hash));
+ LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_migrate_hash) == 4, "found %lld\n",
+ (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_migrate_hash));
+ LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_padding2) == 28, "found %lld\n",
(long long)(int)offsetof(struct lmv_mds_md_v1, lmv_padding2));
- LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding2) == 8, "found %lld\n",
+ LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding2) == 4, "found %lld\n",
(long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding2));
LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_padding3) == 32, "found %lld\n",
(long long)(int)offsetof(struct lmv_mds_md_v1, lmv_padding3));