if (IS_ERR(th))
RETURN(PTR_ERR(th));
+ /* Store ranges in le format. */
+ range_cpu_to_le(&info->sti_space, &seq->lss_space);
+
rc = dt_declare_record_write(env, seq->lss_obj,
- sizeof(struct lu_seq_range), 0, th);
+ seq_store_buf(info), 0, th);
if (rc)
GOTO(exit, rc);
if (rc)
GOTO(exit, rc);
- /* Store ranges in le format. */
- range_cpu_to_le(&info->sti_space, &seq->lss_space);
-
rc = dt_record_write(env, seq->lss_obj, seq_store_buf(info), &pos, th);
if (rc) {
CERROR("%s: Can't write space data, rc %d\n",
ssize_t (*dbo_read)(const struct lu_env *env, struct dt_object *dt,
struct lu_buf *buf, loff_t *pos,
struct lustre_capa *capa);
- /**
- * precondition: dt_object_exists(dt);
- */
- ssize_t (*dbo_declare_write)(const struct lu_env *env,
- struct dt_object *dt,
- const loff_t size, loff_t pos,
- struct thandle *handle);
- ssize_t (*dbo_write)(const struct lu_env *env, struct dt_object *dt,
- const struct lu_buf *buf, loff_t *pos,
- struct thandle *handle, struct lustre_capa *capa,
- int ignore_quota);
+ /**
+ * precondition: dt_object_exists(dt);
+ */
+ ssize_t (*dbo_declare_write)(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct lu_buf *buf, loff_t pos,
+ struct thandle *handle);
+ ssize_t (*dbo_write)(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_buf *buf, loff_t *pos,
+ struct thandle *handle, struct lustre_capa *capa,
+ int ignore_quota);
/*
* methods for zero-copy IO
*/
static inline int dt_declare_record_write(const struct lu_env *env,
- struct dt_object *dt,
- int size, loff_t pos,
- struct thandle *th)
-{
- int rc;
-
- LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
- LASSERT(th != NULL);
- LASSERT(dt->do_body_ops);
- LASSERT(dt->do_body_ops->dbo_declare_write);
- rc = dt->do_body_ops->dbo_declare_write(env, dt, size, pos, th);
- return rc;
+ struct dt_object *dt,
+ const struct lu_buf *buf,
+ loff_t pos,
+ struct thandle *th)
+{
+ int rc;
+
+ LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
+ LASSERT(th != NULL);
+ LASSERT(dt->do_body_ops);
+ LASSERT(dt->do_body_ops->dbo_declare_write);
+ rc = dt->do_body_ops->dbo_declare_write(env, dt, buf, pos, th);
+ return rc;
}
static inline int dt_declare_create(const struct lu_env *env,
wait_queue_head_t ccc_unstable_waitq; /* Signaled on BRW commit */
};
+enum { /* operation codes passed as the opc argument of ll_prep_md_op_data() */
+ LUSTRE_OPC_MKDIR = 0,
+ LUSTRE_OPC_SYMLINK = 1,
+ LUSTRE_OPC_MKNOD = 2,
+ LUSTRE_OPC_CREATE = 3,
+ LUSTRE_OPC_ANY = 5 /* NOTE(review): value 4 is skipped - confirm the gap is intentional */
+};
+
enum op_cli_flags {
CLI_SET_MEA = 1 << 0,
CLI_RM_ENTRY = 1 << 1,
CLI_HASH64 = 1 << 2,
CLI_API32 = 1 << 3,
+ CLI_MIGRATE = 1 << 4,
};
#endif /*LCLIENT_H */
} d;
};
+static inline int it_disposition(const struct lookup_intent *it, int flag)
+{ /* Test disposition bits of the intent against 'flag'. */
+ return it->d.lustre.it_disposition & flag; /* non-zero iff any bit of 'flag' is set */
+}
+
+static inline void it_set_disposition(struct lookup_intent *it, int flag)
+{ /* Set the bits of 'flag' in the intent's disposition mask. */
+ it->d.lustre.it_disposition |= flag;
+}
+
+static inline void it_clear_disposition(struct lookup_intent *it, int flag)
+{ /* Clear the bits of 'flag' from the intent's disposition mask. */
+ it->d.lustre.it_disposition &= ~flag;
+}
+
static inline void intent_init(struct lookup_intent *it, int op, int flags)
{
memset(it, 0, sizeof(*it));
} d;
};
+static inline int it_disposition(const struct lookup_intent *it, int flag)
+{ /* Test disposition bits of the intent against 'flag'. */
+ return it->d.lustre.it_disposition & flag; /* non-zero iff any bit of 'flag' is set */
+}
+
+static inline void it_set_disposition(struct lookup_intent *it, int flag)
+{ /* Set the bits of 'flag' in the intent's disposition mask. */
+ it->d.lustre.it_disposition |= flag;
+}
+
+static inline void it_clear_disposition(struct lookup_intent *it, int flag)
+{ /* Clear the bits of 'flag' from the intent's disposition mask. */
+ it->d.lustre.it_disposition &= ~flag;
+}
+
#endif
};
enum lu_xattr_flags {
- LU_XATTR_REPLACE = (1 << 0),
- LU_XATTR_CREATE = (1 << 1)
+ LU_XATTR_REPLACE = (1 << 0),
+ LU_XATTR_CREATE = (1 << 1)
};
/** @} helpers */
#define LOV_MAGIC LOV_MAGIC_V1
#define LOV_MAGIC_JOIN_V1 0x0BD20BD0
#define LOV_MAGIC_V3 0x0BD30BD0
+#define LOV_MAGIC_MIGRATE 0x0BD40BD0
/*
* magic for fully defined striping
REINT_OPEN = 6,
REINT_SETXATTR = 7,
REINT_RMENTRY = 8,
-// REINT_WRITE = 9,
+ REINT_MIGRATE = 9,
REINT_MAX
} mds_reint_t, mdt_reint_t;
MDS_CREATE_VOLATILE = 1 << 10,
MDS_OWNEROVERRIDE = 1 << 11,
MDS_HSM_RELEASE = 1 << 12,
+ MDS_RENAME_MIGRATE = 1 << 13,
};
/* instance of mdt_reint_rec */
/* lmv structures */
#define LMV_MAGIC_V1 0x0CD10CD0 /* normal stripe lmv magic */
#define LMV_USER_MAGIC 0x0CD20CD0 /* default lmv magic*/
+#define LMV_MAGIC_MIGRATE 0x0CD30CD0 /* migrate stripe lmv magic */
#define LMV_MAGIC LMV_MAGIC_V1
enum lmv_hash_type {
LMV_HASH_TYPE_ALL_CHARS = 1,
LMV_HASH_TYPE_FNV_1A_64 = 2,
+ LMV_HASH_TYPE_MIGRATION = 3,
};
#define LMV_HASH_NAME_ALL_CHARS "all_char"
static inline int lmv_mds_md_size(int stripe_count, unsigned int lmm_magic)
{
switch (lmm_magic) {
- case LMV_MAGIC_V1: {
+ case LMV_MAGIC_V1:
+ case LMV_MAGIC_MIGRATE: {
struct lmv_mds_md_v1 *lmm1;
return sizeof(*lmm1) + stripe_count *
{
switch (le32_to_cpu(lmm->lmv_magic)) {
case LMV_MAGIC_V1:
+ case LMV_MAGIC_MIGRATE:
return le32_to_cpu(lmm->lmv_md_v1.lmv_stripe_count);
case LMV_USER_MAGIC:
return le32_to_cpu(lmm->lmv_user_md.lum_stripe_count);
{
switch (le32_to_cpu(lmm->lmv_magic)) {
case LMV_MAGIC_V1:
+ case LMV_MAGIC_MIGRATE:
lmm->lmv_md_v1.lmv_stripe_count = cpu_to_le32(stripe_count);
break;
case LMV_USER_MAGIC:
OUT_INDEX_LOOKUP = 9,
OUT_INDEX_INSERT = 10,
OUT_INDEX_DELETE = 11,
+ OUT_WRITE = 12,
OUT_LAST
};
#define LL_IOC_GET_LEASE _IO('f', 244)
#define LL_IOC_HSM_IMPORT _IOWR('f', 245, struct hsm_user_import)
#define LL_IOC_LMV_SET_DEFAULT_STRIPE _IOWR('f', 246, struct lmv_user_md)
+#define LL_IOC_MIGRATE _IOR('f', 247, int)
#define LL_STATFS_LMV 1
#define LL_STATFS_LOV 2
exclude_stripecount:1,
check_layout:1,
exclude_layout:1,
- get_default_lmv:1; /* Get default LMV */
+ get_default_lmv:1, /* Get default LMV */
+ migrate:1;
int verbose;
int quiet;
extern int llapi_search_rootpath(char *pathname, const char *fsname);
extern int llapi_nodemap_exists(const char *name);
+extern int llapi_mv(char *path, struct find_param *param);
struct mntent;
#define HAVE_LLAPI_IS_LUSTRE_MNT
fid->f_ver = 0;
}
+/**
+ * Check whether \a fid passes any of the fid_is_mdt0(), fid_is_igif(),
+ * fid_is_norm() or fid_is_root() predicates, tested in that order.
+ *
+ * \retval	true if at least one predicate accepts \a fid
+ * \retval	false otherwise
+ */
+static inline bool fid_is_md_operative(const struct lu_fid *fid)
+{
+	if (fid_is_mdt0(fid) || fid_is_igif(fid))
+		return true;
+
+	return fid_is_norm(fid) || fid_is_root(fid);
+}
+
/* seq client type */
enum lu_cli_type {
LUSTRE_SEQ_METADATA = 1,
int linkea_init(struct linkea_data *ldata);
void linkea_entry_unpack(const struct link_ea_entry *lee, int *reclen,
struct lu_name *lname, struct lu_fid *pfid);
+int linkea_entry_pack(struct link_ea_entry *lee, const struct lu_name *lname,
+ const struct lu_fid *pfid);
int linkea_add_buf(struct linkea_data *ldata, const struct lu_name *lname,
const struct lu_fid *pfid);
void linkea_del_buf(struct linkea_data *ldata, const struct lu_name *lname);
{
switch (lmv_src->lmv_magic) {
case LMV_MAGIC_V1:
+ case LMV_MAGIC_MIGRATE: {
lmv1_cpu_to_le(&lmv_dst->lmv_md_v1, &lmv_src->lmv_md_v1);
break;
+ }
default:
break;
}
{
switch (le32_to_cpu(lmv_src->lmv_magic)) {
case LMV_MAGIC_V1:
+ case LMV_MAGIC_MIGRATE: {
lmv1_le_to_cpu(&lmv_dst->lmv_md_v1, &lmv_src->lmv_md_v1);
break;
+ }
default:
break;
}
}
+
#endif
};
/* mdc/mdc_locks.c */
-int it_disposition(struct lookup_intent *it, int flag);
+int it_disposition(const struct lookup_intent *it, int flag);
void it_clear_disposition(struct lookup_intent *it, int flag);
void it_set_disposition(struct lookup_intent *it, int flag);
int it_open_error(int phase, struct lookup_intent *it);
struct md_object *cobj, const struct lu_name *lname,
struct md_attr *ma, int no_name);
+ int (*mdo_migrate)(const struct lu_env *env, struct md_object *pobj,
+ const struct lu_fid *lf, const struct lu_name *lname,
+ struct md_object *tobj, struct md_attr *ma);
/** This method is used to compare a requested layout to an existing
* layout (struct lov_mds_md_v1/3 vs struct lov_mds_md_v1/3) */
int (*mdo_lum_lmm_cmp)(const struct lu_env *env,
ma);
}
+static inline int mdo_migrate(const struct lu_env *env,
+ struct md_object *pobj,
+ const struct lu_fid *lf,
+ const struct lu_name *lname,
+ struct md_object *tobj,
+ struct md_attr *ma)
+{ /* Dispatch to the mdo_migrate method of pobj's directory operations. */
+ LASSERT(pobj->mo_dir_ops->mdo_migrate); /* the method must be provided */
+ return pobj->mo_dir_ops->mdo_migrate(env, pobj, lf, lname, tobj, ma); /* arguments are forwarded unchanged */
+}
+
static inline int mdo_is_subdir(const struct lu_env *env,
struct md_object *mo,
const struct lu_fid *fid,
/* Various operation flags. */
enum mds_op_bias op_bias;
- /* Operation type */
- __u32 op_opc;
-
/* Used by readdir */
__u64 op_hash_offset;
* Also, add a wrapper function in include/linux/obd_class.h. */
};
-enum {
- LUSTRE_OPC_MKDIR = (1 << 0),
- LUSTRE_OPC_SYMLINK = (1 << 1),
- LUSTRE_OPC_MKNOD = (1 << 2),
- LUSTRE_OPC_CREATE = (1 << 3),
- LUSTRE_OPC_ANY = (1 << 4)
-};
-
/* lmv structures */
struct lustre_md {
struct mdt_body *body;
#define OBD_FAIL_OUT_UPDATE_NET 0x1700
#define OBD_FAIL_OUT_UPDATE_NET_REP 0x1701
+/* MIGRATE */
+#define OBD_FAIL_MIGRATE_NET_REP 0x1800
+#define OBD_FAIL_MIGRATE_ENTRIES 0x1801
+#define OBD_FAIL_MIGRATE_LINKEA 0x1802
/* Assign references to moved code to reduce code changes */
#define OBD_FAIL_PRECHECK(id) CFS_FAIL_PRECHECK(id)
RETURN(rc);
}
- rc = dt_declare_record_write(env, obj, len, 0, handle);
+ rc = dt_declare_record_write(env, obj,
+ lfsck_buf_get(env,
+ &lfsck->li_bookmark_disk, len),
+ 0, handle);
if (rc != 0) {
CERROR("%s: fail to declare trans for storing lfsck_bookmark: "
"rc = %d\n", lfsck_lfsck2name(lfsck), rc);
RETURN(rc);
}
- rc = dt_declare_record_write(env, obj, size, pos, handle);
+ rc = dt_declare_record_write(env, obj, lfsck_buf_get(env, lo, size),
+ pos, handle);
if (rc != 0) {
CERROR("%s: fail to declare trans for storing lfsck_layout(1): "
"rc = %d\n", lfsck_lfsck2name(lfsck), rc);
if (rc != 0)
GOTO(stop, rc);
- rc = dt_declare_record_write(env, obj, sizeof(lastid), pos, th);
+ rc = dt_declare_record_write(env, obj,
+ lfsck_buf_get(env, &lastid,
+ sizeof(lastid)),
+ pos, th);
if (rc != 0)
GOTO(stop, rc);
continue;
}
+ lastid = cpu_to_le64(lls->lls_lastid);
rc = dt_declare_record_write(env, lls->lls_lastid_obj,
- sizeof(lastid), pos, th);
+ lfsck_buf_get(env, &lastid,
+ sizeof(lastid)), pos, th);
if (rc != 0)
goto stop;
if (rc != 0)
goto stop;
- lastid = cpu_to_le64(lls->lls_lastid);
dt_write_lock(env, lls->lls_lastid_obj, 0);
rc = dt_record_write(env, lls->lls_lastid_obj,
lfsck_buf_get(env, &lastid,
GOTO(stop, rc);
/* 5a. update bookmark */
- rc = dt_declare_record_write(env, bk_obj, len, 0, th);
+ rc = dt_declare_record_write(env, bk_obj,
+ lfsck_buf_get(env, bk, len), 0, th);
if (rc != 0)
GOTO(stop, rc);
GOTO(stop, rc);
/* 8a. update bookmark locally. */
- rc = dt_declare_record_write(env, bk_obj, len, 0, th);
+ rc = dt_declare_record_write(env, bk_obj,
+ lfsck_buf_get(env, bk, len), 0, th);
if (rc != 0)
GOTO(stop, rc);
else
fid_zero(&op_data->op_fid2);
- op_data->op_opc = opc;
op_data->op_name = name;
op_data->op_mode = mode;
op_data->op_namelen = namelen;
lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
break;
case LMV_MAGIC:
+ case LMV_MAGIC_MIGRATE:
if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC))
lustre_swab_lmv_mds_md((union lmv_mds_md *)lmm);
break;
return rc;
}
-static int ll_get_mdt_idx_by_fid(struct ll_sb_info *sbi,
- const struct lu_fid *fid)
+int ll_get_mdt_idx_by_fid(struct ll_sb_info *sbi, const struct lu_fid *fid)
{
struct md_op_data *op_data;
int rc;
OBD_FREE_PTR(copy);
RETURN(rc);
}
+ case LL_IOC_MIGRATE: {
+ char *buf = NULL;
+ const char *filename;
+ int namelen = 0;
+ int len;
+ int rc;
+ int mdtidx;
+
+ rc = obd_ioctl_getdata(&buf, &len, (void __user *)arg);
+ if (rc < 0)
+ RETURN(rc);
+
+ data = (struct obd_ioctl_data *)buf;
+ if (data->ioc_inlbuf1 == NULL || data->ioc_inlbuf2 == NULL ||
+ data->ioc_inllen1 == 0 || data->ioc_inllen2 == 0)
+ GOTO(migrate_free, rc = -EINVAL);
+
+ filename = data->ioc_inlbuf1;
+ namelen = data->ioc_inllen1;
+ if (namelen < 1)
+ GOTO(migrate_free, rc = -EINVAL);
+
+ if (data->ioc_inllen2 != sizeof(mdtidx))
+ GOTO(migrate_free, rc = -EINVAL);
+ mdtidx = *(int *)data->ioc_inlbuf2;
+
+ rc = ll_migrate(inode, file, mdtidx, filename, namelen);
+migrate_free:
+ obd_ioctl_freedata(buf, len);
+
+ RETURN(rc);
+ }
default:
RETURN(obd_iocontrol(cmd, sbi->ll_dt_exp, 0, NULL,
(void *)arg));
}
if (!S_ISDIR(inode->i_mode)) {
- lov_read_and_clear_async_rc(lli->lli_clob);
+ if (lli->lli_clob != NULL)
+ lov_read_and_clear_async_rc(lli->lli_clob);
lli->lli_async_rc = 0;
}
OBD_FREE_PTR(hui);
RETURN(rc);
}
+
default: {
int err;
* failed for pages in this mapping. */
rc = lli->lli_async_rc;
lli->lli_async_rc = 0;
- err = lov_read_and_clear_async_rc(lli->lli_clob);
- if (rc == 0)
- rc = err;
+ if (lli->lli_clob != NULL) {
+ err = lov_read_and_clear_async_rc(lli->lli_clob);
+ if (rc == 0)
+ rc = err;
+ }
/* The application has been told write failure already.
* Do not report failure again. */
RETURN(rc);
}
+/**
+ * Fetch the FID of the entry \a name under \a parent.
+ *
+ * Issues md_getattr_name() with only OBD_MD_FLID requested and copies
+ * fid1 from the reply body into \a fid.
+ *
+ * \param[in]  parent	directory inode the name is resolved in
+ * \param[in]  name	entry name
+ * \param[in]  namelen	length of \a name
+ * \param[out] fid	filled with the FID from the reply on success
+ *
+ * \retval		PTR_ERR() of the op_data if it cannot be prepared
+ * \retval		-EFAULT if the reply carries no MDT body
+ * \retval		otherwise the result of md_getattr_name()
+ */
+static int ll_get_fid_by_name(struct inode *parent, const char *name,
+			      int namelen, struct lu_fid *fid)
+{
+	struct ptlrpc_request	*req;
+	struct md_op_data	*op_data;
+	struct mdt_body		*reply;
+	int			 rc;
+
+	op_data = ll_prep_md_op_data(NULL, parent, NULL, name, namelen, 0,
+				     LUSTRE_OPC_ANY, NULL);
+	if (IS_ERR(op_data))
+		return PTR_ERR(op_data);
+
+	/* only the FID is wanted from the server */
+	op_data->op_valid = OBD_MD_FLID;
+	rc = md_getattr_name(ll_i2sbi(parent)->ll_md_exp, op_data, &req);
+	if (rc < 0)
+		GOTO(finish_op_data, rc);
+
+	reply = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+	if (reply == NULL)
+		GOTO(finish_req, rc = -EFAULT);
+
+	*fid = reply->fid1;
+finish_req:
+	ptlrpc_req_finished(req);
+finish_op_data:
+	/* op_data is always a valid pointer once past the IS_ERR() check */
+	ll_finish_md_op_data(op_data);
+	return rc;
+}
+
+/**
+ * Migrate the entry \a name in directory \a parent to the MDT \a mdtidx.
+ *
+ * The child FID is taken from the dcache when a positive dentry is found
+ * under \a file's dentry; otherwise it is fetched from the server with
+ * ll_get_fid_by_name().  If the child already lives on \a mdtidx nothing is
+ * sent.  Otherwise a rename of \a name onto itself is issued with
+ * CLI_MIGRATE set and op_mds pointing at the target MDT.
+ *
+ * If a dentry was found, ll_delete_inode() is called on its inode (when it
+ * has one) and the dentry is released on every exit path after the lookup,
+ * including errors and the "already on target" case.
+ *
+ * \param[in] parent	inode of the directory holding the entry
+ * \param[in] file	open file whose dentry is used for the dcache lookup
+ * \param[in] mdtidx	index of the target MDT
+ * \param[in] name	name of the entry to migrate
+ * \param[in] namelen	size of the \a name buffer
+ *
+ * \retval		0 on success or if the entry is already on \a mdtidx
+ * \retval		-EINVAL if the child FID is not sane
+ * \retval		otherwise the error code of the step that failed
+ */
+int ll_migrate(struct inode *parent, struct file *file, int mdtidx,
+	       const char *name, int namelen)
+{
+	struct dentry		*dchild = NULL;
+	struct md_op_data	*op_data;
+	struct ptlrpc_request	*request = NULL;
+	struct qstr		 qstr;
+	int			 len;
+	int			 rc;
+	ENTRY;
+
+	CDEBUG(D_VFSTRACE, "migrate %s under"DFID" to MDT%d\n",
+	       name, PFID(ll_inode2fid(parent)), mdtidx);
+
+	op_data = ll_prep_md_op_data(NULL, parent, NULL, name, namelen,
+				     0, LUSTRE_OPC_ANY, NULL);
+	if (IS_ERR(op_data))
+		RETURN(PTR_ERR(op_data));
+
+	/* string length bounded by the buffer size, used for the RPCs */
+	len = strnlen(name, namelen);
+
+	/* Get child FID first */
+	/* NOTE(review): the dcache lookup hashes namelen bytes while the
+	 * RPC paths use strnlen(); confirm namelen never counts a
+	 * terminating NUL, otherwise d_lookup() can never match. */
+	qstr.hash = full_name_hash(name, namelen);
+	qstr.name = name;
+	qstr.len = namelen;
+	dchild = d_lookup(file->f_dentry, &qstr);
+	if (dchild != NULL && dchild->d_inode != NULL) {
+		op_data->op_fid3 = *ll_inode2fid(dchild->d_inode);
+	} else {
+		rc = ll_get_fid_by_name(parent, name, len, &op_data->op_fid3);
+		if (rc != 0)
+			GOTO(out_free, rc);
+	}
+
+	if (!fid_is_sane(&op_data->op_fid3)) {
+		CERROR("%s: migrate %s , but fid "DFID" is insane\n",
+		       ll_get_fsname(parent->i_sb, NULL, 0), name,
+		       PFID(&op_data->op_fid3));
+		/* rc is unset (dcache hit) or 0 (successful getattr) here,
+		 * so an explicit error code must be supplied */
+		GOTO(out_free, rc = -EINVAL);
+	}
+
+	/* on success rc is the index of the MDT currently holding the child */
+	rc = ll_get_mdt_idx_by_fid(ll_i2sbi(parent), &op_data->op_fid3);
+	if (rc < 0)
+		GOTO(out_free, rc);
+
+	if (rc == mdtidx) {
+		CDEBUG(D_INFO, "%s:"DFID" is already on MDT%d.\n", name,
+		       PFID(&op_data->op_fid3), mdtidx);
+		GOTO(out_free, rc = 0);
+	}
+
+	/* migration is sent as a rename of the name onto itself */
+	op_data->op_mds = mdtidx;
+	op_data->op_cli_flags = CLI_MIGRATE;
+	rc = md_rename(ll_i2sbi(parent)->ll_md_exp, op_data, name, len,
+		       name, len, &request);
+	if (rc == 0)
+		ll_update_times(request, parent);
+
+	ptlrpc_req_finished(request);
+
+out_free:
+	if (dchild != NULL) {
+		if (dchild->d_inode != NULL)
+			ll_delete_inode(dchild->d_inode);
+		dput(dchild);
+	}
+
+	ll_finish_md_op_data(op_data);
+	RETURN(rc);
+}
+
int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
{
ENTRY;
int ll_dir_read(struct inode *inode, struct md_op_data *op_data,
void *cookie, filldir_t filldir);
int ll_get_mdt_idx(struct inode *inode);
+int ll_get_mdt_idx_by_fid(struct ll_sb_info *sbi, const struct lu_fid *fid);
struct lu_dirent *ll_dir_entry_start(struct inode *dir,
struct md_op_data *op_data,
#endif
struct dentry *ll_splice_alias(struct inode *inode, struct dentry *de);
int ll_rmdir_entry(struct inode *dir, char *name, int namelen);
+int ll_d_mountpoint(struct dentry *dparent, struct dentry *dchild,
+ struct qstr *name);
+void ll_update_times(struct ptlrpc_request *request, struct inode *inode);
/* llite/rw.c */
int ll_writepage(struct page *page, struct writeback_control *wbc);
int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat);
struct ll_file_data *ll_file_data_get(void);
struct posix_acl * ll_get_acl(struct inode *inode, int type);
-
+int ll_migrate(struct inode *parent, struct file *file, int mdtidx,
+ const char *name, int namelen);
#ifdef HAVE_GENERIC_PERMISSION_4ARGS
int ll_inode_permission(struct inode *inode, int mask, unsigned int flags);
#else
struct ll_inode_info *lli = ll_i2info(inode);
struct lmv_stripe_md *lsm = md->lmv;
int idx;
+ ENTRY;
- LASSERT(lsm != NULL);
LASSERT(S_ISDIR(inode->i_mode));
+ CDEBUG(D_INODE, "update lsm %p of "DFID"\n", lli->lli_lsm_md,
+ PFID(ll_inode2fid(inode)));
+
+ /* no striped information from request. */
+ if (lsm == NULL) {
+ if (lli->lli_lsm_md == NULL) {
+ RETURN_EXIT;
+ } else if (lli->lli_lsm_md->lsm_md_magic == LMV_MAGIC_MIGRATE) {
+ /* migration is done, the temporary MIGRATE layout has
+ * been removed */
+ CDEBUG(D_INODE, DFID" finish migration.\n",
+ PFID(ll_inode2fid(inode)));
+ lmv_free_memmd(lli->lli_lsm_md);
+ lli->lli_lsm_md = NULL;
+ RETURN_EXIT;
+ } else {
+ /* The lustre_md from req does not include stripeEA,
+ * see ll_md_setattr */
+ RETURN_EXIT;
+ }
+ }
+
+ /* set the directory layout */
if (lli->lli_lsm_md == NULL) {
int rc;
/* set lsm_md to NULL, so the following free lustre_md
* will not free this lsm */
md->lmv = NULL;
- return;
+ CDEBUG(D_INODE, "Set lsm %p magic %x to "DFID"\n", lsm,
+ lsm->lsm_md_magic, PFID(ll_inode2fid(inode)));
+ RETURN_EXIT;
}
/* Compare the old and new stripe information */
md_update_lsm_md(ll_i2mdexp(inode), ll_i2info(inode)->lli_lsm_md,
md->body, ll_md_blocking_ast);
+
+ RETURN_EXIT;
}
void ll_clear_inode(struct inode *inode)
lli->lli_maxbytes = MAX_LFS_FILESIZE;
}
- if (S_ISDIR(inode->i_mode) && md->lmv != NULL)
+ if (S_ISDIR(inode->i_mode))
ll_update_lsm_md(inode, md);
if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
if ((opc == LUSTRE_OPC_CREATE) && (name != NULL) &&
filename_is_volatile(name, namelen, NULL))
op_data->op_bias |= MDS_CREATE_VOLATILE;
- op_data->op_opc = opc;
op_data->op_mds = 0;
op_data->op_data = data;
* Check if we have something mounted at the named dchild.
* In such a case there would always be dentry present.
*/
-static int ll_d_mountpoint(struct dentry *dparent, struct dentry *dchild,
- struct qstr *name)
+int ll_d_mountpoint(struct dentry *dparent, struct dentry *dchild,
+ struct qstr *name)
{
int mounted = 0;
RETURN(0);
}
-static void ll_update_times(struct ptlrpc_request *request,
- struct inode *inode)
+void ll_update_times(struct ptlrpc_request *request, struct inode *inode)
{
struct mdt_body *body = req_capsule_server_get(&request->rq_pill,
&RMF_MDT_BODY);
* evicted to avoid hitting LBUG when truncate_inode_pages()
* is called later on. */
ignore_layout = 1;
+
+ if (cl_i2info(inode)->lli_clob == NULL)
+ RETURN(0);
+
result = cl_sync_file_range(inode, start, end, mode, ignore_layout);
if (result > 0) {
wbc->nr_to_write -= result;
(*dentryp)->d_name.name,
PFID(ll_inode2fid((*dentryp)->d_inode)),
PFID(ll_inode2fid(inode)));
- ll_sai_unplug(sai, entry);
+ ll_intent_release(&it);
+ ll_sai_unplug(sai, entry);
RETURN(-ESTALE);
} else {
iput(inode);
#include <lustre_lib.h>
#include <lustre_net.h>
#include <lustre_dlm.h>
+#include <lustre_mdc.h>
#include <obd_class.h>
#include <lprocfs_status.h>
#include "lmv_internal.h"
oinfo = lsm_name_to_stripe_info(lsm, op_data->op_name,
op_data->op_namelen);
+ if (IS_ERR(oinfo))
+ RETURN(PTR_ERR(oinfo));
op_data->op_fid1 = oinfo->lmo_fid;
}
ldlm_blocking_callback cb_blocking,
__u64 extra_lock_flags)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_tgt_desc *tgt = NULL;
- struct mdt_body *body;
- int rc = 0;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt = NULL;
+ struct mdt_body *body;
+ struct lmv_stripe_md *lsm = op_data->op_mea1;
+ int rc = 0;
ENTRY;
tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
fid_zero(&op_data->op_fid2);
CDEBUG(D_INODE, "LOOKUP_INTENT with fid1="DFID", fid2="DFID
- ", name='%s' -> mds #%d\n", PFID(&op_data->op_fid1),
- PFID(&op_data->op_fid2),
+ ", name='%s' -> mds #%d lsm=%p lsm_magic=%x\n",
+ PFID(&op_data->op_fid1), PFID(&op_data->op_fid2),
op_data->op_name ? op_data->op_name : "<NULL>",
- tgt->ltd_idx);
+ tgt->ltd_idx, lsm, lsm == NULL ? -1 : lsm->lsm_md_magic);
op_data->op_bias &= ~MDS_CROSS_REF;
rc = md_intent_lock(tgt->ltd_exp, op_data, lmm, lmmsize, it,
flags, reqp, cb_blocking, extra_lock_flags);
-
if (rc < 0)
RETURN(rc);
* during update_inode process (see ll_update_lsm_md) */
if (op_data->op_mea2 != NULL) {
rc = lmv_revalidate_slaves(exp, NULL, op_data->op_mea2,
- cb_blocking, extra_lock_flags);
+ cb_blocking,
+ extra_lock_flags);
if (rc != 0)
RETURN(rc);
}
RETURN(rc);
- }
+ } else if (it_disposition(it, DISP_LOOKUP_NEG) &&
+ lsm != NULL && lsm->lsm_md_magic == LMV_MAGIC_MIGRATE) {
+ /* For migrating directory, if it can not find the child in
+ * the source directory(master stripe), try the targeting
+ * directory(stripe 1) */
+ tgt = lmv_find_target(lmv, &lsm->lsm_md_oinfo[1].lmo_fid);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
+ ptlrpc_req_finished(*reqp);
+ CDEBUG(D_INODE, "For migrating dir, try target dir "DFID"\n",
+ PFID(&lsm->lsm_md_oinfo[1].lmo_fid));
+
+ op_data->op_fid1 = lsm->lsm_md_oinfo[1].lmo_fid;
+ it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
+ rc = md_intent_lock(tgt->ltd_exp, op_data, lmm, lmmsize, it,
+ flags, reqp, cb_blocking, extra_lock_flags);
+ RETURN(rc);
+ }
/*
* MDS has returned success. Probably name has been resolved in
* remote inode. Let's check this.
stripe_index = lmv_name_to_stripe_index(lsm->lsm_md_hash_type,
lsm->lsm_md_stripe_count,
name, namelen);
- LASSERT(stripe_index < lsm->lsm_md_stripe_count);
+ if (stripe_index < 0)
+ return ERR_PTR(stripe_index);
+
+ LASSERTF(stripe_index < lsm->lsm_md_stripe_count,
+ "stripe_index = %d, stripe_count = %d hash_type = %x"
+ "name = %.*s\n", stripe_index, lsm->lsm_md_stripe_count,
+ lsm->lsm_md_hash_type, namelen, name);
return &lsm->lsm_md_oinfo[stripe_index];
}
case LMV_HASH_TYPE_FNV_1A_64:
idx = lmv_hash_fnv1a(max_mdt_index, name, namelen);
break;
+ /* LMV_HASH_TYPE_MIGRATION means the file is being migrated,
+ * and the file should not be accessed by client, except for
+ * lookup(see lmv_intent_lookup), return -EACCES here */
+ case LMV_HASH_TYPE_MIGRATION:
+ CERROR("%.*s is being migrated: rc = %d\n", namelen,
+ name, -EACCES);
+ return -EACCES;
default:
CERROR("Unknown hash type 0x%x\n", hashtype);
return -EINVAL;
const struct lmv_oinfo *oinfo;
oinfo = lsm_name_to_stripe_info(lsm, name, namelen);
+ if (IS_ERR(oinfo))
+ RETURN((void *)oinfo);
*fid = oinfo->lmo_fid;
*mds = oinfo->lmo_mds;
tgt = lmv_get_target(lmv, *mds);
struct lmv_tgt_desc *tgt;
if (lsm == NULL || lsm->lsm_md_stripe_count <= 1 ||
- op_data->op_namelen == 0) {
+ op_data->op_namelen == 0 ||
+ lsm->lsm_md_magic == LMV_MAGIC_MIGRATE) {
tgt = lmv_find_target(lmv, fid);
if (IS_ERR(tgt))
return tgt;
fl == MF_MDC_CANCEL_FID4 ? &op_data->op_fid4 : \
NULL)
-static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data,
- int op_tgt, ldlm_mode_t mode, int bits, int flag)
+static int lmv_early_cancel(struct obd_export *exp, struct lmv_tgt_desc *tgt,
+ struct md_op_data *op_data,
+ int op_tgt, ldlm_mode_t mode, int bits, int flag)
{
- struct lu_fid *fid = md_op_data_fid(op_data, flag);
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_tgt_desc *tgt;
- ldlm_policy_data_t policy = {{0}};
- int rc = 0;
- ENTRY;
+ struct lu_fid *fid = md_op_data_fid(op_data, flag);
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ ldlm_policy_data_t policy = {{ 0 }};
+ int rc = 0;
+ ENTRY;
- if (!fid_is_sane(fid))
- RETURN(0);
+ if (!fid_is_sane(fid))
+ RETURN(0);
- tgt = lmv_find_target(lmv, fid);
- if (IS_ERR(tgt))
- RETURN(PTR_ERR(tgt));
+ if (tgt == NULL) {
+ tgt = lmv_find_target(lmv, fid);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
+ }
if (tgt->ltd_idx != op_tgt) {
CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
oinfo = lsm_name_to_stripe_info(lsm, op_data->op_name,
op_data->op_namelen);
+ if (IS_ERR(oinfo))
+ RETURN(PTR_ERR(oinfo));
+
op_data->op_fid2 = oinfo->lmo_fid;
}
* Cancel UPDATE lock on child (fid1).
*/
op_data->op_flags |= MF_MDC_CANCEL_FID2;
- rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
+ rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_idx, LCK_EX,
MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
if (rc != 0)
RETURN(rc);
op_data->op_fsuid = current_fsuid();
op_data->op_fsgid = current_fsgid();
op_data->op_cap = cfs_curproc_cap_pack();
-
- if (op_data->op_mea1 != NULL) {
- struct lmv_stripe_md *lsm = op_data->op_mea1;
- const struct lmv_oinfo *oinfo;
-
- oinfo = lsm_name_to_stripe_info(lsm, old, oldlen);
- op_data->op_fid1 = oinfo->lmo_fid;
- op_data->op_mds = oinfo->lmo_mds;
- src_tgt = lmv_get_target(lmv, op_data->op_mds);
- if (IS_ERR(src_tgt))
- RETURN(PTR_ERR(src_tgt));
+ if (op_data->op_cli_flags & CLI_MIGRATE) {
+ LASSERTF(fid_is_sane(&op_data->op_fid3), "invalid FID "DFID"\n",
+ PFID(&op_data->op_fid3));
+ rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data);
+ if (rc)
+ RETURN(rc);
+ src_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid3);
} else {
- src_tgt = lmv_find_target(lmv, &op_data->op_fid1);
- if (IS_ERR(src_tgt))
- RETURN(PTR_ERR(src_tgt));
+ if (op_data->op_mea1 != NULL) {
+ struct lmv_stripe_md *lsm = op_data->op_mea1;
+
+ src_tgt = lmv_locate_target_for_name(lmv, lsm, old,
+ oldlen,
+ &op_data->op_fid1,
+ &op_data->op_mds);
+ if (IS_ERR(src_tgt))
+ RETURN(PTR_ERR(src_tgt));
+ } else {
+ src_tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ if (IS_ERR(src_tgt))
+ RETURN(PTR_ERR(src_tgt));
- op_data->op_mds = src_tgt->ltd_idx;
- }
+ op_data->op_mds = src_tgt->ltd_idx;
+ }
- if (op_data->op_mea2) {
- struct lmv_stripe_md *lsm = op_data->op_mea2;
- const struct lmv_oinfo *oinfo;
+ if (op_data->op_mea2) {
+ struct lmv_stripe_md *lsm = op_data->op_mea2;
+ const struct lmv_oinfo *oinfo;
- oinfo = lsm_name_to_stripe_info(lsm, new, newlen);
- op_data->op_fid2 = oinfo->lmo_fid;
+ oinfo = lsm_name_to_stripe_info(lsm, new, newlen);
+ if (IS_ERR(oinfo))
+ RETURN(PTR_ERR(oinfo));
+
+ op_data->op_fid2 = oinfo->lmo_fid;
+ }
}
+ if (IS_ERR(src_tgt))
+ RETURN(PTR_ERR(src_tgt));
/*
* LOOKUP lock on src child (fid3) should also be cancelled for
* Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
* own target.
*/
- rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
+ rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
LCK_EX, MDS_INODELOCK_UPDATE,
MF_MDC_CANCEL_FID2);
+ if (rc != 0)
+ RETURN(rc);
/*
- * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt.
+ * Cancel LOOKUP locks on source child (fid3) for parent tgt_tgt.
*/
- if (rc == 0) {
- rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
+ if (fid_is_sane(&op_data->op_fid3)) {
+ struct lmv_tgt_desc *tgt;
+
+ tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
+
+ /* Cancel LOOKUP lock on its parent */
+ rc = lmv_early_cancel(exp, tgt, op_data, src_tgt->ltd_idx,
LCK_EX, MDS_INODELOCK_LOOKUP,
- MF_MDC_CANCEL_FID4);
+ MF_MDC_CANCEL_FID3);
+ if (rc != 0)
+ RETURN(rc);
+
+ rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
+ LCK_EX, MDS_INODELOCK_FULL,
+ MF_MDC_CANCEL_FID3);
+ if (rc != 0)
+ RETURN(rc);
}
/*
* Cancel all the locks on tgt child (fid4).
*/
- if (rc == 0)
- rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
+ if (fid_is_sane(&op_data->op_fid4))
+ rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
LCK_EX, MDS_INODELOCK_FULL,
MF_MDC_CANCEL_FID4);
CDEBUG(D_INODE, DFID":m%d to "DFID"\n", PFID(&op_data->op_fid1),
op_data->op_mds, PFID(&op_data->op_fid2));
- if (rc == 0)
- rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen,
- new, newlen, request);
+ rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen, new, newlen,
+ request);
+
RETURN(rc);
}
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
struct lmv_tgt_desc *tgt = NULL;
+ struct lmv_tgt_desc *parent_tgt = NULL;
struct mdt_body *body;
int rc;
ENTRY;
/* For striped dir, we need to locate the parent as well */
if (op_data->op_mea1 != NULL &&
op_data->op_mea1->lsm_md_stripe_count > 1) {
+ struct lmv_tgt_desc *tmp;
+
LASSERT(op_data->op_name != NULL &&
op_data->op_namelen != 0);
- lmv_locate_target_for_name(lmv, op_data->op_mea1,
+ tmp = lmv_locate_target_for_name(lmv,
+ op_data->op_mea1,
op_data->op_name,
op_data->op_namelen,
&op_data->op_fid1,
&op_data->op_mds);
+ if (IS_ERR(tmp))
+ RETURN(PTR_ERR(tmp));
}
} else {
tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
/*
* Cancel FULL locks on child (fid3).
*/
- rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
- MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
+ parent_tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ if (IS_ERR(parent_tgt))
+ RETURN(PTR_ERR(parent_tgt));
+
+ if (parent_tgt != tgt) {
+ rc = lmv_early_cancel(exp, parent_tgt, op_data, tgt->ltd_idx,
+ LCK_EX, MDS_INODELOCK_LOOKUP,
+ MF_MDC_CANCEL_FID3);
+ }
+ rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_idx, LCK_EX,
+ MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
if (rc != 0)
RETURN(rc);
}
/* Unpack memmd */
- if (le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_V1) {
- CERROR("%s: invalid magic %x.\n", exp->exp_obd->obd_name,
- le32_to_cpu(lmm->lmv_magic));
- RETURN(-EINVAL);
+ if (le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_V1 &&
+ le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_MIGRATE &&
+ le32_to_cpu(lmm->lmv_magic) != LMV_USER_MAGIC) {
+ CERROR("%s: invalid lmv magic %x: rc = %d\n",
+ exp->exp_obd->obd_name, le32_to_cpu(lmm->lmv_magic),
+ -EIO);
+ RETURN(-EIO);
}
+ if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_V1 ||
+ le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_MIGRATE)
+ lsm_size = lmv_stripe_md_size(lmv_mds_md_stripe_count_get(lmm));
+ else
+ /**
+ * Unpack default dirstripe(lmv_user_md) to lmv_stripe_md,
+ * stripecount should be 0 then.
+ */
+ lsm_size = lmv_stripe_md_size(0);
+
lsm_size = lmv_stripe_md_size(lmv_mds_md_stripe_count_get(lmm));
if (lsm == NULL) {
OBD_ALLOC(lsm, lsm_size);
switch (le32_to_cpu(lmm->lmv_magic)) {
case LMV_MAGIC_V1:
+ case LMV_MAGIC_MIGRATE:
rc = lmv_unpack_md_v1(exp, lsm, &lmm->lmv_md_v1);
break;
default:
info->lti_buf.lb_len = info->lti_ea_store_size;
rc = dt_xattr_get(env, next, &info->lti_buf, name, BYPASS_CAPA);
}
+
/* if object is not striped or inaccessible */
if (rc == -ENODATA || rc == -ENOENT)
RETURN(0);
int rc = 0;
ENTRY;
+ if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_MIGRATE)
+ RETURN(0);
+
if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
RETURN(-EINVAL);
GOTO(out_put, rc);
}
- rc = dt_declare_xattr_set(env, dt, &lmv_buf, XATTR_NAME_LMV, 0, th);
+ rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf,
+ XATTR_NAME_LMV, 0, th);
if (rc != 0)
GOTO(out_put, rc);
capa);
}
- rc = dt_xattr_set(env, dt, &lmv_buf, XATTR_NAME_LMV, fl, th, capa);
+ rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV,
+ fl, th, capa);
RETURN(rc);
}
if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
GOTO(unlock, rc = 0);
+ CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n",
+ PFID(lu_object_fid(&lp->ldo_obj.do_lu)),
+ (int)v1->lmm_stripe_count,
+ (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset);
+
lp->ldo_def_stripenr = v1->lmm_stripe_count;
lp->ldo_def_stripe_size = v1->lmm_stripe_size;
lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
GOTO(out, rc = -ENOMEM);
}
- /* choose OST and generate appropriate objects */
- rc = lod_qos_prep_create(env, lo, attr, lovea, th);
- if (rc) {
- /* failed to create striping, let's reset
- * config so that others don't get confused */
- lod_object_free_striping(env, lo);
- GOTO(out, rc);
- }
+ if (!dt_object_remote(next)) {
+ /* choose OST and generate appropriate objects */
+ rc = lod_qos_prep_create(env, lo, attr, lovea, th);
+ if (rc) {
+ /* failed to create striping, let's reset
+ * config so that others don't get confused */
+ lod_object_free_striping(env, lo);
+ GOTO(out, rc);
+ }
- /*
- * declare storage for striping data
- */
- info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
+ /*
+ * declare storage for striping data
+ */
+ info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
lo->ldo_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1);
- rc = dt_declare_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV,
- 0, th);
+ } else {
+ /* LOD can not choose OST objects for remote objects, i.e.
+ * stripes must be ready before that. Right now, it can only
+ * happen during migrate, i.e. migrate process needs to create
+ * remote regular file (mdd_migrate_create), then the migrate
+ * process will provide stripeEA. */
+ LASSERT(lovea != NULL);
+ info->lti_buf = *lovea;
+ }
+
+ rc = dt_declare_xattr_set(env, next, &info->lti_buf,
+ XATTR_NAME_LOV, 0, th);
if (rc)
GOTO(out, rc);
rc = lod_declare_striped_object(env, dt, attr,
NULL, th);
} else if (dof->dof_type == DFT_DIR) {
- rc = lod_declare_dir_striping_create(env, dt, attr, dof, th);
+ /* Orphan object (like migrating object) does not have
+ * lod_dir_stripe, see lod_ah_init */
+ if (lo->ldo_dir_stripe != NULL)
+ rc = lod_declare_dir_striping_create(env, dt, attr,
+ dof, th);
}
out:
RETURN(rc);
rc = dt_create(env, next, attr, hint, dof, th);
if (rc == 0) {
- if (S_ISDIR(dt->do_lu.lo_header->loh_attr))
+ if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
+ lo->ldo_dir_stripe != NULL)
rc = lod_dir_striping_create(env, dt, attr, dof, th);
else if (lo->ldo_stripe && dof->u.dof_reg.striped != 0)
rc = lod_striping_create(env, dt, attr, dof, th);
+/*
+ * Declare a body write on dt. The request is forwarded as-is to
+ * dt_declare_record_write() on the child object returned by
+ * dt_object_child(dt); the write is described by the buffer buf (rather
+ * than a bare size) at offset pos, inside transaction handle th.
+ * Returns whatever dt_declare_record_write() returns.
+ */
static ssize_t lod_declare_write(const struct lu_env *env,
struct dt_object *dt,
- const loff_t size, loff_t pos,
+ const struct lu_buf *buf, loff_t pos,
struct thandle *th)
{
return dt_declare_record_write(env, dt_object_child(dt),
- size, pos, th);
+ buf, pos, th);
}
static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
#endif
#include <lustre_net.h>
#include <lustre/lustre_idl.h>
+#include <obd_class.h>
#include <obd.h>
#include <cl_object.h>
#include <lclient.h>
rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
/* XXX do something about time, uid, gid */
- rec->rn_opcode = REINT_RENAME;
+ rec->rn_opcode = op_data->op_cli_flags & CLI_MIGRATE ?
+ REINT_MIGRATE : REINT_RENAME;
rec->rn_fsuid = op_data->op_fsuid;
rec->rn_fsgid = op_data->op_fsgid;
rec->rn_cap = op_data->op_cap;
rec->rn_mode = op_data->op_mode;
rec->rn_bias = op_data->op_bias;
- mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
+ mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
mdc_pack_capa(req, &RMF_CAPA2, op_data->op_capa2);
tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
struct ldlm_enqueue_info *ga_einfo;
};
-int it_disposition(struct lookup_intent *it, int flag)
-{
- return it->d.lustre.it_disposition & flag;
-}
-EXPORT_SYMBOL(it_disposition);
-
-void it_set_disposition(struct lookup_intent *it, int flag)
-{
- it->d.lustre.it_disposition |= flag;
-}
-EXPORT_SYMBOL(it_set_disposition);
-
-void it_clear_disposition(struct lookup_intent *it, int flag)
-{
- it->d.lustre.it_disposition &= ~flag;
-}
-EXPORT_SYMBOL(it_clear_disposition);
-
int it_open_error(int phase, struct lookup_intent *it)
{
if (it_disposition(it, DISP_OPEN_LEASE)) {
{
int rc;
+ if (!mdd_object_exists(mdd_obj))
+ return -ENODATA;
+
/* First try a small buf */
LASSERT(env != NULL);
ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
if (ldata->ld_buf->lb_buf == NULL)
return -ENOMEM;
- if (!mdd_object_exists(mdd_obj))
- return -ENODATA;
-
rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK,
BYPASS_CAPA);
if (rc == -ERANGE) {
rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
XATTR_NAME_LINK, BYPASS_CAPA);
}
- if (rc < 0)
+ if (rc < 0) {
+ lu_buf_free(ldata->ld_buf);
+ ldata->ld_buf = NULL;
return rc;
+ }
return linkea_init(ldata);
}
int ea_len;
void *linkea;
- if (ldata != NULL && ldata->ld_lee != NULL) {
+ if (ldata != NULL && ldata->ld_leh != NULL) {
ea_len = ldata->ld_leh->leh_len;
linkea = ldata->ld_buf->lb_buf;
} else {
return rc;
}
-int mdd_declare_finish_unlink(const struct lu_env *env,
- struct mdd_object *obj,
- struct md_attr *ma,
- struct thandle *handle)
+static int mdd_declare_finish_unlink(const struct lu_env *env,
+ struct mdd_object *obj,
+ struct thandle *handle)
{
int rc;
if (rc)
return rc;
- rc = mdd_declare_finish_unlink(env, c, ma, handle);
+ rc = mdd_declare_finish_unlink(env, c, handle);
if (rc)
return rc;
struct thandle *handle;
const struct lu_buf *buf;
struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
+ struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
int rc;
ENTRY;
RETURN(rc);
/* calling ->ah_make_hint() is used to transfer information from parent */
- mdd_object_make_hint(env, mdd_pobj, son, attr, spec);
+ mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
struct mdd_object *parent,
struct mdd_object *child,
struct lu_attr *attr,
- struct thandle *handle,
- struct linkea_data *ldata)
+ struct thandle *handle)
{
int rc;
ENTRY;
dotdot, handle);
}
- if (rc == 0)
- mdd_declare_links_add(env, child, handle, ldata);
-
RETURN(rc);
}
static int mdd_object_initialize(const struct lu_env *env,
const struct lu_fid *pfid,
- const struct lu_name *lname,
struct mdd_object *child,
struct lu_attr *attr, struct thandle *handle,
- const struct md_op_spec *spec,
- struct linkea_data *ldata)
+ const struct md_op_spec *spec)
{
int rc;
ENTRY;
mdo_ref_del(env, child, handle);
}
- if (rc == 0)
- mdd_links_add(env, child, pfid, lname, handle, ldata, 1);
-
RETURN(rc);
}
RETURN(rc);
}
-static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
- struct mdd_object *p, struct mdd_object *c,
- const struct lu_name *name,
- struct lu_attr *attr,
- struct thandle *handle,
- const struct md_op_spec *spec,
- struct linkea_data *ldata,
- struct lu_buf *def_acl_buf,
- struct lu_buf *acl_buf)
+static int mdd_declare_object_create(const struct lu_env *env,
+ struct mdd_device *mdd,
+ struct mdd_object *p, struct mdd_object *c,
+ struct lu_attr *attr,
+ struct thandle *handle,
+ const struct md_op_spec *spec,
+ struct lu_buf *def_acl_buf,
+ struct lu_buf *acl_buf,
+ struct dt_allocation_hint *hint)
{
int rc;
- rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec);
+ rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec,
+ hint);
if (rc)
GOTO(out, rc);
GOTO(out, rc);
}
#endif
-
- if (S_ISDIR(attr->la_mode)) {
- rc = mdo_declare_ref_add(env, p, handle);
- if (rc)
- GOTO(out, rc);
- }
-
- rc = mdd_declare_object_initialize(env, p, c, attr, handle, ldata);
- if (rc)
- GOTO(out, rc);
-
- if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
- rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
- else
- rc = mdo_declare_index_insert(env, p, mdo2fid(c),
- name->ln_name, handle);
+ rc = mdd_declare_object_initialize(env, p, c, attr, handle);
if (rc)
GOTO(out, rc);
}
if (S_ISLNK(attr->la_mode)) {
+ const char *target_name = spec->u.sp_symname;
+ int sym_len = strlen(target_name);
+ const struct lu_buf *buf;
+
+ buf = mdd_buf_get_const(env, target_name, sym_len);
rc = dt_declare_record_write(env, mdd_object_child(c),
- strlen(spec->u.sp_symname), 0,
- handle);
+ buf, 0, handle);
if (rc)
GOTO(out, rc);
}
+out:
+ return rc;
+}
+
+static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
+ struct mdd_object *p, struct mdd_object *c,
+ const struct lu_name *name,
+ struct lu_attr *attr,
+ struct thandle *handle,
+ const struct md_op_spec *spec,
+ struct linkea_data *ldata,
+ struct lu_buf *def_acl_buf,
+ struct lu_buf *acl_buf,
+ struct dt_allocation_hint *hint)
+{
+ int rc;
+
+ rc = mdd_declare_object_create(env, mdd, p, c, attr, handle, spec,
+ def_acl_buf, acl_buf, hint);
+ if (rc)
+ GOTO(out, rc);
+
+ if (S_ISDIR(attr->la_mode)) {
+ rc = mdo_declare_ref_add(env, p, handle);
+ if (rc)
+ GOTO(out, rc);
+ }
+
+ if (spec->sp_cr_flags & MDS_OPEN_VOLATILE) {
+ rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
+ if (rc)
+ GOTO(out, rc);
+ } else {
+ struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
- if (!(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
- struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
+ rc = mdo_declare_index_insert(env, p, mdo2fid(c), name->ln_name,
+ handle);
+ if (rc)
+ return rc;
+ rc = mdd_declare_links_add(env, c, handle, ldata);
+ if (rc)
+ return rc;
*la = *attr;
la->la_valid = LA_CTIME | LA_MTIME;
RETURN(rc);
}
+/**
+ * Create a metadata object and initialize it, set acl, xattr.
+ *
+ * The whole sequence runs with the write lock on son held
+ * (MOR_TGT_CHILD); the lock is dropped at the "unlock" label on every
+ * path. Steps, in order:
+ * - mdd_object_create_internal() on son (parent argument passed as NULL);
+ * - with CONFIG_FS_POSIX_ACL: set the default ACL (directories only) and
+ * the access ACL, each only when its buffer is non-NULL and non-empty;
+ * - mdd_object_initialize() with the FID of pobj;
+ * - set XATTR_NAME_LOV from spec->u.sp_ea when spec->no_create is set or
+ * when MDS_OPEN_HAS_EA is set on a regular file;
+ * - for symlinks, write spec->u.sp_symname as the object body.
+ *
+ * pobj: parent object; only its FID is used here.
+ * son: object being created.
+ * attr: attributes of the new object; la_mode selects the type branches.
+ * spec: creation spec (LOV EA, symlink target, flags).
+ * acl_buf, def_acl_buf: optional ACL buffers; may be NULL.
+ * hint: allocation hint passed through to mdd_object_create_internal().
+ * handle: transaction handle used for every operation.
+ *
+ * Returns 0 on success or the first error encountered.
+ **/
+static int mdd_object_create(const struct lu_env *env, struct mdd_object *pobj,
+ struct mdd_object *son, struct lu_attr *attr,
+ struct md_op_spec *spec, struct lu_buf *acl_buf,
+ struct lu_buf *def_acl_buf,
+ struct dt_allocation_hint *hint,
+ struct thandle *handle)
+{
+ int rc;
+
+ mdd_write_lock(env, son, MOR_TGT_CHILD);
+ rc = mdd_object_create_internal(env, NULL, son, attr, handle, spec,
+ hint);
+ /* nothing was created, so there is nothing to undo */
+ if (rc)
+ GOTO(unlock, rc);
+
+#ifdef CONFIG_FS_POSIX_ACL
+ if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
+ S_ISDIR(attr->la_mode)) {
+ /* set default acl */
+ rc = mdo_xattr_set(env, son, def_acl_buf,
+ XATTR_NAME_ACL_DEFAULT, 0,
+ handle, BYPASS_CAPA);
+ if (rc)
+ GOTO(err_destroy, rc);
+ }
+ /* set its own acl */
+ if (acl_buf != NULL && acl_buf->lb_len > 0) {
+ rc = mdo_xattr_set(env, son, acl_buf,
+ XATTR_NAME_ACL_ACCESS,
+ 0, handle, BYPASS_CAPA);
+ if (rc)
+ GOTO(err_destroy, rc);
+ }
+#endif
+
+ rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle,
+ spec);
+ if (rc != 0)
+ GOTO(err_destroy, rc);
+
+ /*
+ * in case of replay we just set LOVEA provided by the client
+ * XXX: I think it would be interesting to try "old" way where
+ * MDT calls this xattr_set(LOV) in a different transaction.
+ * probably this way we code can be made better.
+ */
+ if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA &&
+ S_ISREG(attr->la_mode))) {
+ const struct lu_buf *buf;
+
+ buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
+ spec->u.sp_ea.eadatalen);
+ rc = mdo_xattr_set(env, son, buf, XATTR_NAME_LOV, 0, handle,
+ BYPASS_CAPA);
+ /* NOTE(review): this jumps straight to err_destroy and so
+ * skips the mdo_ref_del() calls below even though the object
+ * has been initialized by now - confirm this is intended. */
+ if (rc != 0)
+ GOTO(err_destroy, rc);
+ }
+
+ if (S_ISLNK(attr->la_mode)) {
+ struct lu_ucred *uc = lu_ucred_assert(env);
+ struct dt_object *dt = mdd_object_child(son);
+ const char *target_name = spec->u.sp_symname;
+ int sym_len = strlen(target_name);
+ const struct lu_buf *buf;
+ loff_t pos = 0;
+
+ /* the symlink target is stored as the object body, without
+ * the trailing NUL (sym_len is the strlen() of the target) */
+ buf = mdd_buf_get_const(env, target_name, sym_len);
+ rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
+ mdd_object_capa(env, son),
+ uc->uc_cap &
+ CFS_CAP_SYS_RESOURCE_MASK);
+
+ /* anything other than a full write is reported as -EFAULT */
+ if (rc == sym_len)
+ rc = 0;
+ else
+ GOTO(err_initlized, rc = -EFAULT);
+ }
+
+ /* The success path falls through here with rc == 0 and skips the
+ * cleanup block. The err_destroy label sits inside the block, so
+ * jumping to it runs only mdo_destroy(). */
+err_initlized:
+ if (unlikely(rc != 0)) {
+ int rc2;
+ if (S_ISDIR(attr->la_mode)) {
+ /* Drop the reference, no need to delete "."/"..",
+ * because the object is to be destroyed directly. */
+ rc2 = mdo_ref_del(env, son, handle);
+ /* if the reference cannot be dropped, give up on the
+ * cleanup but still report the original error rc */
+ if (rc2 != 0)
+ GOTO(unlock, rc);
+ }
+ rc2 = mdo_ref_del(env, son, handle);
+ if (rc2 != 0)
+ GOTO(unlock, rc);
+err_destroy:
+ mdo_destroy(env, son, handle);
+ }
+unlock:
+ mdd_write_unlock(env, son);
+ RETURN(rc);
+}
+
/*
* Create object and insert it into namespace.
*/
struct lu_buf def_acl_buf;
struct linkea_data *ldata = &info->mti_link_data;
const char *name = lname->ln_name;
- int rc, created = 0, initialized = 0, inserted = 0;
+ struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
+ int rc;
ENTRY;
/*
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
GOTO(out_free, rc = -EINPROGRESS);
+ handle = mdd_trans_create(env, mdd);
+ if (IS_ERR(handle))
+ GOTO(out_free, rc = PTR_ERR(handle));
+
acl_buf.lb_buf = info->mti_xattr_buf;
acl_buf.lb_len = sizeof(info->mti_xattr_buf);
def_acl_buf.lb_buf = info->mti_key;
def_acl_buf.lb_len = sizeof(info->mti_key);
rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
if (rc < 0)
- GOTO(out_free, rc);
-
- mdd_object_make_hint(env, mdd_pobj, son, attr, spec);
+ GOTO(out_stop, rc);
- handle = mdd_trans_create(env, mdd);
- if (IS_ERR(handle))
- GOTO(out_free, rc = PTR_ERR(handle));
+ mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
memset(ldata, 0, sizeof(*ldata));
- mdd_linkea_prepare(env, son, NULL, NULL, mdd_object_fid(mdd_pobj),
- lname, 1, 0, ldata);
+ rc = mdd_linkea_prepare(env, son, NULL, NULL,
+ mdd_object_fid(mdd_pobj),
+ lname, 1, 0, ldata);
rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
- handle, spec, ldata, &def_acl_buf, &acl_buf);
+ handle, spec, ldata, &def_acl_buf, &acl_buf,
+ hint);
if (rc)
GOTO(out_stop, rc);
if (rc)
GOTO(out_stop, rc);
- mdd_write_lock(env, son, MOR_TGT_CHILD);
- rc = mdd_object_create_internal(env, NULL, son, attr, handle, spec);
- if (rc) {
- mdd_write_unlock(env, son);
- GOTO(cleanup, rc);
- }
-
- created = 1;
-
-#ifdef CONFIG_FS_POSIX_ACL
- if (def_acl_buf.lb_len > 0 && S_ISDIR(attr->la_mode)) {
- /* set default acl */
- rc = mdo_xattr_set(env, son, &def_acl_buf,
- XATTR_NAME_ACL_DEFAULT, 0,
- handle, BYPASS_CAPA);
- if (rc) {
- mdd_write_unlock(env, son);
- GOTO(cleanup, rc);
- }
- }
- /* set its own acl */
- if (acl_buf.lb_len > 0) {
- rc = mdo_xattr_set(env, son, &acl_buf,
- XATTR_NAME_ACL_ACCESS,
- 0, handle, BYPASS_CAPA);
- if (rc) {
- mdd_write_unlock(env, son);
- GOTO(cleanup, rc);
- }
- }
-#endif
-
- rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname,
- son, attr, handle, spec, ldata);
-
- /*
- * in case of replay we just set LOVEA provided by the client
- * XXX: I think it would be interesting to try "old" way where
- * MDT calls this xattr_set(LOV) in a different transaction.
- * probably this way we code can be made better.
- */
- if (rc == 0 && (spec->no_create ||
- (spec->sp_cr_flags & MDS_OPEN_HAS_EA &&
- S_ISREG(attr->la_mode)))) {
- const struct lu_buf *buf;
-
- buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
- spec->u.sp_ea.eadatalen);
- rc = mdo_xattr_set(env, son, buf, XATTR_NAME_LOV, 0, handle,
- BYPASS_CAPA);
- }
-
- if (rc == 0 && spec->sp_cr_flags & MDS_OPEN_VOLATILE)
- rc = __mdd_orphan_add(env, son, handle);
-
- mdd_write_unlock(env, son);
-
+ rc = mdd_object_create(env, mdd_pobj, son, attr, spec, &acl_buf,
+ &def_acl_buf, hint, handle);
if (rc != 0)
- /*
- * Object has no links, so it will be destroyed when last
- * reference is released. (XXX not now.)
- */
- GOTO(cleanup, rc);
-
- initialized = 1;
+ GOTO(out_stop, rc);
- if (!(spec->sp_cr_flags & MDS_OPEN_VOLATILE))
+ if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
+ mdd_write_lock(env, son, MOR_TGT_CHILD);
+ rc = __mdd_orphan_add(env, son, handle);
+ mdd_write_unlock(env, son);
+ if (rc != 0)
+ GOTO(err_created, rc);
+ } else {
rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
name, S_ISDIR(attr->la_mode), handle,
mdd_object_capa(env, mdd_pobj));
+ if (rc != 0)
+ GOTO(err_created, rc);
- if (rc != 0)
- GOTO(cleanup, rc);
-
- inserted = 1;
-
- if (S_ISLNK(attr->la_mode)) {
- struct lu_ucred *uc = lu_ucred_assert(env);
- struct dt_object *dt = mdd_object_child(son);
- const char *target_name = spec->u.sp_symname;
- int sym_len = strlen(target_name);
- const struct lu_buf *buf;
- loff_t pos = 0;
-
- buf = mdd_buf_get_const(env, target_name, sym_len);
- rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
- mdd_object_capa(env, son),
- uc->uc_cap &
- CFS_CAP_SYS_RESOURCE_MASK);
-
- if (rc == sym_len)
- rc = 0;
- else
- GOTO(cleanup, rc = -EFAULT);
- }
-
- /* volatile file creation does not update parent directory times */
- if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
- GOTO(cleanup, rc = 0);
+ mdd_links_add(env, son, mdo2fid(mdd_pobj), lname, handle,
+ ldata, 1);
- /* update parent directory mtime/ctime */
- *la = *attr;
- la->la_valid = LA_CTIME | LA_MTIME;
- rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
- if (rc)
- GOTO(cleanup, rc);
+ /* update parent directory mtime/ctime */
+ *la = *attr;
+ la->la_valid = LA_CTIME | LA_MTIME;
+ rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
+ if (rc)
+ GOTO(err_insert, rc);
+ }
- EXIT;
-cleanup:
- if (rc != 0 && created != 0) {
+ EXIT;
+err_insert:
+ if (rc != 0) {
int rc2;
- if (inserted != 0) {
- if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
- rc2 = __mdd_orphan_del(env, son, handle);
- else
- rc2 = __mdd_index_delete(env, mdd_pobj, name,
- S_ISDIR(attr->la_mode),
- handle, BYPASS_CAPA);
- if (rc2 != 0)
- goto out_stop;
- }
+ if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
+ rc2 = __mdd_orphan_del(env, son, handle);
+ else
+ rc2 = __mdd_index_delete(env, mdd_pobj, name,
+ S_ISDIR(attr->la_mode),
+ handle, BYPASS_CAPA);
+ if (rc2 != 0)
+ goto out_stop;
+err_created:
mdd_write_lock(env, son, MOR_TGT_CHILD);
- if (initialized != 0 && S_ISDIR(attr->la_mode)) {
+ if (S_ISDIR(attr->la_mode)) {
/* Drop the reference, no need to delete "."/"..",
* because the object to be destroied directly. */
rc2 = mdo_ref_del(env, son, handle);
goto out_stop;
}
}
-
rc2 = mdo_ref_del(env, son, handle);
if (rc2 != 0) {
mdd_write_unlock(env, son);
if (rc)
return rc;
- rc = mdd_declare_finish_unlink(env, mdd_tobj, ma, handle);
+ rc = mdd_declare_finish_unlink(env, mdd_tobj, handle);
if (rc)
return rc;
}
return rc;
}
+/**
+ * During migration, once the parent FID has been changed, update the
+ * parent FID recorded in the link EA of the child.
+ *
+ * The link EA of child is read and scanned for the first entry whose name
+ * is exactly name/namelen and whose parent FID is not already the FID of
+ * parent. That entry is re-packed with the FID of parent and the link EA
+ * is then either declared (declare == true) or written (declare == false)
+ * within handle. At most one entry is updated.
+ *
+ * The name comparison checks the length first: link EA names are compared
+ * by length plus bytes, so a bare strncmp() over namelen bytes would also
+ * match any longer name that merely starts with name.
+ *
+ * Returns 0 on success, when the child has no link EA (-ENOENT/-ENODATA
+ * from mdd_links_read()) or when no entry matches; -ENOMEM if the link
+ * buffer cannot be allocated; otherwise the error from reading, declaring
+ * or writing the link EA.
+ **/
+static int mdd_linkea_update_child_internal(const struct lu_env *env,
+					    struct mdd_object *parent,
+					    struct mdd_object *child,
+					    const char *name, int namelen,
+					    struct thandle *handle,
+					    bool declare)
+{
+	struct mdd_thread_info	*info = mdd_env_info(env);
+	struct linkea_data	 ldata = {0};
+	struct lu_buf		*buf = &info->mti_link_buf;
+	int			 count;
+	int			 rc = 0;
+
+	ENTRY;
+
+	buf = lu_buf_check_and_alloc(buf, PATH_MAX);
+	if (buf->lb_buf == NULL)
+		RETURN(-ENOMEM);
+
+	ldata.ld_buf = buf;
+	rc = mdd_links_read(env, child, &ldata);
+	if (rc != 0) {
+		/* a child without link EA simply has nothing to update */
+		if (rc == -ENOENT || rc == -ENODATA)
+			rc = 0;
+		RETURN(rc);
+	}
+
+	LASSERT(ldata.ld_leh != NULL);
+	/* the first entry starts right behind the link EA header */
+	ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
+	for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
+		struct mdd_device *mdd = mdo2mdd(&child->mod_obj);
+		struct lu_name lname;
+		struct lu_fid fid;
+
+		linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
+				    &lname, &fid);
+
+		/* Skip entries for another name, and entries that already
+		 * point at the new parent. The length test must come first
+		 * so that strncmp() never runs over a shorter entry name. */
+		if (lname.ln_namelen != namelen ||
+		    strncmp(lname.ln_name, name, namelen) != 0 ||
+		    lu_fid_eq(&fid, mdd_object_fid(parent))) {
+			ldata.ld_lee = (struct link_ea_entry *)
+					((char *)ldata.ld_lee +
+					 ldata.ld_reclen);
+			continue;
+		}
+
+		CDEBUG(D_INFO, "%s: update "DFID" with %.*s:"DFID"\n",
+		       mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(child)),
+		       lname.ln_namelen, lname.ln_name,
+		       PFID(mdd_object_fid(parent)));
+		/* update to the new parent fid */
+		linkea_entry_pack(ldata.ld_lee, &lname,
+				  mdd_object_fid(parent));
+		if (declare)
+			rc = mdd_declare_links_add(env, child, handle, &ldata);
+		else
+			rc = mdd_links_write(env, child, &ldata, handle);
+		break;
+	}
+	RETURN(rc);
+}
+
+/*
+ * Declaration-phase front end of mdd_linkea_update_child_internal():
+ * same arguments, with the "declare" switch set to true.
+ */
+static int mdd_linkea_declare_update_child(const struct lu_env *env,
+					   struct mdd_object *parent,
+					   struct mdd_object *child,
+					   const char *name, int namelen,
+					   struct thandle *handle)
+{
+	int rc;
+
+	rc = mdd_linkea_update_child_internal(env, parent, child, name,
+					      namelen, handle, true);
+	return rc;
+}
+
+/*
+ * Execution-phase front end of mdd_linkea_update_child_internal():
+ * same arguments, with the "declare" switch set to false.
+ */
+static int mdd_linkea_update_child(const struct lu_env *env,
+				   struct mdd_object *parent,
+				   struct mdd_object *child,
+				   const char *name, int namelen,
+				   struct thandle *handle)
+{
+	int rc;
+
+	rc = mdd_linkea_update_child_internal(env, parent, child, name,
+					      namelen, handle, false);
+	return rc;
+}
+
+/*
+ * Copy the link EA of mdd_sobj to mdd_tobj and re-point every other name
+ * of the source object at the target object.
+ *
+ * The link EA of mdd_sobj is read into the per-thread mti_link_data and
+ * declared/written on mdd_tobj. Then, for every entry in it, the parent
+ * directory is looked up by FID and, unless it is the entry
+ * (mdd_pobj, child_name) itself, the name in that parent is deleted and
+ * re-inserted with the FID of mdd_tobj, a reference is added to mdd_tobj
+ * and one is dropped from mdd_sobj.
+ *
+ * With declare != 0 only the matching declare calls are issued on handle;
+ * with declare == 0 the operations are executed.
+ *
+ * Returns 0 on success or when the source has no link EA
+ * (-ENOENT/-ENODATA); otherwise the first error, at which point the scan
+ * stops. Parents that cannot be found or do not exist are skipped.
+ */
+static int mdd_update_linkea_internal(const struct lu_env *env,
+ struct mdd_object *mdd_pobj,
+ struct mdd_object *mdd_sobj,
+ struct mdd_object *mdd_tobj,
+ const struct lu_name *child_name,
+ struct thandle *handle,
+ int declare)
+{
+ struct mdd_thread_info *info = mdd_env_info(env);
+ struct linkea_data *ldata = &info->mti_link_data;
+ int count;
+ int rc = 0;
+ ENTRY;
+
+ rc = mdd_links_read(env, mdd_sobj, ldata);
+ if (rc != 0) {
+ /* no link EA on the source: nothing to carry over */
+ if (rc == -ENOENT || rc == -ENODATA)
+ rc = 0;
+ RETURN(rc);
+ }
+
+ /* the target object gets the same link EA as the source */
+ if (declare)
+ rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata);
+ else
+ rc = mdd_links_write(env, mdd_tobj, ldata, handle);
+
+ if (rc != 0)
+ RETURN(rc);
+
+ /* If the file has multiple links, we need to update the name entry
+ * in every parent */
+ LASSERT(ldata->ld_leh != NULL);
+ ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
+ for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
+ struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
+ struct mdd_object *pobj;
+ struct lu_name lname;
+ struct lu_fid fid;
+
+ linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
+ &lname, &fid);
+ /* advance to the next entry right away, so that every
+ * "continue"/"next_put" path below moves on */
+ ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
+ ldata->ld_reclen);
+ pobj = mdd_object_find(env, mdd, &fid);
+ if (IS_ERR(pobj)) {
+ /* a parent that cannot be looked up is only warned
+ * about; the error is not returned */
+ CWARN("%s: cannot find obj "DFID": rc = %ld\n",
+ mdd2obd_dev(mdd)->obd_name, PFID(&fid),
+ PTR_ERR(pobj));
+ continue;
+ }
+
+ if (!mdd_object_exists(pobj)) {
+ CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
+ mdd2obd_dev(mdd)->obd_name, PFID(&fid));
+ GOTO(next_put, rc);
+ }
+
+ /* skip the entry (mdd_pobj, child_name) itself: same parent
+ * object and exactly the same name */
+ if (pobj == mdd_pobj &&
+ lname.ln_namelen == child_name->ln_namelen &&
+ strncmp(lname.ln_name, child_name->ln_name,
+ lname.ln_namelen) == 0) {
+ CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n",
+ mdd2obd_dev(mdd)->obd_name, child_name->ln_name,
+ PFID(&fid));
+ GOTO(next_put, rc);
+ }
+
+ CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n",
+ mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(pobj)),
+ PNAME(&lname), PFID(mdd_object_fid(mdd_tobj)));
+
+ if (declare) {
+ /* Remove source name from source directory */
+ /* Insert new fid with target name into target dir */
+ rc = mdo_declare_index_delete(env, pobj, lname.ln_name,
+ handle);
+ if (rc)
+ GOTO(next_put, rc);
+
+ rc = mdo_declare_index_insert(env, pobj,
+ mdd_object_fid(mdd_tobj),
+ lname.ln_name, handle);
+ if (rc)
+ GOTO(next_put, rc);
+
+ rc = mdo_declare_ref_add(env, mdd_tobj, handle);
+ if (rc)
+ GOTO(next_put, rc);
+
+ rc = mdo_declare_ref_del(env, mdd_sobj, handle);
+ if (rc)
+ GOTO(next_put, rc);
+ } else {
+ /* same sequence as declared above, now executed */
+ rc = __mdd_index_delete(env, pobj, lname.ln_name,
+ 0, handle,
+ mdd_object_capa(env, pobj));
+ if (rc)
+ GOTO(next_put, rc);
+
+ rc = __mdd_index_insert(env, pobj,
+ mdd_object_fid(mdd_tobj),
+ lname.ln_name, 0, handle,
+ mdd_object_capa(env, pobj));
+ if (rc)
+ GOTO(next_put, rc);
+
+ mdd_write_lock(env, mdd_tobj, MOR_SRC_CHILD);
+ rc = mdo_ref_add(env, mdd_tobj, handle);
+ mdd_write_unlock(env, mdd_tobj);
+ if (rc)
+ GOTO(next_put, rc);
+
+ /* NOTE(review): the result of mdo_ref_del() is
+ * ignored here, unlike mdo_ref_add() above -
+ * confirm this is deliberate best-effort. */
+ mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
+ mdo_ref_del(env, mdd_sobj, handle);
+ mdd_write_unlock(env, mdd_sobj);
+ }
+next_put:
+ /* drop the reference taken by mdd_object_find() */
+ mdd_object_put(env, pobj);
+ if (rc != 0)
+ break;
+ }
+
+ RETURN(rc);
+}
+
+/*
+ * Copy the extended attributes of the migration source mdd_sobj to the
+ * target mdd_tobj.
+ *
+ * The xattr name list of the source is fetched into mti_big_buf and
+ * walked name by name. XATTR_NAME_LINK, XATTR_NAME_LMA and XATTR_NAME_LMV
+ * are never copied; XATTR_NAME_LOV is copied only when the source is a
+ * directory (default layout). Every remaining attribute is read into
+ * mti_link_buf and set on the target in its own local transaction
+ * (th_local = 1). An attribute that vanished in the meantime (-ENODATA)
+ * is skipped, and -EEXIST from the set is treated as success.
+ *
+ * Returns 0 on success (including a source without xattrs), -ENOMEM when
+ * a buffer cannot be allocated, or the first error from listing, reading
+ * or setting an attribute.
+ */
+static int mdd_migrate_xattrs(const struct lu_env *env,
+			      struct mdd_object *mdd_sobj,
+			      struct mdd_object *mdd_tobj)
+{
+	struct mdd_thread_info	*info = mdd_env_info(env);
+	struct mdd_device	*mdd = mdo2mdd(&mdd_sobj->mod_obj);
+	char			*xname;
+	struct thandle		*handle;
+	struct lu_buf		 xbuf;
+	int			 xlen;
+	int			 rem;
+	int			 xsize;
+	int			 list_xsize;
+	struct lu_buf		 list_xbuf;
+	int			 rc;
+
+	/* retrieve xattr list from the old object */
+	list_xsize = mdo_xattr_list(env, mdd_sobj, &LU_BUF_NULL,
+				    mdd_object_capa(env, mdd_sobj));
+	if (list_xsize == -ENODATA)
+		return 0;
+
+	if (list_xsize < 0)
+		return list_xsize;
+
+	lu_buf_check_and_alloc(&info->mti_big_buf, list_xsize);
+	if (info->mti_big_buf.lb_buf == NULL)
+		return -ENOMEM;
+
+	list_xbuf.lb_buf = info->mti_big_buf.lb_buf;
+	list_xbuf.lb_len = list_xsize;
+	rc = mdo_xattr_list(env, mdd_sobj, &list_xbuf,
+			    mdd_object_capa(env, mdd_sobj));
+	if (rc < 0)
+		return rc;
+	rc = 0;
+	rem = list_xsize;
+	xname = list_xbuf.lb_buf;
+	while (rem > 0) {
+		/* length of the current name including its NUL */
+		xlen = strnlen(xname, rem - 1) + 1;
+		if (strcmp(XATTR_NAME_LINK, xname) == 0 ||
+		    strcmp(XATTR_NAME_LMA, xname) == 0 ||
+		    strcmp(XATTR_NAME_LMV, xname) == 0)
+			goto next;
+
+		/* For directory, if there are default layout, migrate here */
+		if (strcmp(XATTR_NAME_LOV, xname) == 0 &&
+		    !S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu)))
+			goto next;
+
+		/* first ask for the size only */
+		xsize = mdo_xattr_get(env, mdd_sobj, &LU_BUF_NULL,
+				      xname,
+				      mdd_object_capa(env, mdd_sobj));
+		if (xsize == -ENODATA)
+			goto next;
+		/* report the actual failure; rc is still 0 at this point */
+		if (xsize < 0)
+			GOTO(out, rc = xsize);
+
+		lu_buf_check_and_alloc(&info->mti_link_buf, xsize);
+		if (info->mti_link_buf.lb_buf == NULL)
+			GOTO(out, rc = -ENOMEM);
+
+		xbuf.lb_len = xsize;
+		xbuf.lb_buf = info->mti_link_buf.lb_buf;
+		rc = mdo_xattr_get(env, mdd_sobj, &xbuf, xname,
+				   mdd_object_capa(env, mdd_sobj));
+		if (rc == -ENODATA) {
+			/* The attribute vanished between the two calls:
+			 * skip it, and do not let -ENODATA leak out as the
+			 * result if this was the last name in the list. */
+			rc = 0;
+			goto next;
+		}
+		if (rc < 0)
+			GOTO(out, rc);
+
+		handle = mdd_trans_create(env, mdd);
+		if (IS_ERR(handle))
+			GOTO(out, rc = PTR_ERR(handle));
+
+		rc = mdo_declare_xattr_set(env, mdd_tobj, &xbuf, xname, 0,
+					   handle);
+		if (rc != 0)
+			GOTO(stop_trans, rc);
+		/* Note: this transaction is part of migration, and it is not
+		 * the last step of migration, so we set th_local = 1 to avoid
+		 * update last rcvd for this transaction */
+		handle->th_local = 1;
+		rc = mdd_trans_start(env, mdd, handle);
+		if (rc != 0)
+			GOTO(stop_trans, rc);
+
+		/* NOTE(review): the capa passed here belongs to mdd_sobj
+		 * although the xattr is set on mdd_tobj - confirm. */
+		rc = mdo_xattr_set(env, mdd_tobj, &xbuf, xname, 0, handle,
+				   mdd_object_capa(env, mdd_sobj));
+		/* an attribute already present on the target is fine */
+		if (rc == -EEXIST)
+			GOTO(stop_trans, rc = 0);
+
+		if (rc != 0)
+			GOTO(stop_trans, rc);
+stop_trans:
+		mdd_trans_stop(env, mdd, rc, handle);
+		if (rc != 0)
+			GOTO(out, rc);
+next:
+		/* drop the name just handled from the front of the list */
+		rem -= xlen;
+		memmove(xname, xname + xlen, rem);
+	}
+out:
+	return rc;
+}
+
+/*
+ * Declare, on handle, everything mdd_migrate_create() is going to do:
+ * creation and initialization of the target object mdd_tobj, the symlink
+ * body write (symlinks only), the LOV xattr on the target (when spec
+ * carries EA data), the LMV "migrate" xattr mgr_ea on the source
+ * mdd_sobj, and the attribute update that sets LUSTRE_IMMUTABLE_FL on the
+ * source.
+ *
+ * Returns 0 on success or the first declaration error.
+ */
+static int mdd_declare_migrate_create(const struct lu_env *env,
+				      struct mdd_object *mdd_pobj,
+				      struct mdd_object *mdd_sobj,
+				      struct mdd_object *mdd_tobj,
+				      struct md_op_spec *spec,
+				      struct lu_attr *la,
+				      union lmv_mds_md *mgr_ea,
+				      struct thandle *handle)
+{
+	struct lu_attr		*flags_attr = MDD_ENV_VAR(env, la_for_fix);
+	const struct lu_buf	*xbuf;
+	int			 ea_len;
+	int			 rc;
+
+	/* target object creation ... */
+	rc = mdd_declare_object_create_internal(env, mdd_pobj, mdd_tobj, la,
+						handle, spec, NULL);
+	if (rc != 0)
+		goto out;
+
+	/* ... and its initialization */
+	rc = mdd_declare_object_initialize(env, mdd_pobj, mdd_tobj, la,
+					   handle);
+	if (rc != 0)
+		goto out;
+
+	/* symlink target is written as the body of the new object */
+	if (S_ISLNK(la->la_mode)) {
+		const char *link_target = spec->u.sp_symname;
+		int link_len = strlen(link_target);
+
+		xbuf = mdd_buf_get_const(env, link_target, link_len);
+		rc = dt_declare_record_write(env, mdd_object_child(mdd_tobj),
+					     xbuf, 0, handle);
+		if (rc != 0)
+			goto out;
+	}
+
+	/* striping EA carried over to the target */
+	if (spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
+		xbuf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
+					 spec->u.sp_ea.eadatalen);
+		rc = mdo_declare_xattr_set(env, mdd_tobj, xbuf,
+					   XATTR_NAME_LOV, 0, handle);
+		if (rc)
+			goto out;
+	}
+
+	/* migrate EA (two stripes) stored on the source */
+	ea_len = lmv_mds_md_size(2, LMV_MAGIC_MIGRATE);
+	xbuf = mdd_buf_get_const(env, mgr_ea, ea_len);
+	rc = mdo_declare_xattr_set(env, mdd_sobj, xbuf, XATTR_NAME_LMV,
+				   0, handle);
+	if (rc)
+		goto out;
+
+	/* immutable flag on the source */
+	flags_attr->la_valid = LA_FLAGS;
+	flags_attr->la_flags = LUSTRE_IMMUTABLE_FL;
+	mdd_flags_xlate(mdd_sobj, flags_attr->la_flags);
+	rc = mdo_declare_attr_set(env, mdd_sobj, flags_attr, handle);
+out:
+	return rc;
+}
+
+/*
+ * First step of a migration: create the target object mdd_tobj and mark
+ * the source object mdd_sobj as being migrated, all in one local
+ * transaction (th_local = 1).
+ *
+ * A creation spec is built from the source: for a symlink the link target
+ * is read with mdd_readlink(); otherwise the LOV EA of the source, if any,
+ * is fetched with mdd_get_lov_ea() and handed over as EA data. After the
+ * target has been created, the LOV EA is set on it, an LMV EA with magic
+ * LMV_MAGIC_MIGRATE holding the FIDs of source and target is stored on
+ * the source, and LUSTRE_IMMUTABLE_FL is set on the source.
+ *
+ * mdd_pobj: parent object, passed on to the create helpers.
+ * la: attributes of the source object; used for the new object.
+ *
+ * Returns 0 on success; -ENOMEM if the symlink buffer cannot be
+ * allocated; otherwise the first error encountered. The buffer returned
+ * by mdd_get_lov_ea() is freed before returning.
+ */
+static int mdd_migrate_create(const struct lu_env *env,
+			      struct mdd_object *mdd_pobj,
+			      struct mdd_object *mdd_sobj,
+			      struct mdd_object *mdd_tobj,
+			      struct lu_attr *la)
+{
+	struct mdd_thread_info	*info = mdd_env_info(env);
+	struct mdd_device	*mdd = mdo2mdd(&mdd_sobj->mod_obj);
+	struct md_op_spec	*spec = &info->mti_spec;
+	struct lu_buf		 lmm_buf = { 0 };
+	struct lu_buf		 link_buf = { 0 };
+	const struct lu_buf	*buf;
+	struct thandle		*handle;
+	struct lmv_mds_md_v1	*mgr_ea;
+	struct lu_attr		*la_flag = MDD_ENV_VAR(env, la_for_fix);
+	int			 mgr_easize;
+	int			 rc;
+	ENTRY;
+
+	/* prepare spec for create */
+	memset(spec, 0, sizeof(*spec));
+	spec->sp_cr_lookup = 0;
+	spec->sp_feat = &dt_directory_features;
+	if (S_ISLNK(la->la_mode)) {
+		buf = lu_buf_check_and_alloc(
+				&mdd_env_info(env)->mti_big_buf,
+				la->la_size + 1);
+		/* the allocation may fail; do not hand a NULL buffer to
+		 * mdd_readlink() */
+		if (buf->lb_buf == NULL)
+			RETURN(-ENOMEM);
+		link_buf = *buf;
+		link_buf.lb_len = la->la_size + 1;
+		rc = mdd_readlink(env, &mdd_sobj->mod_obj, &link_buf);
+		if (rc <= 0) {
+			/* an empty link target is reported as -EFAULT */
+			rc = rc != 0 ? rc : -EFAULT;
+			CERROR("%s: "DFID" readlink failed: rc = %d\n",
+			       mdd2obd_dev(mdd)->obd_name,
+			       PFID(mdd_object_fid(mdd_sobj)), rc);
+			RETURN(rc);
+		}
+		spec->u.sp_symname = link_buf.lb_buf;
+	} else {
+		/* retrieve lov of the old object */
+		rc = mdd_get_lov_ea(env, mdd_sobj, &lmm_buf);
+		if (rc != 0 && rc != -ENODATA)
+			RETURN(rc);
+		if (lmm_buf.lb_buf != NULL && lmm_buf.lb_len != 0) {
+			spec->u.sp_ea.eadata = lmm_buf.lb_buf;
+			spec->u.sp_ea.eadatalen = lmm_buf.lb_len;
+			spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
+		}
+	}
+
+	/* build the migrate EA: stripe 0 is the source, stripe 1 the target */
+	mgr_ea = (struct lmv_mds_md_v1 *)info->mti_xattr_buf;
+	mgr_ea->lmv_magic = cpu_to_le32(LMV_MAGIC_MIGRATE);
+	mgr_ea->lmv_stripe_count = cpu_to_le32(2);
+	/* NOTE(review): unlike the neighbouring fields this one is stored
+	 * without cpu_to_le32() - confirm the intended byte order. */
+	mgr_ea->lmv_master_mdt_index = mdd_seq_site(mdd)->ss_node_id;
+	mgr_ea->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_MIGRATION);
+	fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[0], mdd_object_fid(mdd_sobj));
+	fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[1], mdd_object_fid(mdd_tobj));
+
+	handle = mdd_trans_create(env, mdd);
+	if (IS_ERR(handle))
+		GOTO(out_free, rc = PTR_ERR(handle));
+
+	/* Note: this transaction is part of migration, and it is not
+	 * the last step of migration, so we set th_local = 1 to avoid
+	 * update last rcvd for this transaction */
+	handle->th_local = 1;
+	rc = mdd_declare_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
+					spec, la,
+					(union lmv_mds_md *)info->mti_xattr_buf,
+					handle);
+	if (rc != 0)
+		GOTO(stop_trans, rc);
+
+	rc = mdd_trans_start(env, mdd, handle);
+	if (rc != 0)
+		GOTO(stop_trans, rc);
+
+	/* create the target object */
+	rc = mdd_object_create(env, mdd_pobj, mdd_tobj, la, spec, NULL, NULL,
+			       NULL, handle);
+	if (rc != 0)
+		GOTO(stop_trans, rc);
+
+	if (lmm_buf.lb_buf != NULL && lmm_buf.lb_len != 0) {
+		buf = mdd_buf_get_const(env, lmm_buf.lb_buf, lmm_buf.lb_len);
+		rc = mdo_xattr_set(env, mdd_tobj, buf, XATTR_NAME_LOV,
+				   0, handle, mdd_object_capa(env, mdd_sobj));
+		if (rc != 0)
+			GOTO(stop_trans, rc);
+	}
+
+	/* Set MIGRATE EA on the source inode, so once the migration needs
+	 * to be re-done during failover, the re-do process can locate the
+	 * target object which is already being created. */
+	mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_MIGRATE);
+	buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
+	rc = mdo_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV, 0,
+			   handle, mdd_object_capa(env, mdd_sobj));
+	if (rc != 0)
+		GOTO(stop_trans, rc);
+
+	/* Set immutable flag, so any modification is disabled until
+	 * the migration is done. Once the migration is interrupted,
+	 * if the resume process finds the migrating object has both
+	 * IMMUTABLE flag and MIGRATE EA, it needs to clear the IMMUTABLE
+	 * flag and approve the migration */
+	la_flag->la_valid = LA_FLAGS;
+	la_flag->la_flags = LUSTRE_IMMUTABLE_FL;
+	mdd_flags_xlate(mdd_sobj, la_flag->la_flags);
+	rc = mdo_attr_set(env, mdd_sobj, la_flag, handle,
+			  mdd_object_capa(env, mdd_sobj));
+stop_trans:
+	/* handle is always a valid transaction here: the IS_ERR() case
+	 * jumps to out_free instead */
+	mdd_trans_stop(env, mdd, rc, handle);
+out_free:
+	if (lmm_buf.lb_buf != NULL)
+		OBD_FREE(lmm_buf.lb_buf, lmm_buf.lb_len);
+	RETURN(rc);
+}
+
+/**
+ * Move the name entries of directory \a mdd_sobj into \a mdd_tobj.
+ *
+ * Every entry except "." and ".." is handled in its own transaction: the
+ * name is inserted into the target (unless a lookup shows that it is
+ * already there), removed from the source, the ".." entry of a child
+ * directory is re-pointed at the target, and the child's linkEA is updated.
+ *
+ * \param[in] env	execution environment
+ * \param[in] mdd_sobj	source directory whose entries are moved
+ * \param[in] mdd_tobj	target directory that receives the entries
+ *
+ * \retval		0 on success
+ * \retval		negative errno on the first failure
+ */
+static int mdd_migrate_entries(const struct lu_env *env,
+			       struct mdd_object *mdd_sobj,
+			       struct mdd_object *mdd_tobj)
+{
+	struct dt_object	*next = mdd_object_child(mdd_sobj);
+	struct mdd_device	*mdd = mdo2mdd(&mdd_sobj->mod_obj);
+	struct dt_object	*dt_tobj = mdd_object_child(mdd_tobj);
+	struct thandle		*handle;
+	struct dt_it		*it;
+	const struct dt_it_ops	*iops;
+	int			 rc;
+	int			 result;
+	struct lu_dirent	*ent;
+	ENTRY;
+
+	OBD_ALLOC(ent, NAME_MAX + sizeof(*ent) + 1);
+	if (ent == NULL)
+		RETURN(-ENOMEM);
+
+	if (!dt_try_as_dir(env, next))
+		GOTO(out_ent, rc = -ENOTDIR);
+	/*
+	 * iterate directories
+	 */
+	iops = &next->do_index_ops->dio_it;
+	it = iops->init(env, next, LUDA_FID | LUDA_TYPE,
+			mdd_object_capa(env, mdd_sobj));
+	if (IS_ERR(it))
+		GOTO(out_ent, rc = PTR_ERR(it));
+
+	rc = iops->load(env, it, 0);
+	if (rc == 0)
+		rc = iops->next(env, it);
+	else if (rc > 0)
+		rc = 0;
+	/*
+	 * At this point and across the loop:
+	 *
+	 *  rc == 0 -> ok, proceed.
+	 *  rc >  0 -> end of directory.
+	 *  rc <  0 -> error.
+	 */
+	/* Do not walk an iterator that failed to position itself. */
+	if (rc < 0)
+		GOTO(out, rc);
+
+	do {
+		struct mdd_object	*child;
+		char			*name = mdd_env_info(env)->mti_key;
+		int			 len;
+		int			 is_dir;
+		bool			 target_exist = false;
+
+		len = iops->key_size(env, it);
+		if (len == 0)
+			goto next;
+
+		result = iops->rec(env, it, (struct dt_rec *)ent,
+				   LUDA_FID | LUDA_TYPE);
+		if (result == -ESTALE)
+			goto next;
+		if (result != 0) {
+			rc = result;
+			goto out;
+		}
+
+		fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
+
+		/* "." and ".." are never moved to the target directory */
+		if ((ent->lde_namelen == 1 && ent->lde_name[0] == '.') ||
+		    (ent->lde_namelen == 2 && ent->lde_name[0] == '.' &&
+		     ent->lde_name[1] == '.'))
+			goto next;
+
+		child = mdd_object_find(env, mdd, &ent->lde_fid);
+		if (IS_ERR(child))
+			GOTO(out, rc = PTR_ERR(child));
+
+		is_dir = S_ISDIR(lu_object_attr(&child->mod_obj.mo_lu));
+
+		/* NUL-terminated copy of the entry name */
+		snprintf(name, ent->lde_namelen + 1, "%s", ent->lde_name);
+
+		/* Check whether the name has been inserted to the target */
+		if (dt_try_as_dir(env, dt_tobj)) {
+			struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
+
+			rc = dt_tobj->do_index_ops->dio_lookup(env, dt_tobj,
+							 (struct dt_rec *)fid,
+							 (struct dt_key *)name,
+						mdd_object_capa(env, mdd_tobj));
+			if (unlikely(rc == 0))
+				target_exist = true;
+		}
+
+		handle = mdd_trans_create(env, mdd);
+		if (IS_ERR(handle)) {
+			/* drop the reference taken by mdd_object_find() */
+			mdd_object_put(env, child);
+			GOTO(out, rc = PTR_ERR(handle));
+		}
+
+		/* Note: this transaction is part of migration, and it is not
+		 * the last step of migration, so we set th_local = 1 to avoid
+		 * updating last rcvd for this transaction */
+		handle->th_local = 1;
+		if (likely(!target_exist)) {
+			rc = mdo_declare_index_insert(env, mdd_tobj,
+						      &ent->lde_fid,
+						      name, handle);
+			if (rc != 0)
+				GOTO(out_put, rc);
+
+			if (is_dir) {
+				rc = mdo_declare_ref_add(env, mdd_tobj, handle);
+				if (rc != 0)
+					GOTO(out_put, rc);
+			}
+		}
+
+		rc = mdo_declare_index_delete(env, mdd_sobj, name, handle);
+		if (rc != 0)
+			GOTO(out_put, rc);
+
+		if (is_dir) {
+			rc = mdo_declare_ref_del(env, mdd_sobj, handle);
+			if (rc != 0)
+				GOTO(out_put, rc);
+
+			/* Update .. for child */
+			rc = mdo_declare_index_delete(env, child, dotdot,
+						      handle);
+			if (rc != 0)
+				GOTO(out_put, rc);
+
+			rc = mdo_declare_index_insert(env, child,
+						      mdd_object_fid(mdd_tobj),
+						      dotdot, handle);
+			if (rc != 0)
+				GOTO(out_put, rc);
+		}
+
+		rc = mdd_linkea_declare_update_child(env, mdd_tobj,
+						     child, name,
+						     strlen(name),
+						     handle);
+		if (rc != 0)
+			GOTO(out_put, rc);
+
+		rc = mdd_trans_start(env, mdd, handle);
+		if (rc != 0) {
+			CERROR("%s: transaction start failed: rc = %d\n",
+			       mdd2obd_dev(mdd)->obd_name, rc);
+			GOTO(out_put, rc);
+		}
+
+		if (likely(!target_exist)) {
+			rc = __mdd_index_insert(env, mdd_tobj, &ent->lde_fid,
+						name, is_dir, handle,
+						mdd_object_capa(env, mdd_tobj));
+			if (rc != 0)
+				GOTO(out_put, rc);
+
+			if (is_dir) {
+				rc = mdo_ref_add(env, mdd_tobj, handle);
+				if (rc != 0)
+					GOTO(out_put, rc);
+
+			}
+		}
+
+		rc = __mdd_index_delete(env, mdd_sobj, name, is_dir, handle,
+					mdd_object_capa(env, mdd_sobj));
+		if (rc != 0)
+			GOTO(out_put, rc);
+
+		if (is_dir) {
+			rc = __mdd_index_delete_only(env, child, dotdot, handle,
+						   mdd_object_capa(env, child));
+			if (rc != 0)
+				GOTO(out_put, rc);
+
+			rc = __mdd_index_insert_only(env, child,
+					 mdd_object_fid(mdd_tobj),
+					 dotdot, handle,
+					 mdd_object_capa(env, child));
+			if (rc != 0)
+				GOTO(out_put, rc);
+		}
+
+		rc = mdd_linkea_update_child(env, mdd_tobj, child, name,
+					     strlen(name), handle);
+
+out_put:
+		mdd_object_put(env, child);
+		mdd_trans_stop(env, mdd, rc, handle);
+		if (rc != 0)
+			GOTO(out, rc);
+next:
+		result = iops->next(env, it);
+		if (OBD_FAIL_CHECK(OBD_FAIL_MIGRATE_ENTRIES))
+			GOTO(out, rc = -EINTR);
+
+		if (result == -ESTALE)
+			goto next;
+	} while (result == 0);
+
+	/* The loop ends on end-of-directory (> 0) or on an iterator error
+	 * (< 0); the latter must not be reported as success. */
+	if (result < 0)
+		rc = result;
+out:
+	iops->put(env, it);
+	iops->fini(env, it);
+out_ent:
+	OBD_FREE(ent, NAME_MAX + sizeof(*ent) + 1);
+	RETURN(rc);
+}
+
+/*
+ * Thin wrapper around mdd_update_linkea_internal() that passes 1 as the
+ * final argument (the sibling mdd_update_linkea() passes 0).
+ */
+static int mdd_declare_update_linkea(const struct lu_env *env,
+				     struct mdd_object *mdd_pobj,
+				     struct mdd_object *mdd_sobj,
+				     struct mdd_object *mdd_tobj,
+				     struct thandle *handle,
+				     const struct lu_name *child_name)
+{
+	int rc;
+
+	rc = mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
+					child_name, handle, 1);
+	return rc;
+}
+
+/*
+ * Thin wrapper around mdd_update_linkea_internal() that passes 0 as the
+ * final argument (the sibling mdd_declare_update_linkea() passes 1).
+ */
+static int mdd_update_linkea(const struct lu_env *env,
+			     struct mdd_object *mdd_pobj,
+			     struct mdd_object *mdd_sobj,
+			     struct mdd_object *mdd_tobj,
+			     struct thandle *handle,
+			     const struct lu_name *child_name)
+{
+	int rc;
+
+	rc = mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
+					child_name, handle, 0);
+	return rc;
+}
+
+/*
+ * Declare, against \a handle, the updates needed to switch the name
+ * \a lname in \a mdd_pobj from \a mdd_sobj to \a mdd_tobj: the flag change
+ * on the source, the index delete/insert in the parent, the linkEA update,
+ * the LOV xattr removal for regular files, the reference count changes,
+ * the unlink of the source and the attribute update of the parent.
+ *
+ * Returns 0 on success or the first non-zero declaration result.
+ */
+static int mdd_declare_migrate_update_name(const struct lu_env *env,
+					   struct mdd_object *mdd_pobj,
+					   struct mdd_object *mdd_sobj,
+					   struct mdd_object *mdd_tobj,
+					   const struct lu_name *lname,
+					   struct lu_attr *la,
+					   struct lu_attr *parent_la,
+					   struct thandle *handle)
+{
+	struct lu_attr	*flag_attr = MDD_ENV_VAR(env, tattr);
+	const int	 src_is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
+	const int	 src_is_reg = S_ISREG(mdd_object_type(mdd_sobj));
+	int		 ret;
+
+	/* the source flags without IMMUTABLE */
+	flag_attr->la_valid = LA_FLAGS;
+	flag_attr->la_flags = la->la_flags & ~LUSTRE_IMMUTABLE_FL;
+	mdd_flags_xlate(mdd_sobj, flag_attr->la_flags);
+	ret = mdo_declare_attr_set(env, mdd_sobj, flag_attr, handle);
+	if (ret != 0)
+		return ret;
+
+	/* old name leaves the parent directory */
+	ret = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name, handle);
+	if (ret != 0)
+		return ret;
+
+	ret = mdd_declare_update_linkea(env, mdd_pobj, mdd_sobj,
+					mdd_tobj, handle, lname);
+	if (ret != 0)
+		return ret;
+
+	if (src_is_reg) {
+		ret = mdo_declare_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
+					    handle);
+		if (ret != 0)
+			return ret;
+	}
+
+	if (src_is_dir) {
+		ret = mdo_declare_ref_del(env, mdd_pobj, handle);
+		if (ret != 0)
+			return ret;
+	}
+
+	/* same name, now pointing at the target FID */
+	ret = mdo_declare_index_insert(env, mdd_pobj, mdo2fid(mdd_tobj),
+				       lname->ln_name, handle);
+	if (ret != 0)
+		return ret;
+
+	if (src_is_dir) {
+		ret = mdo_declare_ref_add(env, mdd_pobj, handle);
+		if (ret != 0)
+			return ret;
+	}
+
+	/* the source object loses its reference(s) */
+	ret = mdo_declare_ref_del(env, mdd_sobj, handle);
+	if (ret != 0)
+		return ret;
+
+	if (src_is_dir) {
+		/* a second reference drop is declared for a directory */
+		ret = mdo_declare_ref_del(env, mdd_sobj, handle);
+		if (ret != 0)
+			return ret;
+		/* set nlink to 0 */
+		ret = mdo_declare_attr_set(env, mdd_sobj, la, handle);
+		if (ret != 0)
+			return ret;
+	}
+
+	ret = mdd_declare_finish_unlink(env, mdd_sobj, handle);
+	if (ret)
+		return ret;
+
+	return mdo_declare_attr_set(env, mdd_pobj, parent_la, handle);
+}
+
+/**
+ * Final migration step: in a single transaction, make the name \a lname in
+ * \a mdd_pobj point at \a mdd_tobj instead of \a mdd_sobj and unlink the
+ * source object.
+ *
+ * \param[in] env	execution environment
+ * \param[in] mdd_pobj	parent directory holding the name
+ * \param[in] mdd_sobj	source object being migrated away from
+ * \param[in] mdd_tobj	target object the name will point to
+ * \param[in] lname	name of the entry in \a mdd_pobj
+ * \param[in,out] ma	carries the ctime to apply to the parent (LA_CTIME
+ *			must be valid); its ma_attr is overwritten with the
+ *			source attributes before the unlink is finished
+ *
+ * \retval		0 on success, and also when the declaration fails
+ *			with -E2BIG (the object is then left where it is)
+ * \retval		negative errno on failure
+ */
+static int mdd_migrate_update_name(const struct lu_env *env,
+				   struct mdd_object *mdd_pobj,
+				   struct mdd_object *mdd_sobj,
+				   struct mdd_object *mdd_tobj,
+				   const struct lu_name *lname,
+				   struct md_attr *ma)
+{
+	struct lu_attr		*p_la = MDD_ENV_VAR(env, la_for_fix);
+	struct lu_attr		*so_attr = MDD_ENV_VAR(env, cattr);
+	struct lu_attr		*la_flag = MDD_ENV_VAR(env, tattr);
+	struct mdd_device	*mdd = mdo2mdd(&mdd_sobj->mod_obj);
+	struct thandle		*handle;
+	int			 is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
+	const char		*name = lname->ln_name;
+	int			 rc;
+	ENTRY;
+
+	/* update time for parent */
+	LASSERT(ma->ma_attr.la_valid & LA_CTIME);
+	p_la->la_ctime = p_la->la_mtime = ma->ma_attr.la_ctime;
+	p_la->la_valid = LA_CTIME;
+
+	rc = mdd_la_get(env, mdd_sobj, so_attr, mdd_object_capa(env, mdd_sobj));
+	if (rc != 0)
+		RETURN(rc);
+
+	handle = mdd_trans_create(env, mdd);
+	if (IS_ERR(handle))
+		RETURN(PTR_ERR(handle));
+
+	rc = mdd_declare_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj,
+					     lname, so_attr, p_la, handle);
+	if (rc != 0) {
+		/* If the migration can not be fit in one transaction, just
+		 * leave it in the original MDT */
+		if (rc == -E2BIG)
+			GOTO(stop_trans, rc = 0);
+		else
+			GOTO(stop_trans, rc);
+	}
+
+	CDEBUG(D_INFO, "%s: update "DFID"/"DFID" with %s:"DFID"\n",
+	       mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(mdd_pobj)),
+	       PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
+	       PFID(mdd_object_fid(mdd_tobj)));
+
+	rc = mdd_trans_start(env, mdd, handle);
+	if (rc != 0)
+		GOTO(stop_trans, rc);
+
+	/* Revert IMMUTABLE flag */
+	la_flag->la_valid = LA_FLAGS;
+	la_flag->la_flags = so_attr->la_flags & ~LUSTRE_IMMUTABLE_FL;
+	mdd_flags_xlate(mdd_sobj, la_flag->la_flags);
+	/* NOTE(review): the capa passed here is the parent's although the
+	 * attribute is set on the source object - confirm this is intended */
+	rc = mdo_attr_set(env, mdd_sobj, la_flag, handle,
+			  mdd_object_capa(env, mdd_pobj));
+	if (rc != 0)
+		GOTO(stop_trans, rc);
+
+	/* Remove source name from source directory */
+	rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
+				mdd_object_capa(env, mdd_pobj));
+	if (rc != 0)
+		GOTO(stop_trans, rc);
+
+	rc = mdd_update_linkea(env, mdd_pobj, mdd_sobj, mdd_tobj,
+			       handle, lname);
+	if (rc != 0)
+		GOTO(stop_trans, rc);
+
+	if (S_ISREG(so_attr->la_mode)) {
+		if (so_attr->la_nlink == 1) {
+			rc = mdo_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
+					   handle,
+					   mdd_object_capa(env, mdd_sobj));
+			/* a missing LOV EA is not an error */
+			if (rc != 0 && rc != -ENODATA)
+				GOTO(stop_trans, rc);
+		}
+	}
+
+	/* Insert new fid with target name into target dir */
+	rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(mdd_tobj), name,
+				is_dir, handle, mdd_object_capa(env, mdd_pobj));
+	if (rc != 0)
+		GOTO(stop_trans, rc);
+
+	rc = mdd_links_add(env, mdd_tobj, mdo2fid(mdd_pobj), lname, handle,
+			   NULL, 1);
+	if (rc != 0)
+		GOTO(stop_trans, rc);
+
+	mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
+	mdo_ref_del(env, mdd_sobj, handle);
+	if (is_dir)
+		mdo_ref_del(env, mdd_sobj, handle);
+
+	ma->ma_attr = *so_attr;
+	ma->ma_valid |= MA_INODE;
+	rc = mdd_finish_unlink(env, mdd_sobj, ma, handle);
+	if (rc != 0)
+		GOTO(out_unlock, rc);
+
+	rc = mdd_attr_set_internal(env, mdd_pobj, p_la, handle, 0);
+
+out_unlock:
+	/* every path that took the write lock above must release it,
+	 * including the failure paths */
+	mdd_write_unlock(env, mdd_sobj);
+
+stop_trans:
+	mdd_trans_stop(env, mdd, rc, handle);
+
+	RETURN(rc);
+}
+
+/**
+ * Decide whether \a sobj (named in \a pobj) should be migrated.
+ *
+ * \retval < 0	permission check failed or another error occurred
+ * \retval = 0	the object can be migrated
+ * \retval > 0	the object does not need to be migrated; this is the
+ *		multiple-link file case
+ **/
+static int mdd_migrate_sanity_check(const struct lu_env *env,
+				    struct mdd_object *pobj,
+				    const struct lu_attr *pattr,
+				    struct mdd_object *sobj,
+				    struct lu_attr *sattr)
+{
+	struct mdd_thread_info	*info = mdd_env_info(env);
+	struct mdd_device	*mdd = mdo2mdd(&sobj->mod_obj);
+	struct linkea_data	*ldata = &info->mti_link_data;
+	struct lu_buf		*ea_buf;
+	int			 ea_size;
+	int			 idx;
+	int			 rc;
+
+	ENTRY;
+
+	ea_size = lmv_mds_md_size(2, LMV_MAGIC_MIGRATE);
+	ea_buf = lu_buf_check_and_alloc(&info->mti_big_buf, ea_size);
+	if (ea_buf->lb_buf == NULL)
+		RETURN(-ENOMEM);
+
+	rc = mdo_xattr_get(env, sobj, ea_buf, XATTR_NAME_LMV,
+			   mdd_object_capa(env, sobj));
+	if (rc > 0) {
+		union lmv_mds_md *lmm = ea_buf->lb_buf;
+		__u32		  magic = le32_to_cpu(lmm->lmv_md_v1.lmv_magic);
+
+		/* A migrate EA means the immutable flag was set by an
+		 * earlier migration attempt; override it here, otherwise
+		 * the sanity check below would fail */
+		if (magic == LMV_MAGIC_MIGRATE) {
+			sattr->la_flags &= ~LUSTRE_IMMUTABLE_FL;
+			sobj->mod_flags &= ~IMMUTE_OBJ;
+			CDEBUG(D_HA, "%s: "DFID" override IMMUTE FLAG\n",
+			       mdd2obd_dev(mdd)->obd_name,
+			       PFID(mdd_object_fid(sobj)));
+		}
+	}
+
+	rc = mdd_rename_sanity_check(env, pobj, pattr, pobj, pattr,
+				     sobj, sattr, NULL, NULL);
+	if (rc != 0)
+		RETURN(rc);
+
+	/* Only regular files with more than one link need the extra
+	 * check: such a file is migrated only once all of its name
+	 * entries have been migrated to the remote MDT */
+	if (!S_ISREG(sattr->la_mode) || sattr->la_nlink < 2)
+		RETURN(0);
+
+	rc = mdd_links_read(env, sobj, ldata);
+	if (rc == -ENOENT || rc == -ENODATA)
+		/* A multiple-link file without any linkEA may have been
+		 * created before linkEA was enabled, so none of its links
+		 * can have been migrated yet */
+		RETURN(1);
+	if (rc != 0)
+		RETURN(rc);
+
+	/* Walk every link record and look for a parent that is still
+	 * local */
+	LASSERT(ldata->ld_leh != NULL);
+	ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
+	for (idx = 0; idx < ldata->ld_leh->leh_reccount; idx++) {
+		struct mdd_object	*lpobj;
+		struct lu_name		 lname;
+		struct lu_fid		 fid;
+		bool			 is_local;
+
+		linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
+				    &lname, &fid);
+		/* step to the next record before any early continue */
+		ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
+							 ldata->ld_reclen);
+		lpobj = mdd_object_find(env, mdd, &fid);
+		if (IS_ERR(lpobj)) {
+			CWARN("%s: cannot find obj "DFID": rc = %ld\n",
+			      mdd2obd_dev(mdd)->obd_name, PFID(&fid),
+			      PTR_ERR(lpobj));
+			continue;
+		}
+
+		is_local = mdd_object_exists(lpobj) &&
+			   !mdd_object_remote(lpobj);
+		if (is_local)
+			CDEBUG(D_INFO, DFID"still has local entry %.*s "DFID"\n",
+			       PFID(mdd_object_fid(sobj)), lname.ln_namelen,
+			       lname.ln_name, PFID(&fid));
+		else
+			CDEBUG(D_INFO, DFID"%.*s: is on remote MDT.\n",
+			       PFID(&fid), lname.ln_namelen, lname.ln_name);
+		mdd_object_put(env, lpobj);
+
+		if (is_local) {
+			rc = 1;
+			break;
+		}
+	}
+
+	RETURN(rc);
+}
+
+/**
+ * Migrate the object with FID \a lf, named \a lname under \a pobj, to the
+ * target object \a tobj.
+ *
+ * The work is split over several transactions; see the step list in the
+ * function body.
+ *
+ * \param[in] env	execution environment
+ * \param[in] pobj	parent directory of the object
+ * \param[in] lf	FID of the source object
+ * \param[in] lname	name of the source object in \a pobj
+ * \param[in] tobj	target object (created here if it does not exist)
+ * \param[in,out] ma	attributes handed to the final name update
+ *
+ * \retval		0 on success, or when the sanity check reports that
+ *			the object does not need to be migrated
+ * \retval		-EBUSY if the source object is currently open
+ * \retval		negative errno on other failures
+ */
+static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
+		       const struct lu_fid *lf, const struct lu_name *lname,
+		       struct md_object *tobj, struct md_attr *ma)
+{
+	struct mdd_object	*mdd_pobj = md2mdd_obj(pobj);
+	struct mdd_device	*mdd = mdo2mdd(pobj);
+	struct mdd_object	*mdd_sobj = NULL;
+	struct mdd_object	*mdd_tobj = NULL;
+	struct lu_attr		*so_attr = MDD_ENV_VAR(env, cattr);
+	struct lu_attr		*pattr = MDD_ENV_VAR(env, pattr);
+	int			 rc;
+
+	ENTRY;
+	/* object has to be locked by mdt, so it must exist */
+	mdd_sobj = mdd_object_find(env, mdd, lf);
+	/* mdd_object_find() reports failure as an error pointer, which a
+	 * plain NULL check does not catch */
+	if (IS_ERR(mdd_sobj))
+		RETURN(PTR_ERR(mdd_sobj));
+	LASSERT(mdd_sobj != NULL);
+
+	/* Before the file is migrated, check whether it is currently
+	 * opened by someone else */
+	mdd_read_lock(env, mdd_sobj, MOR_SRC_CHILD);
+	if (mdd_sobj->mod_count >= 1) {
+		CERROR("%s: "DFID"%s is already opened count %d: rc = %d\n",
+		       mdd2obd_dev(mdd)->obd_name,
+		       PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
+		       mdd_sobj->mod_count, -EBUSY);
+		mdd_read_unlock(env, mdd_sobj);
+		GOTO(put, rc = -EBUSY);
+	}
+	mdd_read_unlock(env, mdd_sobj);
+
+	rc = mdd_la_get(env, mdd_sobj, so_attr, mdd_object_capa(env, mdd_sobj));
+	if (rc != 0)
+		GOTO(put, rc);
+
+	rc = mdd_la_get(env, mdd_pobj, pattr, BYPASS_CAPA);
+	if (rc != 0)
+		GOTO(put, rc);
+
+	rc = mdd_migrate_sanity_check(env, mdd_pobj, pattr, mdd_sobj, so_attr);
+	if (rc != 0) {
+		/* > 0 means "nothing to migrate", which is not an error */
+		if (rc > 0)
+			rc = 0;
+		GOTO(put, rc);
+	}
+
+	/* Sigh, it is impossible to finish all of migration in a single
+	 * transaction, for example migrating big directory entries to the
+	 * new MDT, it needs insert all of name entries of children in the
+	 * new directory.
+	 *
+	 * So migration will be done in multiple steps and transactions.
+	 *
+	 * 1. create an orphan object on the remote MDT in one transaction.
+	 * 2. migrate extend attributes to the new target file/directory.
+	 * 3. For directory, migrate the entries to the new MDT and update
+	 * linkEA of each children. Because we can not migrate all entries
+	 * in a single transaction, so the migrating directory will become
+	 * a striped directory during migration, so once the process is
+	 * interrupted, the directory is still accessible. (During lookup,
+	 * client will locate the name by searching both original and target
+	 * object).
+	 * 4. Finally, update the name/FID to point to the new file/directory
+	 * in a separate transaction.
+	 */
+
+	/* step 1: Check whether the orphan object has been created, and create
+	 * orphan object on the remote MDT if needed */
+	mdd_tobj = md2mdd_obj(tobj);
+	if (!mdd_object_exists(mdd_tobj)) {
+		rc = mdd_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
+					so_attr);
+		if (rc != 0)
+			GOTO(put, rc);
+	}
+
+	/* step 2: migrate xattr */
+	rc = mdd_migrate_xattrs(env, mdd_sobj, mdd_tobj);
+	if (rc != 0)
+		GOTO(put, rc);
+
+	/* step 3: migrate name entries to the orphan object */
+	if (S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu))) {
+		rc = mdd_migrate_entries(env, mdd_sobj, mdd_tobj);
+		if (rc != 0)
+			GOTO(put, rc);
+		if (unlikely(OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_NET_REP,
+						  OBD_FAIL_MDS_REINT_NET_REP)))
+			GOTO(put, rc = 0);
+	}
+
+	/* step 4: update name entry to the new object */
+	rc = mdd_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj, lname,
+				     ma);
+	if (rc != 0)
+		GOTO(put, rc);
+put:
+	if (mdd_sobj)
+		mdd_object_put(env, mdd_sobj);
+
+	RETURN(rc);
+}
+
const struct md_dir_operations mdd_dir_ops = {
- .mdo_is_subdir = mdd_is_subdir,
- .mdo_lookup = mdd_lookup,
- .mdo_create = mdd_create,
- .mdo_rename = mdd_rename,
- .mdo_link = mdd_link,
- .mdo_unlink = mdd_unlink,
- .mdo_create_data = mdd_create_data,
+ .mdo_is_subdir = mdd_is_subdir,
+ .mdo_lookup = mdd_lookup,
+ .mdo_create = mdd_create,
+ .mdo_rename = mdd_rename,
+ .mdo_link = mdd_link,
+ .mdo_unlink = mdd_unlink,
+ .mdo_create_data = mdd_create_data,
+ /* multi-transaction object migration, implemented by mdd_migrate() */
+ .mdo_migrate = mdd_migrate,
};
};
enum mod_flags {
- /* The dir object has been unlinked */
- DEAD_OBJ = 1 << 0,
- APPEND_OBJ = 1 << 1,
- IMMUTE_OBJ = 1 << 2,
- ORPHAN_OBJ = 1 << 3,
+ /* The dir object has been unlinked */
+ DEAD_OBJ = 1 << 0,
+ /* reset, together with IMMUTE_OBJ, at the start of mdd_flags_xlate() */
+ APPEND_OBJ = 1 << 1,
+ /* in-memory immutable marker kept in mod_flags; the migration sanity
+ * check clears it alongside LUSTRE_IMMUTABLE_FL */
+ IMMUTE_OBJ = 1 << 2,
+ /* NOTE(review): presumably marks an object on the orphan list -
+ * confirm against the users of this flag */
+ ORPHAN_OBJ = 1 << 3,
};
struct mdd_object {
struct dt_object_format mti_dof;
struct obd_quotactl mti_oqctl;
struct linkea_data mti_link_data;
+ struct md_op_spec mti_spec;
};
extern const char orph_index_name[];
int mdd_data_get(const struct lu_env *env, struct mdd_object *obj, void **data);
int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
struct lu_attr *la, struct lustre_capa *capa);
+void mdd_flags_xlate(struct mdd_object *obj, __u32 flags);
int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
struct md_attr *ma);
int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
struct mdd_object *c, struct lu_attr *attr,
struct thandle *handle,
- const struct md_op_spec *spec);
+ const struct md_op_spec *spec,
+ struct dt_allocation_hint *hint);
int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
struct md_attr *ma);
int mdd_link_buf_grow(const struct lu_env *env, ssize_t len);
extern const struct md_dir_operations mdd_dir_ops;
extern const struct md_object_operations mdd_obj_ops;
-
+int mdd_readlink(const struct lu_env *env, struct md_object *obj,
+ struct lu_buf *buf);
int accmode(const struct lu_env *env, const struct lu_attr *la, int flags);
extern struct lu_context_key mdd_thread_key;
extern const struct lu_device_operations mdd_lu_ops;
struct mdd_object *c,
struct lu_attr *attr,
struct thandle *handle,
- const struct md_op_spec *spec);
+ const struct md_op_spec *spec,
+ struct dt_allocation_hint *hint);
+int mdd_get_lov_ea(const struct lu_env *env, struct mdd_object *obj,
+ struct lu_buf *lmm_buf);
/* mdd_trans.c */
int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
struct mdd_object *child, const struct lu_attr *attr,
- const struct md_op_spec *spec);
+ const struct md_op_spec *spec,
+ struct dt_allocation_hint *hint);
static inline void mdd_object_get(struct mdd_object *o)
{
return mdo_attr_get(env, obj, la, capa);
}
-static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
+void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
{
obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
* Permission check is done when open,
* no need check again.
*/
-static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
- struct lu_buf *buf)
+int mdd_readlink(const struct lu_env *env, struct md_object *obj,
+ struct lu_buf *buf)
{
struct mdd_object *mdd_obj = md2mdd_obj(obj);
struct dt_object *next;
}
next = mdd_object_child(mdd_obj);
- mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
+ LASSERT(next != NULL);
+ LASSERT(next->do_body_ops != NULL);
+ LASSERT(next->do_body_ops->dbo_read != NULL);
+ mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
mdd_object_capa(env, mdd_obj));
mdd_read_unlock(env, mdd_obj);
struct mdd_object *c,
struct lu_attr *attr,
struct thandle *handle,
- const struct md_op_spec *spec)
+ const struct md_op_spec *spec,
+ struct dt_allocation_hint *hint)
{
struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
- struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
const struct dt_index_features *feat = spec->sp_feat;
int rc;
ENTRY;
int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
struct mdd_object *c, struct lu_attr *attr,
- struct thandle *handle,
- const struct md_op_spec *spec)
+ struct thandle *handle,
+ const struct md_op_spec *spec,
+ struct dt_allocation_hint *hint)
{
- struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
int rc;
ENTRY;
* read lov EA of an object
* return the lov EA in an allocated lu_buf
*/
-static int mdd_get_lov_ea(const struct lu_env *env,
- struct mdd_object *obj,
- struct lu_buf *lmm_buf)
+int mdd_get_lov_ea(const struct lu_env *env, struct mdd_object *obj,
+ struct lu_buf *lmm_buf)
{
struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
- int rc, sz;
+ int rc, bufsize;
ENTRY;
repeat:
}
if (rc < 0)
- GOTO(out, rc);
+ RETURN(rc);
if (rc == 0)
- GOTO(out, rc = -ENODATA);
+ RETURN(-ENODATA);
- sz = rc;
+ bufsize = rc;
if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
/* mti_big_buf was not allocated, so we have to
* allocate it based on the ea size */
buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf,
- sz);
+ bufsize);
if (buf->lb_buf == NULL)
GOTO(out, rc = -ENOMEM);
goto repeat;
}
- lu_buf_alloc(lmm_buf, sz);
+ lu_buf_alloc(lmm_buf, bufsize);
if (lmm_buf->lb_buf == NULL)
GOTO(out, rc = -ENOMEM);
- memcpy(lmm_buf->lb_buf, buf->lb_buf, sz);
+ memcpy(lmm_buf->lb_buf, buf->lb_buf, bufsize);
rc = 0;
EXIT;
void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
struct mdd_object *child, const struct lu_attr *attr,
- const struct md_op_spec *spec)
+ const struct md_op_spec *spec,
+ struct dt_allocation_hint *hint)
{
- struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
struct dt_object *np = parent ? mdd_object_child(parent) : NULL;
struct dt_object *nc = mdd_object_child(child);
hint->dah_eadata_len = 0;
}
- CDEBUG(D_INFO, DFID" eadata %p, len %d\n", PFID(mdd_object_fid(child)),
+ CDEBUG(D_INFO, DFID" eadata %p len %d\n", PFID(mdd_object_fid(child)),
hint->dah_eadata, hint->dah_eadata_len);
/* @hint will be initialized by underlying device. */
nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
RETURN(PTR_ERR(th));
rc = dt_declare_record_write(env, mdt->mdt_ck_obj,
- sizeof(*tmp) * 3, 0, th);
+ mdt_buf_const(env, NULL,
+ sizeof(*tmp) * 3), 0, th);
if (rc)
goto stop;
#include <lustre_acl.h>
#include <lustre_param.h>
#include <lustre_quota.h>
-#include <lustre_linkea.h>
#include <lustre_lfsck.h>
mdl_mode_t mdt_mdl_lock_modes[] = {
RETURN(rc);
}
-static int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
- struct md_attr *ma, const char *name)
+int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
+ struct md_attr *ma, const char *name)
{
struct md_object *next = mdt_object_child(o);
struct lu_buf *buf = &info->mti_buf;
GOTO(out_ucred, rc = err_serious(rc));
if (mdt_check_resent(info, mdt_reconstruct, lhc)) {
- rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
+ DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt.");
+ rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
GOTO(out_ucred, rc);
}
rc = mdt_reint_rec(info, lhc);
[REINT_RENAME] = &RQF_MDS_REINT_RENAME,
[REINT_OPEN] = &RQF_MDS_REINT_OPEN,
[REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR,
- [REINT_RMENTRY] = &RQF_MDS_REINT_UNLINK
+ [REINT_RMENTRY] = &RQF_MDS_REINT_UNLINK,
+ [REINT_MIGRATE] = &RQF_MDS_REINT_RENAME
};
ENTRY;
info->mti_spec.no_create = 0;
info->mti_spec.sp_rm_entry = 0;
+
+ info->mti_spec.u.sp_ea.eadata = NULL;
+ info->mti_spec.u.sp_ea.eadatalen = 0;
}
void mdt_thread_info_fini(struct mdt_thread_info *info)
site->ls_top_dev = &mdt->mdt_lu_dev;
mdt->mdt_child = lu2md_dev(mdt->mdt_child_exp->exp_obd->obd_lu_dev);
-
/* now connect to bottom OSD */
snprintf(name, MAX_OBD_NAME, "%s-osd", dev);
rc = mdt_connect_to_next(env, mdt, name, &mdt->mdt_bottom_exp);
mdt->mdt_bottom =
lu2dt_dev(mdt->mdt_bottom_exp->exp_obd->obd_lu_dev);
-
rc = lu_env_refill((struct lu_env *)env);
if (rc != 0)
CERROR("Failure to refill session: '%d'\n", rc);
int pli_fidcount; /**< number of \a pli_fids */
};
-static int mdt_links_read(struct mdt_thread_info *info,
- struct mdt_object *mdt_obj, struct linkea_data *ldata)
+int mdt_links_read(struct mdt_thread_info *info, struct mdt_object *mdt_obj,
+ struct linkea_data *ldata)
{
int rc;
#include <lustre_idmap.h>
#include <lustre_eacl.h>
#include <lustre_quota.h>
+#include <lustre_linkea.h>
/* check if request's xid is equal to last one or not*/
static inline int req_xid_is_last(struct ptlrpc_request *req)
int mdt_get_info(struct tgt_session_info *tsi);
int mdt_attr_get_complex(struct mdt_thread_info *info,
struct mdt_object *o, struct md_attr *ma);
-int mdt_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
- struct md_attr *ma, const char *name);
+int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
+ struct md_attr *ma, const char *name);
int mdt_ioepoch_open(struct mdt_thread_info *info, struct mdt_object *o,
int created);
int mdt_object_is_som_enabled(struct mdt_object *mo);
int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
void *data, int flag);
+int mdt_links_read(struct mdt_thread_info *info,
+ struct mdt_object *mdt_obj,
+ struct linkea_data *ldata);
/* mdt_idmap.c */
int mdt_init_idmap(struct tgt_session_info *tsi);
void mdt_cleanup_idmap(struct mdt_export_data *);
info->mti_spec.no_create = !!req_is_replay(mdt_info_req(info));
- rc = mdt_dlmreq_unpack(info);
- RETURN(rc);
+
+ rc = mdt_dlmreq_unpack(info);
+
+ RETURN(rc);
}
/*
[REINT_OPEN] = mdt_open_unpack,
[REINT_SETXATTR] = mdt_setxattr_unpack,
[REINT_RMENTRY] = mdt_rmentry_unpack,
+ [REINT_MIGRATE] = mdt_rename_unpack,
};
int mdt_reint_unpack(struct mdt_thread_info *info, __u32 op)
ma->ma_valid = 0;
mutex_lock(&o->mot_lov_mutex);
if (!(o->mot_flags & MOF_LOV_CREATED)) {
- if (p != NULL && (fid_is_obf(mdt_object_fid(p)) ||
- fid_is_dot_lustre(mdt_object_fid(p))))
+ if (p != NULL && !fid_is_md_operative(mdt_object_fid(p)))
GOTO(unlock, rc = -EPERM);
rc = mdo_create_data(info->mti_env,
mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
if (result == -ENOENT) {
/* Create under OBF and .lustre is not permitted */
- if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1))
+ if (!fid_is_md_operative(rr->rr_fid1))
GOTO(out_child, result = -EPERM);
/* save versions in reply */
"in "DFID,
PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
- if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1))
+ if (!fid_is_md_operative(rr->rr_fid1))
RETURN(-EPERM);
repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
- lh = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
-
parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
if (IS_ERR(parent))
RETURN(PTR_ERR(parent));
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
RETURN(err_serious(-ENOENT));
- if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1))
+ if (!fid_is_md_operative(rr->rr_fid1))
RETURN(-EPERM);
/*
}
}
- if (fid_is_obf(child_fid) || fid_is_dot_lustre(child_fid))
+ if (!fid_is_md_operative(child_fid))
GOTO(unlock_parent, rc = -EPERM);
/* We will lock the child regardless it is local or remote. No harm. */
if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
RETURN(-EPERM);
- if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1) ||
- fid_is_obf(rr->rr_fid2) || fid_is_dot_lustre(rr->rr_fid2))
+ if (!fid_is_md_operative(rr->rr_fid1) ||
+ !fid_is_md_operative(rr->rr_fid2))
RETURN(-EPERM);
/* step 1: find & lock the target parent dir */
int rc = 0;
ENTRY;
- do {
- LASSERT(fid_is_sane(&dst_fid));
- dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid);
- if (!IS_ERR(dst)) {
- rc = mdo_is_subdir(info->mti_env,
- mdt_object_child(dst), fid,
- &dst_fid);
- mdt_object_put(info->mti_env, dst);
- if (rc != -EREMOTE && rc < 0) {
- CERROR("Failed mdo_is_subdir(), rc %d\n", rc);
- } else {
- /* check the found fid */
- if (lu_fid_eq(&dst_fid, fid))
- rc = -EINVAL;
- }
- } else {
- rc = PTR_ERR(dst);
- }
- } while (rc == -EREMOTE);
+ /* If the source and target are in the same directory, they cannot
+ * be in a parent/child relationship, so the subdir check is not needed */
+ if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
+ return 0;
+
+ do {
+ LASSERT(fid_is_sane(&dst_fid));
+ dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid);
+ if (!IS_ERR(dst)) {
+ rc = mdo_is_subdir(info->mti_env,
+ mdt_object_child(dst), fid,
+ &dst_fid);
+ mdt_object_put(info->mti_env, dst);
+ if (rc != -EREMOTE && rc < 0) {
+ CERROR("Failed mdo_is_subdir(), rc %d\n", rc);
+ } else {
+ /* check the found fid */
+ if (lu_fid_eq(&dst_fid, fid))
+ rc = -EINVAL;
+ }
+ } else {
+ rc = PTR_ERR(dst);
+ }
+ } while (rc == -EREMOTE);
RETURN(rc);
}
+/* Update object linkEA.
+ *
+ * One entry of a list of locked objects: it ties an object to the lock
+ * handle taken on it, so that mdt_unlock_list() can unlock and put the
+ * object and free the entry. */
+struct mdt_lock_list {
+ struct mdt_object *mll_obj; /* object the lock was taken on */
+ struct mdt_lock_handle mll_lh; /* lock handle held on mll_obj */
+ struct list_head mll_list; /* linkage into the caller's lock list */
+};
+/*
+ * Release every entry on \a list: unlock and put the object of each entry
+ * through mdt_object_unlock_put(), unlink the entry and free it.
+ *
+ * \param[in] info	thread info passed on to mdt_object_unlock_put()
+ * \param[in] list	list of struct mdt_lock_list entries; all entries
+ *			have been removed from it on return
+ * \param[in] rc	result code passed on to mdt_object_unlock_put()
+ */
+static void mdt_unlock_list(struct mdt_thread_info *info,
+ struct list_head *list, int rc)
+{
+ struct mdt_lock_list *mll;
+ struct mdt_lock_list *mll2;
+
+ list_for_each_entry_safe(mll, mll2, list, mll_list) {
+ mdt_object_unlock_put(info, mll->mll_obj, &mll->mll_lh, rc);
+ list_del(&mll->mll_list);
+ OBD_FREE_PTR(mll);
+ }
+}
+/*
+ * Lock the objects named by the link EA records of \a obj and collect them
+ * on \a lock_list.
+ *
+ * Nothing is done when \a obj is a directory. Otherwise each link EA record
+ * is unpacked into a name and a fid, and the object with that fid is looked
+ * up. Objects that cannot be found, do not exist or are \a pobj itself are
+ * skipped; every other one is locked with MDS_INODELOCK_UPDATE and appended
+ * to \a lock_list as a struct mdt_lock_list.
+ *
+ * \param[in] info	thread info; info->mti_big_buf holds the link EA
+ * \param[in] obj	object whose link EA is read
+ * \param[in] pobj	object that is skipped while walking the records
+ * \param[in,out] lock_list	list receiving one entry per locked object
+ *
+ * \retval 0		on success, also when mdt_links_read() returns
+ *			-ENOENT or -ENODATA
+ * \retval negative	errno on failure; if an allocation or a lock request
+ *			fails, \a lock_list is released through
+ *			mdt_unlock_list() before returning
+ */
+static int mdt_lock_objects_in_linkea(struct mdt_thread_info *info,
+ struct mdt_object *obj,
+ struct mdt_object *pobj,
+ struct list_head *lock_list)
+{
+ struct lu_buf *buf = &info->mti_big_buf;
+ struct linkea_data ldata = { 0 };
+ int count;
+ int rc;
+ ENTRY;
+ /* nothing is locked for directories */
+ if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
+ RETURN(0);
+
+ buf = lu_buf_check_and_alloc(buf, PATH_MAX);
+ if (buf->lb_buf == NULL)
+ RETURN(-ENOMEM);
+ /* -ENOENT and -ENODATA from mdt_links_read() are treated as success */
+ ldata.ld_buf = buf;
+ rc = mdt_links_read(info, obj, &ldata);
+ if (rc != 0) {
+ if (rc == -ENOENT || rc == -ENODATA)
+ rc = 0;
+ RETURN(rc);
+ }
+ /* the first record starts right behind the link EA header */
+ LASSERT(ldata.ld_leh != NULL);
+ ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
+ for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
+ struct mdt_device *mdt = info->mti_mdt;
+ struct mdt_object *mdt_pobj;
+ struct mdt_lock_list *mll;
+ struct lu_name name;
+ struct lu_fid fid;
+ /* unpack this record, then step ld_lee to the next one */
+ linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
+ &name, &fid);
+ ldata.ld_lee = (struct link_ea_entry *)((char *)ldata.ld_lee +
+ ldata.ld_reclen);
+ mdt_pobj = mdt_object_find(info->mti_env, mdt, &fid);
+ if (IS_ERR(mdt_pobj)) {
+ CWARN("%s: cannot find obj "DFID": rc = %ld\n",
+ mdt_obd_name(mdt), PFID(&fid), PTR_ERR(mdt_pobj));
+ continue;
+ }
+
+ if (!mdt_object_exists(mdt_pobj)) {
+ CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
+ mdt_obd_name(mdt), PFID(&fid));
+ mdt_object_put(info->mti_env, mdt_pobj);
+ continue;
+ }
+
+ if (mdt_pobj == pobj) {
+ CDEBUG(D_INFO, "%s: skipping parent obj "DFID"\n",
+ mdt_obd_name(mdt), PFID(&fid));
+ mdt_object_put(info->mti_env, mdt_pobj);
+ continue;
+ }
+
+ OBD_ALLOC_PTR(mll);
+ if (mll == NULL) {
+ mdt_object_put(info->mti_env, mdt_pobj);
+ GOTO(out, rc = -ENOMEM);
+ }
+ /* remote object: regular EX lock; local object: PW PDO lock
+ * initialised with the record name */
+ if (mdt_object_remote(mdt_pobj)) {
+ mdt_lock_reg_init(&mll->mll_lh, LCK_EX);
+ rc = mdt_remote_object_lock(info, mdt_pobj,
+ &mll->mll_lh.mlh_rreg_lh,
+ mll->mll_lh.mlh_rreg_mode,
+ MDS_INODELOCK_UPDATE);
+ } else {
+ mdt_lock_pdo_init(&mll->mll_lh, LCK_PW, &name);
+ rc = mdt_object_lock(info, mdt_pobj, &mll->mll_lh,
+ MDS_INODELOCK_UPDATE,
+ MDT_LOCAL_LOCK);
+ }
+ if (rc != 0) {
+ CERROR("%s: cannot lock "DFID": rc =%d\n",
+ mdt_obd_name(mdt), PFID(&fid), rc);
+ mdt_object_put(info->mti_env, mdt_pobj);
+ OBD_FREE_PTR(mll);
+ GOTO(out, rc);
+ }
+ /* locked: remember it so that mdt_unlock_list() can release it */
+ CFS_INIT_LIST_HEAD(&mll->mll_list);
+ mll->mll_obj = mdt_pobj;
+ list_add_tail(&mll->mll_list, lock_list);
+ }
+out:
+ if (rc != 0)
+ mdt_unlock_list(info, lock_list, rc);
+ RETURN(rc);
+}
+
+/*
+ * Migrate files from one MDT to another MDT.
+ *
+ * The object to migrate is the entry rr_name under directory rr_fid1.
+ * Steps: (1) find and lock the source directory, (2) look the name up and
+ * sanity check the object found, (3) lock the objects listed in its link EA,
+ * (4) lock the object itself and find the target object, (5) call
+ * mdo_migrate(). If the object carries an LMV EA with LMV_MAGIC_MIGRATE the
+ * target is lmv_stripe_fids[1] and is locked remotely; otherwise the target
+ * is rr_fid2 and is not locked here. The target must be on a remote MDT.
+ * Everything acquired is released in reverse order through the exit labels.
+ *
+ * \param[in] info	thread info carrying the reint record (mti_rr)
+ * \param[in] lhc	not used by this function
+ *
+ * \retval 0		on success
+ * \retval negative	errno on failure
+ */
+static int mdt_reint_migrate_internal(struct mdt_thread_info *info,
+ struct mdt_lock_handle *lhc)
+{
+ struct mdt_reint_record *rr = &info->mti_rr;
+ struct md_attr *ma = &info->mti_attr;
+ struct mdt_object *msrcdir;
+ struct mdt_object *mold;
+ struct mdt_object *mnew = NULL;
+ struct mdt_lock_handle *lh_dirp;
+ struct mdt_lock_handle *lh_childp;
+ struct mdt_lock_handle *lh_tgtp = NULL;
+ struct lu_fid *old_fid = &info->mti_tmp_fid1;
+ struct list_head lock_list;
+ int rc;
+ ENTRY;
+
+ CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
+ PNAME(&rr->rr_name), PFID(rr->rr_fid2));
+ /* 1: lock the source dir. */
+ msrcdir = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
+ if (IS_ERR(msrcdir)) {
+ CERROR("%s: cannot find source dir "DFID" : rc = %d\n",
+ mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
+ (int)PTR_ERR(msrcdir));
+ RETURN(PTR_ERR(msrcdir));
+ }
+
+ lh_dirp = &info->mti_lh[MDT_LH_PARENT];
+ if (mdt_object_remote(msrcdir)) {
+ mdt_lock_reg_init(lh_dirp, LCK_EX);
+ rc = mdt_remote_object_lock(info, msrcdir,
+ &lh_dirp->mlh_rreg_lh,
+ lh_dirp->mlh_rreg_mode,
+ MDS_INODELOCK_UPDATE);
+ if (rc != ELDLM_OK)
+ GOTO(out_put_parent, rc);
+ } else {
+ mdt_lock_pdo_init(lh_dirp, LCK_PW, &rr->rr_name);
+ rc = mdt_object_lock(info, msrcdir, lh_dirp,
+ MDS_INODELOCK_UPDATE,
+ MDT_LOCAL_LOCK);
+ if (rc)
+ GOTO(out_put_parent, rc);
+
+ rc = mdt_version_get_check_save(info, msrcdir, 0);
+ if (rc)
+ GOTO(out_unlock_parent, rc);
+ }
+
+ /* 2: sanity check and find the object to be migrated. */
+ fid_zero(old_fid);
+ rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
+ if (rc != 0)
+ GOTO(out_unlock_parent, rc);
+
+ if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
+ GOTO(out_unlock_parent, rc = -EINVAL);
+
+ if (!fid_is_md_operative(old_fid))
+ GOTO(out_unlock_parent, rc = -EPERM);
+
+ mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
+ if (IS_ERR(mold))
+ GOTO(out_unlock_parent, rc = PTR_ERR(mold));
+
+ if (mdt_object_remote(mold)) {
+ CERROR("%s: source "DFID" is on the remote MDT\n",
+ mdt_obd_name(info->mti_mdt), PFID(old_fid));
+ GOTO(out_put_child, rc = -EREMOTE);
+ }
+
+ if (S_ISREG(lu_object_attr(&mold->mot_obj)) &&
+ !mdt_object_remote(msrcdir)) {
+ CERROR("%s: parent "DFID" is still on the same"
+ " MDT, which should be migrated first:"
+ " rc = %d\n", mdt_obd_name(info->mti_mdt),
+ PFID(mdt_object_fid(msrcdir)), -EPERM);
+ GOTO(out_put_child, rc = -EPERM);
+ }
+
+ /* 3: iterate the linkea of the object and lock all of the objects */
+ CFS_INIT_LIST_HEAD(&lock_list);
+ rc = mdt_lock_objects_in_linkea(info, mold, msrcdir, &lock_list);
+ if (rc != 0)
+ GOTO(out_put_child, rc);
+
+ /* 4: lock the object to be migrated */
+ lh_childp = &info->mti_lh[MDT_LH_OLD];
+ mdt_lock_reg_init(lh_childp, LCK_EX);
+ rc = mdt_object_lock(info, mold, lh_childp,
+ MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
+ MDS_INODELOCK_LAYOUT, MDT_CROSS_LOCK);
+ if (rc != 0)
+ GOTO(out_unlock_list, rc);
+
+ /* mold is locked from here on: every failure below has to leave
+ * through out_unlock_child (or a later label) so that lh_childp is
+ * released and not leaked */
+ ma->ma_need = MA_LMV;
+ ma->ma_valid = 0;
+ ma->ma_lmv = (union lmv_mds_md *)info->mti_xattr_buf;
+ ma->ma_lmv_size = sizeof(info->mti_xattr_buf);
+ rc = mdt_stripe_get(info, mold, ma, XATTR_NAME_LMV);
+ if (rc != 0)
+ GOTO(out_unlock_child, rc);
+
+ if ((ma->ma_valid & MA_LMV)) {
+ struct lmv_mds_md_v1 *lmm1;
+
+ lmv_le_to_cpu(ma->ma_lmv, ma->ma_lmv);
+ lmm1 = &ma->ma_lmv->lmv_md_v1;
+ if (lmm1->lmv_magic != LMV_MAGIC_MIGRATE) {
+ CERROR("%s: can not migrate striped dir "DFID
+ ": rc = %d\n", mdt_obd_name(info->mti_mdt),
+ PFID(mdt_object_fid(mold)), -EPERM);
+ GOTO(out_unlock_child, rc = -EPERM);
+ }
+
+ if (!fid_is_sane(&lmm1->lmv_stripe_fids[1]))
+ GOTO(out_unlock_child, rc = -EINVAL);
+
+ mnew = mdt_object_find(info->mti_env, info->mti_mdt,
+ &lmm1->lmv_stripe_fids[1]);
+ if (IS_ERR(mnew))
+ GOTO(out_unlock_child, rc = PTR_ERR(mnew));
+
+ if (!mdt_object_remote(mnew)) {
+ CERROR("%s: "DFID" being migrated is on this MDT:"
+ " rc = %d\n", mdt_obd_name(info->mti_mdt),
+ PFID(rr->rr_fid2), -EPERM);
+ GOTO(out_put_new, rc = -EPERM);
+ }
+
+ lh_tgtp = &info->mti_lh[MDT_LH_CHILD];
+ mdt_lock_reg_init(lh_tgtp, LCK_EX);
+ rc = mdt_remote_object_lock(info, mnew,
+ &lh_tgtp->mlh_rreg_lh,
+ lh_tgtp->mlh_rreg_mode,
+ MDS_INODELOCK_UPDATE);
+ if (rc != 0) {
+ /* no lock was taken: keep out_unlock_new from
+ * unlocking it */
+ lh_tgtp = NULL;
+ GOTO(out_put_new, rc);
+ }
+ } else {
+ mnew = mdt_object_find(info->mti_env, info->mti_mdt,
+ rr->rr_fid2);
+ if (IS_ERR(mnew))
+ GOTO(out_unlock_child, rc = PTR_ERR(mnew));
+ if (!mdt_object_remote(mnew)) {
+ CERROR("%s: Migration "DFID" is on this MDT !\n",
+ mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid2));
+ GOTO(out_put_new, rc = -EXDEV);
+ }
+ }
+
+ /* 5: migrate it */
+ mdt_reint_init_ma(info, ma);
+
+ mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
+ OBD_FAIL_MDS_REINT_RENAME_WRITE);
+
+ rc = mdo_migrate(info->mti_env, mdt_object_child(msrcdir),
+ old_fid, &rr->rr_name, mdt_object_child(mnew), ma);
+ if (rc != 0)
+ GOTO(out_unlock_new, rc);
+out_unlock_new:
+ if (lh_tgtp != NULL)
+ mdt_object_unlock(info, mnew, lh_tgtp, rc);
+out_put_new:
+ if (mnew)
+ mdt_object_put(info->mti_env, mnew);
+out_unlock_child:
+ mdt_object_unlock(info, mold, lh_childp, rc);
+out_unlock_list:
+ mdt_unlock_list(info, &lock_list, rc);
+out_put_child:
+ mdt_object_put(info->mti_env, mold);
+out_unlock_parent:
+ mdt_object_unlock(info, msrcdir, lh_dirp, rc);
+out_put_parent:
+ mdt_object_put(info->mti_env, msrcdir);
+
+ RETURN(rc);
+}
+
/*
* VBR: rename versions in reply: 0 - src parent; 1 - tgt parent;
* 2 - src child; 3 - tgt child.
* And tgt_c will be still in the same MDT as the original
* src_c.
*/
-static int mdt_reint_rename(struct mdt_thread_info *info,
- struct mdt_lock_handle *lhc)
+static int mdt_reint_rename_internal(struct mdt_thread_info *info,
+ struct mdt_lock_handle *lhc)
{
- struct mdt_reint_record *rr = &info->mti_rr;
- struct md_attr *ma = &info->mti_attr;
- struct ptlrpc_request *req = mdt_info_req(info);
- struct mdt_object *msrcdir;
- struct mdt_object *mtgtdir;
- struct mdt_object *mold;
- struct mdt_object *mnew = NULL;
- struct mdt_lock_handle *lh_srcdirp;
- struct mdt_lock_handle *lh_tgtdirp;
- struct mdt_lock_handle *lh_oldp;
- struct mdt_lock_handle *lh_newp;
- struct lu_fid *old_fid = &info->mti_tmp_fid1;
- struct lu_fid *new_fid = &info->mti_tmp_fid2;
- struct lustre_handle rename_lh = { 0 };
- int rc;
- ENTRY;
-
- if (info->mti_dlm_req)
- ldlm_request_cancel(req, info->mti_dlm_req, 0);
+ struct mdt_reint_record *rr = &info->mti_rr;
+ struct md_attr *ma = &info->mti_attr;
+ struct ptlrpc_request *req = mdt_info_req(info);
+ struct mdt_object *msrcdir;
+ struct mdt_object *mtgtdir;
+ struct mdt_object *mold;
+ struct mdt_object *mnew = NULL;
+ struct mdt_lock_handle *lh_srcdirp;
+ struct mdt_lock_handle *lh_tgtdirp;
+ struct mdt_lock_handle *lh_oldp = NULL;
+ struct mdt_lock_handle *lh_newp = NULL;
+ struct lu_fid *old_fid = &info->mti_tmp_fid1;
+ struct lu_fid *new_fid = &info->mti_tmp_fid2;
+ int rc;
+ ENTRY;
DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
PFID(rr->rr_fid1), PNAME(&rr->rr_name),
PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
- if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1) ||
- fid_is_obf(rr->rr_fid2) || fid_is_dot_lustre(rr->rr_fid2))
- RETURN(-EPERM);
-
- rc = mdt_rename_lock(info, &rename_lh);
- if (rc) {
- CERROR("Can't lock FS for rename, rc %d\n", rc);
- RETURN(rc);
- }
-
- lh_newp = &info->mti_lh[MDT_LH_NEW];
-
- /* step 1: lock the source dir. */
- lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
+ /* step 1: lock the source dir. */
+ lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
- msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
- MDS_INODELOCK_UPDATE);
- if (IS_ERR(msrcdir))
- GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
+ msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
+ MDS_INODELOCK_UPDATE);
+ if (IS_ERR(msrcdir))
+ RETURN(PTR_ERR(msrcdir));
- rc = mdt_version_get_check_save(info, msrcdir, 0);
- if (rc)
- GOTO(out_unlock_source, rc);
+ rc = mdt_version_get_check_save(info, msrcdir, 0);
+ if (rc)
+ GOTO(out_unlock_source, rc);
- /* step 2: find & lock the target dir. */
- lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
+ /* step 2: find & lock the target dir. */
+ lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
- if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
- mdt_object_get(info->mti_env, msrcdir);
- mtgtdir = msrcdir;
- if (lh_tgtdirp->mlh_pdo_hash != lh_srcdirp->mlh_pdo_hash) {
- rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir,
- MDS_INODELOCK_UPDATE);
- if (rc)
- GOTO(out_unlock_source, rc);
- OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
- }
- } else {
- mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
- rr->rr_fid2);
- if (IS_ERR(mtgtdir))
- GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
+ if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
+ mdt_object_get(info->mti_env, msrcdir);
+ mtgtdir = msrcdir;
+ if (lh_tgtdirp->mlh_pdo_hash != lh_srcdirp->mlh_pdo_hash) {
+ rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir,
+ MDS_INODELOCK_UPDATE);
+ if (rc != 0)
+ GOTO(out_unlock_source, rc);
+ OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
+ }
+ } else {
+ mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
+ rr->rr_fid2);
+ if (IS_ERR(mtgtdir))
+ GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
- /* check early, the real version will be saved after locking */
- rc = mdt_version_get_check(info, mtgtdir, 1);
- if (rc)
- GOTO(out_put_target, rc);
+ /* check early, the real version will be saved after locking */
+ rc = mdt_version_get_check(info, mtgtdir, 1);
+ if (rc)
+ GOTO(out_put_target, rc);
if (unlikely(mdt_object_remote(mtgtdir))) {
CDEBUG(D_INFO, "Source dir "DFID" target dir "DFID
if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
GOTO(out_unlock_target, rc = -EINVAL);
- if (fid_is_obf(old_fid) || fid_is_dot_lustre(old_fid))
+ if (!fid_is_md_operative(old_fid))
GOTO(out_unlock_target, rc = -EPERM);
mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
if (IS_ERR(mold))
GOTO(out_unlock_target, rc = PTR_ERR(mold));
- lh_oldp = &info->mti_lh[MDT_LH_OLD];
- mdt_lock_reg_init(lh_oldp, LCK_EX);
- rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP |
- MDS_INODELOCK_XATTR, MDT_CROSS_LOCK);
- if (rc != 0) {
- mdt_object_put(info->mti_env, mold);
- GOTO(out_unlock_target, rc);
- }
-
tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
- /* save version after locking */
- mdt_version_get_save(info, mold, 2);
- mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
-
- /* step 4: find & lock the new object. */
- /* new target object may not exist now */
- /* lookup with version checking */
- fid_zero(new_fid);
+ /* save version after locking */
+ mdt_version_get_save(info, mold, 2);
+ mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
+
+ /* step 4: find & lock the new object. */
+ /* new target object may not exist now */
+ /* lookup with version checking */
+ fid_zero(new_fid);
rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
3);
- if (rc == 0) {
- /* the new_fid should have been filled at this moment */
- if (lu_fid_eq(old_fid, new_fid))
- GOTO(out_unlock_old, rc);
+ if (rc == 0) {
+ /* the new_fid should have been filled at this moment */
+ if (lu_fid_eq(old_fid, new_fid))
+ GOTO(out_put_old, rc);
- if (lu_fid_eq(new_fid, rr->rr_fid1) ||
- lu_fid_eq(new_fid, rr->rr_fid2))
- GOTO(out_unlock_old, rc = -EINVAL);
+ if (lu_fid_eq(new_fid, rr->rr_fid1) ||
+ lu_fid_eq(new_fid, rr->rr_fid2))
+ GOTO(out_put_old, rc = -EINVAL);
- if (fid_is_obf(new_fid) || fid_is_dot_lustre(new_fid))
- GOTO(out_unlock_old, rc = -EPERM);
+ if (!fid_is_md_operative(new_fid))
+ GOTO(out_put_old, rc = -EPERM);
if (mdt_object_remote(mold)) {
CDEBUG(D_INFO, "Src child "DFID" is on another MDT\n",
PFID(old_fid));
- GOTO(out_unlock_old, rc = -EXDEV);
+ GOTO(out_put_old, rc = -EXDEV);
}
- mdt_lock_reg_init(lh_newp, LCK_EX);
- mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
- if (IS_ERR(mnew))
- GOTO(out_unlock_old, rc = PTR_ERR(mnew));
+ mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
+ if (IS_ERR(mnew))
+ GOTO(out_put_old, rc = PTR_ERR(mnew));
if (mdt_object_remote(mnew)) {
- mdt_object_put(info->mti_env, mnew);
CDEBUG(D_INFO, "src child "DFID" is on another MDT\n",
PFID(new_fid));
- GOTO(out_unlock_old, rc = -EXDEV);
+ GOTO(out_put_new, rc = -EXDEV);
}
+ lh_oldp = &info->mti_lh[MDT_LH_OLD];
+ mdt_lock_reg_init(lh_oldp, LCK_EX);
+ rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP |
+ MDS_INODELOCK_XATTR, MDT_CROSS_LOCK);
+ if (rc != 0)
+ GOTO(out_put_new, rc);
+
/* We used to acquire MDS_INODELOCK_FULL here but we
* can't do this now because a running HSM restore on
* the rename onto victim will hold the layout
* lock. See LU-4002. */
+
+ lh_newp = &info->mti_lh[MDT_LH_NEW];
+ mdt_lock_reg_init(lh_newp, LCK_EX);
rc = mdt_object_lock(info, mnew, lh_newp,
MDS_INODELOCK_LOOKUP |
MDS_INODELOCK_UPDATE,
MDT_CROSS_LOCK);
- if (rc != 0) {
- mdt_object_put(info->mti_env, mnew);
- GOTO(out_unlock_old, rc);
- }
- /* get and save version after locking */
- mdt_version_get_save(info, mnew, 3);
- mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
- } else if (rc != -EREMOTE && rc != -ENOENT) {
- GOTO(out_unlock_old, rc);
- } else {
+ if (rc != 0)
+ GOTO(out_unlock_old, rc);
+
+ /* get and save version after locking */
+ mdt_version_get_save(info, mnew, 3);
+ mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
+ } else if (rc != -EREMOTE && rc != -ENOENT) {
+ GOTO(out_put_old, rc);
+ } else {
/* If mnew does not exist and mold are remote directory,
* it only allows rename if they are under same directory */
if (mtgtdir != msrcdir && mdt_object_remote(mold)) {
CDEBUG(D_INFO, "Src child "DFID" is on another MDT\n",
PFID(old_fid));
- GOTO(out_unlock_old, rc = -EXDEV);
+ GOTO(out_put_old, rc = -EXDEV);
}
+
+ lh_oldp = &info->mti_lh[MDT_LH_OLD];
+ mdt_lock_reg_init(lh_oldp, LCK_EX);
+ rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP |
+ MDS_INODELOCK_XATTR, MDT_CROSS_LOCK);
+ if (rc != 0)
+ GOTO(out_put_old, rc);
+
mdt_enoent_version_save(info, 3);
- }
+ }
- /* step 5: rename it */
- mdt_reint_init_ma(info, ma);
+ /* step 5: rename it */
+ mdt_reint_init_ma(info, ma);
- mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
- OBD_FAIL_MDS_REINT_RENAME_WRITE);
+ mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
+ OBD_FAIL_MDS_REINT_RENAME_WRITE);
/* Check if @dst is subdir of @src. */
rc = mdt_rename_sanity(info, old_fid);
mdt_handle_last_unlink(info, mnew, ma);
mdt_rename_counter_tally(info, info->mti_mdt, req,
- msrcdir, mtgtdir);
- }
+ msrcdir, mtgtdir);
+ }
- EXIT;
+ EXIT;
out_unlock_new:
- if (mnew)
- mdt_object_unlock_put(info, mnew, lh_newp, rc);
+ if (mnew != NULL)
+ mdt_object_unlock(info, mnew, lh_newp, rc);
out_unlock_old:
- mdt_object_unlock_put(info, mold, lh_oldp, rc);
+ mdt_object_unlock(info, mold, lh_oldp, rc);
+out_put_new:
+ if (mnew != NULL)
+ mdt_object_put(info->mti_env, mnew);
+out_put_old:
+ mdt_object_put(info->mti_env, mold);
out_unlock_target:
- mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
+ mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
out_put_target:
- mdt_object_put(info->mti_env, mtgtdir);
+ mdt_object_put(info->mti_env, mtgtdir);
out_unlock_source:
- mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
-out_rename_lock:
+ mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
+ return rc;
+}
+
+/*
+ * Common entry point of the rename and migrate reint handlers.
+ *
+ * Passes info->mti_dlm_req, if set, to ldlm_request_cancel(), rejects
+ * requests whose rr_fid1 or rr_fid2 is not an operative metadata fid, then
+ * takes the rename lock through mdt_rename_lock() and runs the rename or
+ * the migrate implementation under it.
+ *
+ * \param[in] info	thread info carrying the reint record (mti_rr)
+ * \param[in] lhc	lock handle passed on to the implementation
+ * \param[in] rename	true: mdt_reint_rename_internal(),
+ *			false: mdt_reint_migrate_internal()
+ *
+ * \retval 0		on success
+ * \retval -EPERM	if rr_fid1 or rr_fid2 fails fid_is_md_operative()
+ * \retval negative	other errno from mdt_rename_lock() or the
+ *			implementation
+ */
+static int mdt_reint_rename_or_migrate(struct mdt_thread_info *info,
+ struct mdt_lock_handle *lhc,
+ bool rename)
+{
+ struct mdt_reint_record *rr = &info->mti_rr;
+ struct ptlrpc_request *req = mdt_info_req(info);
+ struct lustre_handle rename_lh = { 0 };
+ int rc;
+ ENTRY;
+
+ if (info->mti_dlm_req)
+ ldlm_request_cancel(req, info->mti_dlm_req, 0);
+
+ /* same fid check as the other reint handlers in this file */
+ if (!fid_is_md_operative(rr->rr_fid1) ||
+ !fid_is_md_operative(rr->rr_fid2))
+ RETURN(-EPERM);
+
+ rc = mdt_rename_lock(info, &rename_lh);
+ if (rc != 0) {
+ CERROR("%s: can't lock FS for rename: rc = %d\n",
+ mdt_obd_name(info->mti_mdt), rc);
+ RETURN(rc);
+ }
+
+ if (rename)
+ rc = mdt_reint_rename_internal(info, lhc);
+ else
+ rc = mdt_reint_migrate_internal(info, lhc);
+
+ if (lustre_handle_is_used(&rename_lh))
+ mdt_rename_unlock(&rename_lh);
+
+ RETURN(rc);
+}
+/* REINT_RENAME handler: runs mdt_reint_rename_or_migrate() in rename mode
+ * and returns its result. */
+static int mdt_reint_rename(struct mdt_thread_info *info,
+ struct mdt_lock_handle *lhc)
+{
+ return mdt_reint_rename_or_migrate(info, lhc, true);
+}
+/* REINT_MIGRATE handler: runs mdt_reint_rename_or_migrate() in migrate mode
+ * (rename == false) and returns its result. */
+static int mdt_reint_migrate(struct mdt_thread_info *info,
+ struct mdt_lock_handle *lhc)
+{
+ return mdt_reint_rename_or_migrate(info, lhc, false);
}
typedef int (*mdt_reinter)(struct mdt_thread_info *info,
[REINT_RENAME] = mdt_reint_rename,
[REINT_OPEN] = mdt_reint_open,
[REINT_SETXATTR] = mdt_reint_setxattr,
- [REINT_RMENTRY] = mdt_reint_unlink
+ [REINT_RMENTRY] = mdt_reint_unlink,
+ [REINT_MIGRATE] = mdt_reint_migrate,
};
int mdt_reint_rec(struct mdt_thread_info *info,
GOTO(out_put, rc = PTR_ERR(th));
th->th_sync = 1; /* update table synchronously */
- rc = dt_declare_record_write(env, fsdb, buf.lb_len, off, th);
+ rc = dt_declare_record_write(env, fsdb, &buf, off, th);
if (rc)
GOTO(out, rc);
* Numbers are always big-endian
* \retval record length
*/
-static int linkea_entry_pack(struct link_ea_entry *lee,
- const struct lu_name *lname,
- const struct lu_fid *pfid)
+int linkea_entry_pack(struct link_ea_entry *lee, const struct lu_name *lname,
+ const struct lu_fid *pfid)
{
struct lu_fid tmpfid;
int reclen;
lee->lee_reclen[1] = reclen & 0xff;
return reclen;
}
+EXPORT_SYMBOL(linkea_entry_pack);
void linkea_entry_unpack(const struct link_ea_entry *lee, int *reclen,
struct lu_name *lname, struct lu_fid *pfid)
o = loghandle->lgh_obj;
LASSERT(o);
+ lgi->lgi_buf.lb_len = sizeof(struct llog_log_hdr);
+ lgi->lgi_buf.lb_buf = NULL;
/* each time we update header */
- rc = dt_declare_record_write(env, o, sizeof(struct llog_log_hdr), 0,
+ rc = dt_declare_record_write(env, o, &lgi->lgi_buf, 0,
th);
if (rc || idx == 0) /* if error or just header */
RETURN(rc);
lgi->lgi_off = 0;
}
+ lgi->lgi_buf.lb_len = 32 * 1024;
+ lgi->lgi_buf.lb_buf = NULL;
/* XXX: implement declared window or multi-chunks approach */
- rc = dt_declare_record_write(env, o, 32 * 1024, lgi->lgi_off, th);
+ rc = dt_declare_record_write(env, o, &lgi->lgi_buf, lgi->lgi_off, th);
RETURN(rc);
}
if (rc)
RETURN(rc);
- rc = dt_declare_record_write(env, o, LLOG_CHUNK_SIZE, 0, th);
+ lgi->lgi_buf.lb_len = LLOG_CHUNK_SIZE;
+ lgi->lgi_buf.lb_buf = NULL;
+ rc = dt_declare_record_write(env, o, &lgi->lgi_buf, 0, th);
if (rc)
RETURN(rc);
if (IS_ERR(th))
GOTO(out, rc = PTR_ERR(th));
- rc = dt_declare_record_write(env, o, size, lgi->lgi_off, th);
+ lgi->lgi_buf.lb_len = size;
+ lgi->lgi_buf.lb_buf = idarray;
+ rc = dt_declare_record_write(env, o, &lgi->lgi_buf, lgi->lgi_off, th);
if (rc)
GOTO(out, rc);
if (rc)
GOTO(out_trans, rc);
- lgi->lgi_buf.lb_buf = idarray;
- lgi->lgi_buf.lb_len = size;
rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
if (rc)
CDEBUG(D_INODE, "error writeing CATALOGS: rc = %d\n", rc);
/* update fid generation file */
if (los != NULL) {
LASSERT(dt_object_exists(los->los_obj));
+ dti->dti_lb.lb_buf = NULL;
+ dti->dti_lb.lb_len = sizeof(struct los_ondisk);
rc = dt_declare_record_write(env, los->los_obj,
- sizeof(struct los_ondisk), 0, th);
+ &dti->dti_lb, 0, th);
if (rc)
RETURN(rc);
}
if (rc)
GOTO(out_trans, rc);
- rc = dt_declare_record_write(env, o, sizeof(lastid), 0, th);
+ lastid = cpu_to_le64(first_oid);
+
+ dti->dti_off = 0;
+ dti->dti_lb.lb_buf = &lastid;
+ dti->dti_lb.lb_len = sizeof(lastid);
+ rc = dt_declare_record_write(env, o, &dti->dti_lb, dti->dti_off,
+ th);
if (rc)
GOTO(out_trans, rc);
if (rc)
GOTO(out_lock, rc);
- lastid = cpu_to_le64(first_oid);
-
- dti->dti_off = 0;
- dti->dti_lb.lb_buf = &lastid;
- dti->dti_lb.lb_len = sizeof(lastid);
rc = dt_record_write(env, o, &dti->dti_lb, &dti->dti_off, th);
if (rc)
GOTO(out_lock, rc);
if (IS_ERR(th))
RETURN(PTR_ERR(th));
- rc = dt_declare_record_write(env, dt, buf->lb_len, *off, th);
+ rc = dt_declare_record_write(env, dt, buf, *off, th);
if (rc == 0) {
rc = dt_trans_start_local(env, ofd->ofd_osd, th);
if (rc == 0)
GOTO(out, rc = PTR_ERR(th));
rc = dt_declare_record_write(&env, ofd->ofd_health_check_file,
- info->fti_buf.lb_len, info->fti_off, th);
+ &info->fti_buf, info->fti_off, th);
if (rc == 0) {
th->th_sync = 1; /* sync IO is needed */
rc = dt_trans_start_local(&env, ofd->ofd_osd, th);
th->th_sync |= sync;
- rc = dt_declare_record_write(env, oseq->os_lastid_obj, sizeof(tmp),
+ rc = dt_declare_record_write(env, oseq->os_lastid_obj, &info->fti_buf,
info->fti_off, th);
if (rc)
GOTO(trans_stop, rc);
/*
* Concurrency: doesn't matter
*/
-static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
-{
- return osd_oti_get(env)->oti_r_locks > 0;
-}
/*
* Concurrency: doesn't matter
LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
- LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
return -EACCES;
* in the cache, otherwise lu_object_alloc() crashes
* -bzzz
*/
- luch = lu_object_find_at(env, ludev, fid, NULL);
- if (!IS_ERR(luch)) {
- if (lu_object_exists(luch)) {
- lo = lu_object_locate(luch->lo_header, ludev->ld_type);
- if (lo != NULL)
- child = osd_obj(lo);
- else
- LU_OBJECT_DEBUG(D_ERROR, env, luch,
- "lu_object can't be located"
+ luch = lu_object_find_at(env, ludev->ld_site->ls_top_dev == NULL ?
+ ludev : ludev->ld_site->ls_top_dev,
+ fid, NULL);
+ if (!IS_ERR(luch)) {
+ if (lu_object_exists(luch)) {
+ lo = lu_object_locate(luch->lo_header, ludev->ld_type);
+ if (lo != NULL)
+ child = osd_obj(lo);
+ else
+ LU_OBJECT_DEBUG(D_ERROR, env, luch,
+ "lu_object can't be located"
DFID"\n", PFID(fid));
if (child == NULL) {
int rc;
ENTRY;
- LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
+ LASSERT(!dt_object_remote(dt));
LASSERT(handle != NULL);
oh = container_of0(handle, struct osd_thandle, ot_super);
}
static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
- const loff_t size, loff_t pos,
- struct thandle *handle)
+ const struct lu_buf *buf, loff_t pos,
+ struct thandle *handle)
{
struct osd_thandle *oh;
int credits;
inode = osd_dt_obj(dt)->oo_inode;
LASSERT(inode);
- rc = dt_declare_record_write(env, dt, inode->i_sb->s_blocksize * 2, 0, th);
+ rc = dt_declare_record_write(env, dt, NULL, 0, th);
if (rc)
GOTO(out, rc);
}
static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
- const loff_t size, loff_t pos,
+ const struct lu_buf *buf, loff_t pos,
struct thandle *th)
{
struct osd_object *obj = osd_dt_obj(dt);
dmu_tx_hold_sa_create(oh->ot_tx, ZFS_SA_BASE_ATTR_SIZE);
}
- dmu_tx_hold_write(oh->ot_tx, oid, pos, size);
+ dmu_tx_hold_write(oh->ot_tx, oid, pos, buf->lb_len);
/* dt_declare_write() is usually called for system objects, such
* as llog or last_rcvd files. We needn't enforce quota on those
if (IS_ERR(th))
RETURN(PTR_ERR(th));
- rc = dt_declare_record_write(env, dt_obj, buf->lb_len, offset, th);
+ rc = dt_declare_record_write(env, dt_obj, buf, offset, th);
if (rc)
GOTO(out, rc);
rc = dt_trans_start_local(env, osp->opd_storage, th);
extern struct lu_object_operations osp_lu_obj_ops;
extern const struct dt_device_operations osp_dt_ops;
extern struct dt_object_operations osp_md_obj_ops;
+extern struct dt_body_operations osp_md_body_ops;
struct osp_thread_info {
struct lu_buf osi_lb;
{
LASSERT(ah);
- memset(ah, 0, sizeof(*ah));
ah->dah_parent = parent;
ah->dah_mode = child_mode;
}
.do_object_lock = osp_md_object_lock,
.do_object_unlock = osp_md_object_unlock,
};
+/*
+ * Declare a write on \a dt by packing it into an update request.
+ *
+ * Gets the update for \a dt in \a th from out_find_create_update_loc() and
+ * inserts an OUT_WRITE update for the fid of \a dt. The update is made of
+ * two buffers: the data described by \a buf and the 64-bit \a pos, which is
+ * converted with cpu_to_le64() first.
+ *
+ * \param[in] env	execution environment
+ * \param[in] dt	object to be written
+ * \param[in] buf	data to write; lb_buf and lb_len are packed as is
+ * \param[in] pos	offset of the write
+ * \param[in] th	transaction handle the update belongs to
+ *
+ * \retval		result of out_insert_update(), or the error returned
+ *			by out_find_create_update_loc()
+ */
+static ssize_t osp_md_declare_write(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct lu_buf *buf,
+ loff_t pos, struct thandle *th)
+{
+ struct dt_update_request *update;
+ struct lu_fid *fid;
+ int sizes[2] = {buf->lb_len, sizeof(pos)};
+ const char *bufs[2] = {(char *)buf->lb_buf,
+ (char *)&pos};
+ ssize_t rc;
+
+ update = out_find_create_update_loc(th, dt);
+ if (IS_ERR(update)) {
+ CERROR("%s: Get OSP update buf failed: rc = %d\n",
+ dt->do_lu.lo_dev->ld_obd->obd_name,
+ (int)PTR_ERR(update));
+ return PTR_ERR(update);
+ }
+ /* pos is converted in place; bufs[1] points at it both before and
+ * after the assignment below */
+ pos = cpu_to_le64(pos);
+ bufs[1] = (char *)&pos;
+ fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
+ rc = out_insert_update(env, update, OUT_WRITE, fid,
+ ARRAY_SIZE(sizes), sizes, bufs);
+
+ return rc;
+
+}
+/*
+ * Write method of osp_md_body_ops.
+ *
+ * Does not touch the object or any of its other arguments; it only reports
+ * the whole buffer as written.
+ * NOTE(review): presumably the data was already packed into the update by
+ * osp_md_declare_write(); confirm against the callers.
+ *
+ * \retval buf->lb_len	always
+ */
+static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_buf *buf, loff_t *pos,
+ struct thandle *handle,
+ struct lustre_capa *capa, int ignore_quota)
+{
+ return buf->lb_len;
+}
+
+/* These body operations will be used to write symlinks during migration etc.
+ * Only dbo_declare_write and dbo_write are set here. */
+struct dt_body_operations osp_md_body_ops = {
+ .dbo_declare_write = osp_md_declare_write,
+ .dbo_write = osp_md_write,
+};
if (unlikely(!fid_is_zero(fid))) {
/* replay case: caller knows fid */
osi->osi_off = sizeof(osi->osi_id) * d->opd_index;
+ osi->osi_lb.lb_len = sizeof(osi->osi_id);
+ osi->osi_lb.lb_buf = NULL;
rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
- sizeof(osi->osi_id), osi->osi_off,
- th);
+ &osi->osi_lb, osi->osi_off, th);
RETURN(rc);
}
/* common for all OSPs file hystorically */
osi->osi_off = sizeof(osi->osi_id) * d->opd_index;
+ osi->osi_lb.lb_len = sizeof(osi->osi_id);
+ osi->osi_lb.lb_buf = NULL;
rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
- sizeof(osi->osi_id), osi->osi_off,
- th);
+ &osi->osi_lb, osi->osi_off, th);
} else {
/* not needed in the cache anymore */
set_bit(LU_OBJECT_HEARD_BANSHEE,
struct lu_attr *la = &osp_env_info(env)->osi_attr;
po->opo_obj.do_ops = &osp_md_obj_ops;
+ po->opo_obj.do_body_ops = &osp_md_body_ops;
rc = po->opo_obj.do_ops->do_attr_get(env, lu2dt_obj(o),
la, NULL);
if (rc == 0)
th->th_sync |= sync;
rc = dt_declare_record_write(env, osp->opd_last_used_oid_file,
- lb_oid->lb_len, oid_off, th);
+ lb_oid, oid_off, th);
if (rc != 0)
GOTO(out, rc);
rc = dt_declare_record_write(env, osp->opd_last_used_seq_file,
- lb_oseq->lb_len, oseq_off, th);
+ lb_oseq, oseq_off, th);
if (rc != 0)
GOTO(out, rc);
int rc = 0;
LASSERT(tu != NULL);
+ LASSERT(tu != LP_POISON);
/* Check whether there are updates related with this OSP */
dt_update = out_find_update(tu, dt);
if (dt_update == NULL) {
(long long)REINT_SETXATTR);
LASSERTF(REINT_RMENTRY == 8, "found %lld\n",
(long long)REINT_RMENTRY);
- LASSERTF(REINT_MAX == 9, "found %lld\n",
+ LASSERTF(REINT_MIGRATE == 9, "found %lld\n",
+ (long long)REINT_MIGRATE);
+ LASSERTF(REINT_MAX == 10, "found %lld\n",
(long long)REINT_MAX);
LASSERTF(DISP_IT_EXECD == 0x00000001UL, "found 0x%.8xUL\n",
(unsigned)DISP_IT_EXECD);
return &ta->ta_args[i];
}
-static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
- struct thandle_exec_args *ta, struct obd_export *exp)
-{
- memset(ta, 0, sizeof(*ta));
- ta->ta_handle = dt_trans_create(env, dt);
- if (IS_ERR(ta->ta_handle)) {
- int rc;
-
- CERROR("%s: start handle error: rc = %ld\n",
- dt_obd_name(dt), PTR_ERR(ta->ta_handle));
- rc = PTR_ERR(ta->ta_handle);
- ta->ta_handle = NULL;
- return rc;
- }
- ta->ta_dev = dt;
- if (exp->exp_need_sync)
- ta->ta_handle->th_sync = 1;
-
- return 0;
-}
-
-static int out_trans_start(const struct lu_env *env,
- struct thandle_exec_args *ta)
-{
- return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
-}
-
-static int out_trans_stop(const struct lu_env *env,
- struct thandle_exec_args *ta, int err)
-{
- int i;
- int rc;
-
- ta->ta_handle->th_result = err;
- rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
- for (i = 0; i < ta->ta_argno; i++) {
- if (ta->ta_args[i].object != NULL) {
- lu_object_put(env, &ta->ta_args[i].object->do_lu);
- ta->ta_args[i].object = NULL;
- }
- }
-
- return rc;
-}
-
-int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
-{
- struct tgt_session_info *tsi = tgt_ses_info(env);
- int i = 0, rc;
-
- LASSERT(ta->ta_dev);
- LASSERT(ta->ta_handle);
-
- if (ta->ta_err != 0 || ta->ta_argno == 0)
- GOTO(stop, rc = ta->ta_err);
-
- rc = out_trans_start(env, ta);
- if (unlikely(rc))
- GOTO(stop, rc);
-
- for (i = 0; i < ta->ta_argno; i++) {
- rc = ta->ta_args[i].exec_fn(env, ta->ta_handle,
- &ta->ta_args[i]);
- if (unlikely(rc)) {
- CDEBUG(D_INFO, "error during execution of #%u from"
- " %s:%d: rc = %d\n", i, ta->ta_args[i].file,
- ta->ta_args[i].line, rc);
- while (--i >= 0) {
- LASSERTF(ta->ta_args[i].undo_fn != NULL,
- "can't undo changes, hope for failover!\n");
- ta->ta_args[i].undo_fn(env, ta->ta_handle,
- &ta->ta_args[i]);
- }
- break;
- }
- }
-
- /* Only fail for real update */
- tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
-stop:
- CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
- dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
- out_trans_stop(env, ta, rc);
- ta->ta_handle = NULL;
- ta->ta_argno = 0;
- ta->ta_err = 0;
-
- RETURN(rc);
-}
-
static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
struct dt_object *obj,
struct object_update_reply *reply,
ENTRY;
- if (!lu_object_exists(&obj->do_lu))
+ if (!lu_object_exists(&obj->do_lu)) {
+ /* Usually this is called when the master MDT tries to
+ * initialize a remote object (see osp_object_init). If the
+ * object does not exist on the slave, set the BANSHEE flag
+ * so the object is removed from the cache immediately. */
+ set_bit(LU_OBJECT_HEARD_BANSHEE,
+ &obj->do_lu.lo_header->loh_flags);
RETURN(-ENOENT);
+ }
dt_read_lock(env, obj, MOR_TGT_CHILD);
rc = dt_attr_get(env, obj, la, NULL);
dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
arg->u.xattr_set.name, arg->u.xattr_set.flags);
+ if (!lu_object_exists(&dt_obj->do_lu))
+ GOTO(out, rc = -ENOENT);
+
dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
arg->u.xattr_set.name, arg->u.xattr_set.flags,
**/
if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK)))
rc = 0;
-
+out:
CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
LASSERT(ta->ta_handle != NULL);
- if (lu_object_exists(&dt_obj->do_lu)) {
- if (dt_try_as_dir(env, dt_obj) == 0) {
- ta->ta_err = -ENOTDIR;
- return ta->ta_err;
- }
- ta->ta_err = dt_declare_insert(env, dt_obj,
- (struct dt_rec *)fid,
- (struct dt_key *)name,
- ta->ta_handle);
+ if (dt_try_as_dir(env, dt_obj) == 0) {
+ ta->ta_err = -ENOTDIR;
+ return ta->ta_err;
}
+ ta->ta_err = dt_declare_insert(env, dt_obj,
+ (struct dt_rec *)fid,
+ (struct dt_key *)name,
+ ta->ta_handle);
+
if (ta->ta_err != 0)
return ta->ta_err;
RETURN(rc);
}
+/**
+ * Execution callback for a queued OUT write.
+ *
+ * Writes arg->u.write.buf at arg->u.write.pos into arg->object while
+ * holding the object's write lock, then stores the outcome in the reply
+ * slot arg->index: the buffer length when dt_record_write() returned 0,
+ * otherwise the value dt_record_write() returned.
+ *
+ * \retval 0 if the recorded outcome is positive
+ * \retval the recorded outcome otherwise
+ */
+static int out_tx_write_exec(const struct lu_env *env, struct thandle *th,
+			     struct tx_arg *arg)
+{
+	struct dt_object *obj = arg->object;
+	int result;
+
+	dt_write_lock(env, obj, MOR_TGT_CHILD);
+	result = dt_record_write(env, obj, &arg->u.write.buf,
+				 &arg->u.write.pos, th);
+	dt_write_unlock(env, obj);
+
+	/* a clean write is reported to the peer as the byte count */
+	if (result == 0)
+		result = arg->u.write.buf.lb_len;
+
+	object_update_result_insert(arg->reply, NULL, 0, arg->index, result);
+
+	if (result > 0)
+		return 0;
+
+	return result;
+}
+
+/**
+ * Declare a record write in the current transaction and queue its
+ * execution callback.
+ *
+ * The declaration result is stored in ta->ta_err. When it is non-zero it
+ * is returned and nothing is queued. Otherwise a reference is taken on
+ * \a dt_obj and a tx_arg running out_tx_write_exec() (no undo callback)
+ * is filled in with a copy of \a buf, \a pos, \a reply and \a index.
+ *
+ * \a file and \a line identify the call site; they are passed through to
+ * tx_add_exec().
+ */
+static int __out_tx_write(const struct lu_env *env,
+			  struct dt_object *dt_obj,
+			  const struct lu_buf *buf,
+			  loff_t pos, struct thandle_exec_args *ta,
+			  struct object_update_reply *reply,
+			  int index, char *file, int line)
+{
+	struct tx_arg *arg;
+	int rc;
+
+	LASSERT(ta->ta_handle != NULL);
+	rc = dt_declare_record_write(env, dt_obj, buf, pos, ta->ta_handle);
+	ta->ta_err = rc;
+	if (rc != 0)
+		return rc;
+
+	arg = tx_add_exec(ta, out_tx_write_exec, NULL, file, line);
+	LASSERT(arg);
+
+	/* reference is held by the queued argument */
+	lu_object_get(&dt_obj->do_lu);
+	arg->object = dt_obj;
+	arg->u.write.buf = *buf;
+	arg->u.write.pos = pos;
+	arg->reply = reply;
+	arg->index = index;
+
+	return 0;
+}
+
+/**
+ * Handler for OUT_WRITE updates.
+ *
+ * Parameter 0 of the update is the data to write, parameter 1 is the
+ * 64-bit file offset (byte-swapped in place when the request needs
+ * swabbing). The write is queued on the thread's transaction via
+ * out_tx_write().
+ *
+ * \retval err_serious(-EPROTO) if the data is missing/empty or the
+ *	   offset is missing or shorter than 64 bits
+ * \retval result of out_tx_write() otherwise
+ */
+static int out_write(struct tgt_session_info *tsi)
+{
+	struct tgt_thread_info	*tti = tgt_th_info(tsi->tsi_env);
+	struct object_update	*update = tti->tti_u.update.tti_update;
+	struct dt_object	*obj = tti->tti_u.update.tti_dt_object;
+	struct lu_buf		*lbuf = &tti->tti_buf;
+	char			*buf;
+	char			*tmp;
+	int			 buf_len = 0;
+	int			 pos_len = 0;
+	loff_t			 pos;
+	int			 rc;
+	ENTRY;
+
+	buf = object_update_param_get(update, 0, &buf_len);
+	if (buf == NULL || buf_len == 0) {
+		CERROR("%s: empty buf for write: rc = %d\n",
+		       tgt_name(tsi->tsi_tgt), -EPROTO);
+		RETURN(err_serious(-EPROTO));
+	}
+	lbuf->lb_buf = buf;
+	lbuf->lb_len = buf_len;
+
+	/* the offset is read as 64 bits below, so insist on that much data */
+	tmp = (char *)object_update_param_get(update, 1, &pos_len);
+	if (tmp == NULL || pos_len < (int)sizeof(__u64)) {
+		CERROR("%s: empty or short offset for write: rc = %d\n",
+		       tgt_name(tsi->tsi_tgt), -EPROTO);
+		RETURN(err_serious(-EPROTO));
+	}
+
+	if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
+		__swab64s((__u64 *)tmp);
+	pos = *(loff_t *)tmp;
+
+	rc = out_tx_write(tsi->tsi_env, obj, lbuf, pos,
+			  &tti->tti_tea,
+			  tti->tti_u.update.tti_update_reply,
+			  tti->tti_u.update.tti_update_reply_index);
+	RETURN(rc);
+}
+
#define DEF_OUT_HNDL(opc, name, flags, fn) \
[opc - OUT_CREATE] = { \
.th_name = name, \
MUTABOR | HABEO_REFERO, out_index_insert),
DEF_OUT_HNDL(OUT_INDEX_DELETE, "out_index_delete",
MUTABOR | HABEO_REFERO, out_index_delete),
+ DEF_OUT_HNDL(OUT_WRITE, "out_write", MUTABOR | HABEO_REFERO, out_write),
};
struct tgt_handler *out_handler_find(__u32 opc)
return h;
}
+/**
+ * Reset \a ta and open a new transaction handle on \a dt.
+ *
+ * On success ta->ta_handle and ta->ta_dev are set, and the handle is
+ * marked synchronous when the export has exp_need_sync set. On failure
+ * ta->ta_handle is left NULL and the error from dt_trans_create() is
+ * logged and returned.
+ */
+static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
+			struct thandle_exec_args *ta, struct obd_export *exp)
+{
+	struct thandle *th;
+
+	memset(ta, 0, sizeof(*ta));
+
+	th = dt_trans_create(env, dt);
+	if (IS_ERR(th)) {
+		int rc = PTR_ERR(th);
+
+		/* ta was zeroed above; keep the handle explicitly NULL */
+		ta->ta_handle = NULL;
+		CERROR("%s: start handle error: rc = %d\n",
+		       dt_obd_name(dt), rc);
+		return rc;
+	}
+
+	ta->ta_handle = th;
+	ta->ta_dev = dt;
+	if (exp->exp_need_sync)
+		th->th_sync = 1;
+
+	return 0;
+}
+
+/**
+ * Start the transaction held in \a ta.
+ *
+ * Forwards ta->ta_dev and ta->ta_handle to dt_trans_start() and returns
+ * its result unchanged.
+ */
+static int out_trans_start(const struct lu_env *env,
+ struct thandle_exec_args *ta)
+{
+ return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
+}
+
+/**
+ * Stop the transaction in \a ta and drop the object references held by
+ * its queued arguments.
+ *
+ * \a err is stored in the handle's th_result before dt_trans_stop() is
+ * called. The return value is that of dt_trans_stop().
+ */
+static int out_trans_stop(const struct lu_env *env,
+			  struct thandle_exec_args *ta, int err)
+{
+	struct tx_arg *arg;
+	int rc;
+	int i;
+
+	ta->ta_handle->th_result = err;
+	rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
+
+	for (i = 0; i < ta->ta_argno; i++) {
+		arg = &ta->ta_args[i];
+		if (arg->object == NULL)
+			continue;
+
+		/* An object created in this transaction is flagged for
+		 * immediate removal from the cache: the OUT handler
+		 * lacks a few layers, so the object might not have been
+		 * initialized in all of them. */
+		if (arg->exec_fn == out_tx_create_exec)
+			set_bit(LU_OBJECT_HEARD_BANSHEE,
+				&arg->object->do_lu.lo_header->loh_flags);
+
+		lu_object_put(env, &arg->object->do_lu);
+		arg->object = NULL;
+	}
+
+	return rc;
+}
+
+/**
+ * Execute and finish the update transaction described by \a ta.
+ *
+ * If an error was recorded at declare time (ta->ta_err) or nothing was
+ * queued, the transaction is stopped without being started. Otherwise
+ * the transaction is started and every queued exec_fn is run in order.
+ * When one of them fails, the undo_fn of each previously executed
+ * argument is called in reverse order; a missing undo_fn is reported
+ * with CERROR. In every case the transaction is stopped and \a ta is
+ * reset (handle, argument count and error cleared).
+ *
+ * \retval ta->ta_err, the dt_trans_start() error, or the value returned
+ *	   by the last exec_fn that ran
+ */
+int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
+{
+	struct tgt_session_info *tsi = tgt_ses_info(env);
+	int i = 0, rc;
+
+	LASSERT(ta->ta_dev);
+	LASSERT(ta->ta_handle);
+
+	if (ta->ta_err != 0 || ta->ta_argno == 0)
+		GOTO(stop, rc = ta->ta_err);
+
+	rc = out_trans_start(env, ta);
+	if (unlikely(rc))
+		GOTO(stop, rc);
+
+	for (i = 0; i < ta->ta_argno; i++) {
+		rc = ta->ta_args[i].exec_fn(env, ta->ta_handle,
+					    &ta->ta_args[i]);
+		if (unlikely(rc != 0)) {
+			int j;
+
+			CDEBUG(D_INFO, "error during execution of #%u from"
+			       " %s:%d: rc = %d\n", i, ta->ta_args[i].file,
+			       ta->ta_args[i].line, rc);
+			/* Roll back with a separate index so that 'i' still
+			 * holds the number of updates that succeeded when
+			 * it is printed below. */
+			for (j = i - 1; j >= 0; j--) {
+				if (ta->ta_args[j].undo_fn != NULL)
+					ta->ta_args[j].undo_fn(env,
+							       ta->ta_handle,
+							       &ta->ta_args[j]);
+				else
+					CERROR("%s: undo for %s:%d: rc = %d\n",
+					       dt_obd_name(ta->ta_dev),
+					       ta->ta_args[j].file,
+					       ta->ta_args[j].line, -ENOTSUPP);
+			}
+			break;
+		}
+	}
+
+	/* Only fail for real update */
+	tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
+stop:
+	CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
+	       dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
+	out_trans_stop(env, ta, rc);
+	ta->ta_handle = NULL;
+	ta->ta_argno = 0;
+	ta->ta_err = 0;
+
+	RETURN(rc);
+}
+
/**
* Object updates between Targets. Because all the updates has been
* dis-assemblied into object updates at sender side, so OUT will
/* Stop the current update transaction,
* create a new one */
rc = out_tx_end(env, ta);
- if (rc != 0)
+ if (rc < 0)
RETURN(rc);
rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
#define out_tx_destroy(info, obj, th, reply, idx) \
__out_tx_destroy(info, obj, th, reply, idx, __FILE__, __LINE__)
+#define out_tx_write(info, obj, buf, pos, th, reply, idx) \
+ __out_tx_write(info, obj, buf, pos, th, reply, idx, __FILE__, __LINE__)
+
extern struct page *tgt_page_to_corrupt;
struct tgt_thread_big_cache {
if (IS_ERR(th))
RETURN(PTR_ERR(th));
+ tti_buf_lcd(tti);
rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
- sizeof(struct lsd_client_data),
+ &tti->tti_buf,
ted->ted_lr_off, th);
if (rc)
GOTO(out, rc);
th->th_sync = sync;
+ tti_buf_lsd(tti);
rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
- sizeof(struct lr_server_data),
- tti->tti_off, th);
+ &tti->tti_buf, tti->tti_off, th);
if (rc)
GOTO(out, rc);
{
struct lu_target *tgt = cookie;
struct tgt_session_info *tsi;
+ struct tgt_thread_info *tti = tgt_th_info(env);
int rc;
/* if there is no session, then this transaction is not result of
if (tsi->tsi_exp == NULL)
return 0;
+ tti_buf_lcd(tti);
rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
- sizeof(struct lsd_client_data),
+ &tti->tti_buf,
tsi->tsi_exp->exp_target_data.ted_lr_off,
th);
if (rc)
return rc;
+ tti_buf_lsd(tti);
rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
- sizeof(struct lr_server_data), 0, th);
+ &tti->tti_buf, 0, th);
if (rc)
return rc;
nobase_noinst_SCRIPTS += racer/dir_create.sh racer/file_create.sh racer/file_list.sh
nobase_noinst_SCRIPTS += racer/file_rm.sh racer/racer.sh racer/file_concat.sh racer/file_exec.sh
nobase_noinst_SCRIPTS += racer/file_link.sh racer/file_rename.sh racer/file_symlink.sh
-nobase_noinst_SCRIPTS += racer/dir_remote.sh
+nobase_noinst_SCRIPTS += racer/dir_remote.sh racer/dir_migrate.sh
nobase_noinst_SCRIPTS += rmtacl/make-tree rmtacl/run
nobase_noinst_SCRIPTS += posix/posix.cfg
nobase_noinst_DATA = acl/cp.test acl/getfacl-noacl.test acl/inheritance.test
--- /dev/null
+#!/bin/bash
+
+# racer worker: endlessly create a random directory under $1, migrate it
+# to a random MDT, create a file in it and query its directory striping.
+# $1 - working directory, $2 - upper bound for the random names.
+# All failures are expected under racing and are silenced.
+
+DIR=$1
+MAX=$2
+
+# Use the same lfs binary everywhere; fall back to PATH lookup when the
+# harness did not export LFS.
+LFS=${LFS:-lfs}
+
+MDTCOUNT=${MDSCOUNT:-$($LFS df $DIR 2> /dev/null | grep -c MDT)}
+while /bin/true ; do
+	migrate_dir=$((RANDOM % MAX))
+	file=$((RANDOM % MAX))
+	mdt_idx=$((RANDOM % MDTCOUNT))
+	mkdir -p $DIR/$migrate_dir 2> /dev/null
+	$LFS mv -M$mdt_idx $DIR/$migrate_dir 2> /dev/null
+	touch $DIR/$migrate_dir/$file 2> /dev/null
+	$LFS getdirstripe $DIR/$migrate_dir > /dev/null 2>&1
+done
remote_dir=$((RANDOM % MAX))
file=$((RANDOM % MAX))
mdt_idx=$((RANDOM % MDTCOUNT))
- mkdir -p $DIR
- lfs mkdir -i$mdt_idx -c$MDTCOUNT $DIR/$remote_dir 2> /dev/null
- echo "abcd" > $DIR/$remote_dir/$file 2> /dev/null
- $LFS getdirstripe $DIR/$remote_dir 2> /dev/null
+ mkdir -p $DIR 2> /dev/null
+ $LFS mkdir -i$mdt_idx -c$MDTCOUNT $DIR/$remote_dir 2> /dev/null
+ touch $DIR/$remote_dir/$file 2> /dev/null
+ $LFS getdirstripe $DIR/$remote_dir > /dev/null 2>&1
done
file_list file_concat file_exec"
if [ $MDSCOUNT -gt 1 ]; then
- RACER_PROGS="${RACER_PROGS} dir_remote"
+ RACER_PROGS="${RACER_PROGS} dir_remote dir_migrate"
fi
racer_cleanup()
}
run_test 110f "remove remote directory: drop slave rep"
+# Migrate a directory holding 5000 files while fail_loc 0x1702
+# (OBD_FAIL_MIGRATE_NET_REP) is set on the MDS, then verify that every
+# entry reports the target MDT index.
+test_110g () {
+	[ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
+	local remote_dir=$DIR/$tdir/remote_dir
+	local MDTIDX=1
+	local file
+	local mdt_index
+
+	mkdir -p $remote_dir || error "mkdir $remote_dir failed"
+
+	createmany -o $remote_dir/f 5000 ||
+		error "create files under $remote_dir failed"
+
+	#define OBD_FAIL_MIGRATE_NET_REP 0x1702
+	do_facet mds$MDTIDX lctl set_param fail_loc=0x1702
+	$LFS mv -M $MDTIDX $remote_dir || error "migrate failed"
+	do_facet mds$MDTIDX lctl set_param fail_loc=0x0
+
+	for file in $(find $remote_dir); do
+		mdt_index=$($LFS getstripe -M $file)
+		[ $mdt_index == $MDTIDX ] ||
+			error "$file is not on MDT${MDTIDX}"
+	done
+
+	rm -rf $DIR/$tdir || error "rmdir failed"
+}
+run_test 110g "drop reply during migration"
+
# LU-2844 mdt prepare fail should not cause umount oops
test_111 ()
{
. ${CONFIG:=$LUSTRE/tests/cfg/${NAME}.sh}
init_logging
-[ "$SLOW" = "no" ] && EXCEPT_SLOW="24o 27m 64b 68 71 77f 78 115 124b"
+[ "$SLOW" = "no" ] && EXCEPT_SLOW="24o 27m 64b 68 71 77f 78 115 124b 230d"
[ $(facet_fstype $SINGLEMDS) = "zfs" ] &&
# bug number for skipped test: LU-1593 LU-2610 LU-2833 LU-1957 LU-2805
error "create files under remote dir failed $i"
done
- check_fs_consistency_17n || error "e2fsck report error"
+ check_fs_consistency_17n ||
+ error "e2fsck report error after create files under remote dir"
for ((i=0;i<10;i++)); do
rm -rf $DIR/$tdir/remote_dir_${i} ||
error "destroy remote dir error $i"
done
- check_fs_consistency_17n || error "e2fsck report error"
+ check_fs_consistency_17n ||
+ error "e2fsck report error after unlink files under remote dir"
+
+ [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.4.50) ] &&
+ skip "lustre < 2.4.50 does not support migrate mv " && return
+
+ for ((i=0; i<10; i++)); do
+ mkdir -p $DIR/$tdir/remote_dir_${i}
+ createmany -o $DIR/$tdir/remote_dir_${i}/f 10 ||
+ error "create files under remote dir failed $i"
+ $LFS mv -M 1 $DIR/$tdir/remote_dir_${i} ||
+ error "migrate remote dir error $i"
+ done
+ check_fs_consistency_17n || error "e2fsck report error after migration"
+
+ for ((i=0;i<10;i++)); do
+ rm -rf $DIR/$tdir/remote_dir_${i} ||
+ error "destroy remote dir error $i"
+ done
+
+ check_fs_consistency_17n || error "e2fsck report error after unlink"
}
run_test 17n "run e2fsck against master/slave MDT which contains remote dir"
}
run_test 230a "Create remote directory and files under the remote directory"
+# Migrate a directory tree (sub-directories, plain files, hard links and
+# symlinks pointing inside and outside the tree, striped file and
+# directory default striping) to MDT1 and back to MDT0, checking MDT
+# placement, file contents and stripe counts after each move.
+test_230b() {
+	[ $PARALLEL == "yes" ] && skip "skip parallel run" && return
+	[ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+	local MDTIDX=1
+	local mdt_index
+	local i
+	local file
+	local stripe_count
+	local migrate_dir=$DIR/$tdir/migrate_dir
+	local other_dir=$DIR/$tdir/other_dir
+
+	mkdir -p $migrate_dir
+	mkdir -p $other_dir
+	for ((i=0; i<10; i++)); do
+		mkdir -p $migrate_dir/dir_${i}
+		createmany -o $migrate_dir/dir_${i}/f 10 ||
+			error "create files under remote dir failed $i"
+	done
+
+	cp /etc/passwd $migrate_dir/$tfile
+	cp /etc/passwd $other_dir/$tfile
+	mkdir -p $migrate_dir/dir_default_stripe2
+	$LFS setstripe -c 2 $migrate_dir/dir_default_stripe2
+	$LFS setstripe -c 2 $migrate_dir/${tfile}_stripe2
+
+	ln $migrate_dir/$tfile $other_dir/luna
+	ln $migrate_dir/$tfile $migrate_dir/sofia
+	ln $other_dir/$tfile $migrate_dir/david
+	ln -s $migrate_dir/$tfile $other_dir/zachary
+	ln -s $migrate_dir/$tfile $migrate_dir/${tfile}_ln
+	ln -s $other_dir/$tfile $migrate_dir/${tfile}_ln_other
+
+	$LFS mv -v -M $MDTIDX $migrate_dir ||
+		error "migrate remote dir error"
+
+	echo "migratate to MDT1, then checking.."
+	for ((i=0; i<10; i++)); do
+		for file in $(find $migrate_dir/dir_${i}); do
+			mdt_index=$($LFS getstripe -M $file)
+			[ $mdt_index == $MDTIDX ] ||
+				error "$file is not on MDT${MDTIDX}"
+		done
+	done
+
+	# the multiple link file should still in MDT0
+	mdt_index=$($LFS getstripe -M $migrate_dir/$tfile)
+	[ $mdt_index == 0 ] ||
+		error "$migrate_dir/$tfile is not on MDT0"
+
+	diff /etc/passwd $migrate_dir/$tfile ||
+		error "$tfile different after migration"
+
+	diff /etc/passwd $other_dir/luna ||
+		error "luna different after migration"
+
+	diff /etc/passwd $migrate_dir/sofia ||
+		error "sofia different after migration"
+
+	diff /etc/passwd $migrate_dir/david ||
+		error "david different after migration"
+
+	diff /etc/passwd $other_dir/zachary ||
+		error "zachary different after migration"
+
+	diff /etc/passwd $migrate_dir/${tfile}_ln ||
+		error "${tfile}_ln different after migration"
+
+	diff /etc/passwd $migrate_dir/${tfile}_ln_other ||
+		error "${tfile}_ln_other different after migration"
+
+	stripe_count=$($LFS getstripe -c $migrate_dir/dir_default_stripe2)
+	[ $stripe_count = 2 ] ||
+		error "dir strpe_count $stripe_count != 2 after migration."
+
+	stripe_count=$($LFS getstripe -c $migrate_dir/${tfile}_stripe2)
+	[ $stripe_count = 2 ] ||
+		error "file strpe_count $stripe_count != 2 after migration."
+
+	#migrate back to MDT0
+	MDTIDX=0
+	$LFS mv -v -M $MDTIDX $migrate_dir ||
+		error "migrate remote dir error"
+
+	echo "migrate back to MDT0, checking.."
+	for file in $(find $migrate_dir); do
+		mdt_index=$($LFS getstripe -M $file)
+		[ $mdt_index == $MDTIDX ] ||
+			error "$file is not on MDT${MDTIDX}"
+	done
+
+	diff /etc/passwd ${migrate_dir}/$tfile ||
+		error "$tfile different after migration"
+
+	diff /etc/passwd ${other_dir}/luna ||
+		error "luna different after migration"
+
+	diff /etc/passwd ${migrate_dir}/sofia ||
+		error "sofia different after migration"
+
+	diff /etc/passwd ${other_dir}/zachary ||
+		error "zachary different after migration"
+
+	diff /etc/passwd $migrate_dir/${tfile}_ln ||
+		error "${tfile}_ln different after migration"
+
+	diff /etc/passwd $migrate_dir/${tfile}_ln_other ||
+		error "${tfile}_ln_other different after migration"
+
+	stripe_count=$($LFS getstripe -c ${migrate_dir}/dir_default_stripe2)
+	[ $stripe_count = 2 ] ||
+		error "dir strpe_count $stripe_count != 2 after migration."
+
+	stripe_count=$($LFS getstripe -c ${migrate_dir}/${tfile}_stripe2)
+	[ $stripe_count = 2 ] ||
+		error "file strpe_count $stripe_count != 2 after migration."
+
+	rm -rf $DIR/$tdir || error "rm dir failed after migration"
+}
+run_test 230b "migrate directory"
+
+# Make a directory migration fail part-way (fail_loc 0x1801 with
+# fail_val=5), check that the entry count is unchanged and every entry
+# can still be stat'ed, then clear the fail_loc, migrate again and
+# verify placement on the target MDT.
+test_230c() {
+	[ $PARALLEL == "yes" ] && skip "skip parallel run" && return
+	[ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+	local MDTIDX=1
+	local mdt_index
+	local file
+
+	# If migrating a directory fails in the middle, all entries of
+	# the directory must still be accessible.
+	mkdir -p $DIR/$tdir
+	stat $DIR/$tdir
+	createmany -o $DIR/$tdir/f 10 ||
+		error "create files under ${tdir} failed"
+
+	#failed after migrating 5 entries
+	#OBD_FAIL_MIGRATE_ENTRIES	0x1801
+	do_facet mds1 lctl set_param fail_loc=0x20001801
+	do_facet mds1 lctl set_param fail_val=5
+	local t=$(ls $DIR/$tdir | wc -l)
+	$LFS mv -M $MDTIDX $DIR/$tdir &&
+		error "migrate should failed after 5 entries"
+	local u=$(ls $DIR/$tdir | wc -l)
+	[ "$u" == "$t" ] || error "$u != $t during migration"
+
+	for file in $(find $DIR/$tdir); do
+		stat $file || error "stat $file failed"
+	done
+
+	do_facet mds1 lctl set_param fail_loc=0
+	do_facet mds1 lctl set_param fail_val=0
+
+	# with the fail_loc cleared the migration is expected to succeed
+	$LFS mv -M $MDTIDX $DIR/$tdir ||
+		error "migration failed after fail_loc was cleared"
+
+	echo "Finish migration, then checking.."
+	for file in $(find $DIR/$tdir); do
+		mdt_index=$($LFS getstripe -M $file)
+		[ $mdt_index == $MDTIDX ] ||
+			error "$file is not on MDT${MDTIDX}"
+	done
+
+	rm -rf $DIR/$tdir || error "rm dir failed after migration"
+}
+run_test 230c "check directory accessiblity if migration is failed"
+
+# Migrate a large tree (100 directories with 100 files each) to MDT1 and
+# verify that every entry reports the target MDT index.
+test_230d() {
+	[ $PARALLEL == "yes" ] && skip "skip parallel run" && return
+	[ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+	local MDTIDX=1
+	local mdt_index
+	local file
+	local i
+
+	mkdir -p $DIR/$tdir
+
+	for ((i=0; i<100; i++)); do
+		mkdir -p $DIR/$tdir/dir_${i}
+		createmany -o $DIR/$tdir/dir_${i}/f 100 ||
+			error "create files under remote dir failed $i"
+	done
+
+	$LFS mv -M $MDTIDX -v $DIR/$tdir || error "migrate remote dir error"
+
+	echo "Finish migration, then checking.."
+	for file in $(find $DIR/$tdir); do
+		mdt_index=$($LFS getstripe -M $file)
+		[ $mdt_index == $MDTIDX ] ||
+			error "$file is not on MDT${MDTIDX}"
+	done
+
+	rm -rf $DIR/$tdir || error "rm dir failed after migration"
+}
+run_test 230d "check migrate big directory"
+
test_231a()
{
# For simplicity this test assumes that max_pages_per_rpc
}
run_test 76 "Verify open file for 2048 files"
+# Migration of a directory must fail while one of its files is held
+# open through the second mount, and succeed once the file is closed.
+test_80() {
+	[ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+	local MDTIDX=1
+	local mdt_index
+	local file
+	local pid
+
+	mkdir -p $DIR1/$tdir/dir
+	createmany -o $DIR1/$tdir/dir/f 10 ||
+		error "create files under remote dir failed"
+
+	cp /etc/passwd $DIR1/$tdir/$tfile
+
+	#migrate open file should fails
+	multiop_bg_pause $DIR2/$tdir/$tfile O_c ||
+		error "open $DIR2/$tdir/$tfile failed"
+	pid=$!
+	# give multiop a chance to open
+	sleep 1
+
+	$LFS mv -M $MDTIDX $DIR1/$tdir &&
+		error "migrate open files should failed with open files"
+
+	kill -USR1 $pid
+	# make sure multiop has closed the file before migrating again
+	wait $pid
+
+	$LFS mv -M $MDTIDX $DIR1/$tdir ||
+		error "migrate remote dir error"
+
+	echo "Finish migration, then checking.."
+	for file in $(find $DIR1/$tdir); do
+		mdt_index=$($LFS getstripe -M $file)
+		[ $mdt_index == $MDTIDX ] ||
+			error "$file is not on MDT${MDTIDX}"
+	done
+
+	diff /etc/passwd $DIR1/$tdir/$tfile ||
+		error "file different after migration"
+
+	rm -rf $DIR1/$tdir || error "rm dir failed after migration"
+}
+run_test 80 "migrate directory when some children is being opened"
+
log "cleanup: ======================================================"
[ "$(mount | grep $MOUNT2)" ] && umount $MOUNT2
static int lfs_hsm_remove(int argc, char **argv);
static int lfs_hsm_cancel(int argc, char **argv);
static int lfs_swap_layouts(int argc, char **argv);
+static int lfs_mv(int argc, char **argv);
#define SETSTRIPE_USAGE(_cmd, _tgt) \
"usage: "_cmd" [--stripe-count|-c <stripe_count>]\n"\
"usage: hsm_cancel [--filelist FILELIST] [--data DATA] <file> ..."},
{"swap_layouts", lfs_swap_layouts, 0, "Swap layouts between 2 files.\n"
"usage: swap_layouts <path1> <path2>"},
- {"migrate", lfs_setstripe, 0, "migrate file from one layout to "
+ {"migrate", lfs_setstripe, 0, "migrate file from one OST layout to "
"another (may be not safe with concurent writes).\n"
SETSTRIPE_USAGE("migrate ", "<filename>")},
- {"help", Parser_help, 0, "help"},
- {"exit", Parser_quit, 0, "quit"},
- {"quit", Parser_quit, 0, "quit"},
- { 0, 0, 0, NULL }
+ {"mv", lfs_mv, 0,
+ "To move directories between MDTs.\n"
+ "usage: mv <directory|filename> [--mdt-index|-M] <mdt_index> "
+ "[--verbose|-v]\n"},
+ {"help", Parser_help, 0, "help"},
+ {"exit", Parser_quit, 0, "quit"},
+ {"quit", Parser_quit, 0, "quit"},
+ { 0, 0, 0, NULL }
};
#define MIGRATION_BLOCKS 1
return result;
}
+/**
+ * "lfs mv" command: migrate a directory (or file) to another MDT.
+ *
+ * Options:
+ *   -M, --mdt-index <idx>   target MDT index (mandatory)
+ *   -v, --verbose           report each migrated entry
+ * followed by the path to migrate.
+ *
+ * \retval CMD_HELP on a usage error
+ * \retval result of llapi_mv() otherwise
+ */
+static int lfs_mv(int argc, char **argv)
+{
+	struct find_param param = { .maxdepth = -1, .mdtindex = -1};
+	char *end;
+	int c;
+	int rc = 0;
+	/* getopt_long() expects long option names without leading dashes */
+	struct option long_opts[] = {
+		{"mdt-index", required_argument, 0, 'M'},
+		{"verbose",   no_argument,	 0, 'v'},
+		{0, 0, 0, 0}
+	};
+
+	while ((c = getopt_long(argc, argv, "M:v", long_opts, NULL)) != -1) {
+		switch (c) {
+		case 'M': {
+			param.mdtindex = strtoul(optarg, &end, 0);
+			/* reject empty strings and trailing garbage */
+			if (end == optarg || *end != '\0') {
+				fprintf(stderr, "%s: invalid MDT index'%s'\n",
+					argv[0], optarg);
+				return CMD_HELP;
+			}
+			break;
+		}
+		case 'v': {
+			param.verbose = VERBOSE_DETAIL;
+			break;
+		}
+		default:
+			fprintf(stderr, "error: %s: unrecognized option '%s'\n",
+				argv[0], argv[optind - 1]);
+			return CMD_HELP;
+		}
+	}
+
+	if (param.mdtindex == -1) {
+		fprintf(stderr, "%s MDT index must be indicated\n", argv[0]);
+		return CMD_HELP;
+	}
+
+	if (optind >= argc) {
+		fprintf(stderr, "%s missing operand path\n", argv[0]);
+		return CMD_HELP;
+	}
+
+	param.migrate = 1;
+	rc = llapi_mv(argv[optind], &param);
+	if (rc != 0)
+		fprintf(stderr, "cannot migrate '%s' to MDT%04x: %s\n",
+			argv[optind], param.mdtindex, strerror(-rc));
+	return rc;
+}
+
static int lfs_osts(int argc, char **argv)
{
return lfs_tgts(argc, argv);
return rc;
}
-typedef int (semantic_func_t)(char *path, DIR *parent, DIR *d,
+typedef int (semantic_func_t)(char *path, DIR *parent, DIR **d,
void *data, struct dirent64 *de);
#define OBD_NOT_FOUND (-1)
param->got_uuids = 0;
param->obdindexes = NULL;
param->obdindex = OBD_NOT_FOUND;
- param->mdtindex = OBD_NOT_FOUND;
+ if (!param->migrate)
+ param->mdtindex = OBD_NOT_FOUND;
return 0;
}
free(param->fp_lmv_md);
}
-static int cb_common_fini(char *path, DIR *parent, DIR *d, void *data,
+static int cb_common_fini(char *path, DIR *parent, DIR **dirp, void *data,
struct dirent64 *de)
{
struct find_param *param = (struct find_param *)data;
}
}
- if (sem_init && (ret = sem_init(path, parent ?: p, d, data, de)))
- goto err;
+ if (sem_init && (ret = sem_init(path, parent ?: p, &d, data, de)))
+ goto err;
if (!d || (param->get_lmv && !param->recursive)) {
ret = 0;
out:
path[len] = 0;
- if (sem_fini)
- sem_fini(path, parent, d, data, de);
+ if (sem_fini)
+ sem_fini(path, parent, &d, data, de);
err:
if (d)
closedir(d);
return ret;
}
-static int cb_find_init(char *path, DIR *parent, DIR *dir,
+static int cb_find_init(char *path, DIR *parent, DIR **dirp,
void *data, struct dirent64 *de)
{
struct find_param *param = (struct find_param *)data;
+ DIR *dir = dirp == NULL ? NULL : *dirp;
int decision = 1; /* 1 is accepted; -1 is rejected. */
lstat_t *st = ¶m->lmd->lmd_st;
int lustre_fs = 1;
return 0;
}
+/**
+ * Traversal callback that migrates \a path to the MDT in param->mdtindex.
+ *
+ * The migrate ioctl is issued on the parent directory (\a parent, or the
+ * parent of \a path opened here when \a parent is NULL) with the base
+ * name of \a path and the MDT index as packed ioctl buffers.
+ *
+ * If the caller passed an open handle for \a path in \a *dirp, that
+ * handle is closed before the migration and re-opened afterwards, so the
+ * caller never keeps a handle opened before the migration. \a *dirp is
+ * left NULL whenever no valid handle is available, so the caller must
+ * not close a stale handle.
+ *
+ * \retval 0 on success
+ * \retval negative errno on failure; an ioctl or packing error takes
+ *	   precedence over a failure to re-open the directory
+ */
+static int cb_mv_init(char *path, DIR *parent, DIR **dirp,
+		      void *param_data, struct dirent64 *de)
+{
+	struct find_param	*param = (struct find_param *)param_data;
+	DIR			*dir = parent;
+	char			 raw[OBD_MAX_IOCTL_BUFFER] = {'\0'};
+	char			*rawbuf = raw;
+	struct obd_ioctl_data	 data = { 0 };
+	int			 reopen = 0;
+	int			 fd;
+	int			 ret;
+	char			*filename;
+
+	LASSERT(parent != NULL || dirp != NULL);
+	/* *dirp is NULL when path is not a directory: nothing to close */
+	if (dirp != NULL && *dirp != NULL) {
+		closedir(*dirp);
+		/* never leave a closed handle behind for the caller */
+		*dirp = NULL;
+		reopen = 1;
+	}
+
+	if (parent == NULL) {
+		dir = opendir_parent(path);
+		if (dir == NULL) {
+			ret = -errno;
+			fprintf(stderr, "can not open %s ret %d\n",
+				path, ret);
+			return ret;
+		}
+	}
+
+	fd = dirfd(dir);
+
+	filename = basename(path);
+	data.ioc_inlbuf1 = (char *)filename;
+	data.ioc_inllen1 = strlen(filename) + 1;
+	data.ioc_inlbuf2 = (char *)&param->mdtindex;
+	data.ioc_inllen2 = sizeof(param->mdtindex);
+	ret = obd_ioctl_pack(&data, &rawbuf, sizeof(raw));
+	if (ret != 0) {
+		llapi_error(LLAPI_MSG_ERROR, ret,
+			    "%s: error packing ioctl data", __func__);
+		goto out;
+	}
+
+	ret = ioctl(fd, LL_IOC_MIGRATE, rawbuf);
+	if (ret != 0) {
+		ret = -errno;
+		fprintf(stderr, "%s migrate failed %d\n", path, ret);
+		goto out;
+	} else if (param->verbose & VERBOSE_DETAIL) {
+		fprintf(stdout, "migrate %s to MDT%d\n", path, param->mdtindex);
+	}
+
+out:
+	if (reopen) {
+		/* The directory handle was closed before the migration;
+		 * re-open it so the caller gets a new handle instead of
+		 * one opened before the migration. */
+		*dirp = opendir(path);
+		if (*dirp == NULL) {
+			int rc2 = -errno;
+
+			llapi_error(LLAPI_MSG_ERROR, rc2,
+				    "%s: Failed to open '%s'", __func__, path);
+			if (ret == 0)
+				ret = rc2;
+		}
+	}
+
+	/* only close the parent handle if it was opened here */
+	if (parent == NULL)
+		closedir(dir);
+
+	return ret;
+}
+
+/**
+ * Run cb_mv_init() on \a path through param_callback(), with
+ * cb_common_fini() as the finishing callback.
+ *
+ * Returns whatever param_callback() returns.
+ */
+int llapi_mv(char *path, struct find_param *param)
+{
+ return param_callback(path, cb_mv_init, cb_common_fini, param);
+}
+
int llapi_find(char *path, struct find_param *param)
{
return param_callback(path, cb_find_init, cb_common_fini, param);
return 0;
}
-static int cb_get_mdt_index(char *path, DIR *parent, DIR *d, void *data,
+static int cb_get_mdt_index(char *path, DIR *parent, DIR **dirp, void *data,
struct dirent64 *de)
{
struct find_param *param = (struct find_param *)data;
+ DIR *d = dirp == NULL ? NULL : *dirp;
int ret = 0;
int mdtidx;
return 0;
}
-static int cb_getstripe(char *path, DIR *parent, DIR *d, void *data,
+static int cb_getstripe(char *path, DIR *parent, DIR **dirp, void *data,
struct dirent64 *de)
{
struct find_param *param = (struct find_param *)data;
+ DIR *d = dirp == NULL ? NULL : *dirp;
int ret = 0;
LASSERT(parent != NULL || d != NULL);
return rc;
}
-static int cb_quotachown(char *path, DIR *parent, DIR *d, void *data,
+static int cb_quotachown(char *path, DIR *parent, DIR **dirp, void *data,
struct dirent64 *de)
{
struct find_param *param = (struct find_param *)data;
+ DIR *d = dirp == NULL ? NULL : *dirp;
lstat_t *st;
int rc;
CHECK_VALUE(REINT_OPEN);
CHECK_VALUE(REINT_SETXATTR);
CHECK_VALUE(REINT_RMENTRY);
+ CHECK_VALUE(REINT_MIGRATE);
CHECK_VALUE(REINT_MAX);
CHECK_VALUE_X(DISP_IT_EXECD);
(long long)REINT_SETXATTR);
LASSERTF(REINT_RMENTRY == 8, "found %lld\n",
(long long)REINT_RMENTRY);
- LASSERTF(REINT_MAX == 9, "found %lld\n",
+ LASSERTF(REINT_MIGRATE == 9, "found %lld\n",
+ (long long)REINT_MIGRATE);
+ LASSERTF(REINT_MAX == 10, "found %lld\n",
(long long)REINT_MAX);
LASSERTF(DISP_IT_EXECD == 0x00000001UL, "found 0x%.8xUL\n",
(unsigned)DISP_IT_EXECD);