Whamcloud - gitweb
LU-10420 flr: split a mirror from mirrored file 88/30388/18
authorBobi Jam <bobijam.xu@intel.com>
Wed, 6 Dec 2017 04:12:26 +0000 (12:12 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Fri, 9 Feb 2018 05:57:29 +0000 (05:57 +0000)
Splits a mirror with mirror_id out of a mirrored file.

Signed-off-by: Bobi Jam <bobijam.xu@intel.com>
Change-Id: Ib9c2ca7deb329ba0f95880ebeee77563317d0fca
Reviewed-on: https://review.whamcloud.com/30388
Reviewed-by: Fan Yong <fan.yong@intel.com>
Reviewed-by: Jinshan Xiong <jinshan.xiong@intel.com>
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
18 files changed:
lustre/include/lu_object.h
lustre/include/lustre/lustreapi.h
lustre/include/lustre_mds.h
lustre/include/obd.h
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/include/uapi/linux/lustre/lustre_user.h
lustre/llite/file.c
lustre/llite/llite_lib.c
lustre/lod/lod_object.c
lustre/mdc/mdc_lib.c
lustre/mdd/mdd_object.c
lustre/mdt/mdt_open.c
lustre/tests/sanity-flr.sh
lustre/utils/lfs.c
lustre/utils/liblustreapi.c
lustre/utils/liblustreapi_layout.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 63b92ef..766bf84 100644 (file)
@@ -909,6 +909,7 @@ enum lu_xattr_flags {
        LU_XATTR_REPLACE = (1 << 0),
        LU_XATTR_CREATE  = (1 << 1),
        LU_XATTR_MERGE   = (1 << 2),
        LU_XATTR_REPLACE = (1 << 0),
        LU_XATTR_CREATE  = (1 << 1),
        LU_XATTR_MERGE   = (1 << 2),
+       LU_XATTR_SPLIT   = (1 << 3),
 };
 
 /** @} helpers */
 };
 
 /** @} helpers */
index 0a8d1c0..d96c85a 100644 (file)
@@ -166,13 +166,15 @@ void llapi_clear_command_name(void);
 #define VERBOSE_DFID           0x4000
 #define VERBOSE_HASH_TYPE      0x8000
 #define VERBOSE_MIRROR_COUNT   0x10000
 #define VERBOSE_DFID           0x4000
 #define VERBOSE_HASH_TYPE      0x8000
 #define VERBOSE_MIRROR_COUNT   0x10000
+#define VERBOSE_MIRROR_ID      0x20000
 #define VERBOSE_DEFAULT                (VERBOSE_COUNT | VERBOSE_SIZE | \
                                 VERBOSE_OFFSET | VERBOSE_POOL | \
                                 VERBOSE_OBJID | VERBOSE_GENERATION | \
                                 VERBOSE_LAYOUT | VERBOSE_HASH_TYPE | \
                                 VERBOSE_COMP_COUNT | VERBOSE_COMP_FLAGS | \
                                 VERBOSE_COMP_START | VERBOSE_COMP_END | \
 #define VERBOSE_DEFAULT                (VERBOSE_COUNT | VERBOSE_SIZE | \
                                 VERBOSE_OFFSET | VERBOSE_POOL | \
                                 VERBOSE_OBJID | VERBOSE_GENERATION | \
                                 VERBOSE_LAYOUT | VERBOSE_HASH_TYPE | \
                                 VERBOSE_COMP_COUNT | VERBOSE_COMP_FLAGS | \
                                 VERBOSE_COMP_START | VERBOSE_COMP_END | \
-                                VERBOSE_COMP_ID | VERBOSE_MIRROR_COUNT)
+                                VERBOSE_COMP_ID | VERBOSE_MIRROR_COUNT| \
+                                VERBOSE_MIRROR_ID)
 
 struct find_param {
        unsigned int             fp_max_depth;
 
 struct find_param {
        unsigned int             fp_max_depth;
index ede98c7..32ab1ed 100644 (file)
@@ -60,6 +60,11 @@ struct mds_capa_info {
         struct lustre_capa_key *capa;
 };
 
         struct lustre_capa_key *capa;
 };
 
+struct md_rejig_data {
+       struct md_object        *mrd_obj;
+       __u16                   mrd_mirror_id;
+};
+
 #define MDD_OBD_NAME     "mdd_obd"
 #define MDD_OBD_UUID     "mdd_obd_uuid"
 
 #define MDD_OBD_NAME     "mdd_obd"
 #define MDD_OBD_UUID     "mdd_obd_uuid"
 
index 6dc319d..42fba9a 100644 (file)
@@ -863,6 +863,7 @@ struct md_op_data {
        /* Used by readdir */
        unsigned int            op_max_pages;
 
        /* Used by readdir */
        unsigned int            op_max_pages;
 
+       __u16                   op_mirror_id;
 };
 
 struct md_callback {
 };
 
 struct md_callback {
index a158f70..27734b4 100644 (file)
@@ -1843,10 +1843,12 @@ enum mds_op_bias {
        MDS_CLOSE_LAYOUT_SWAP   = 1 << 14,
        MDS_CLOSE_LAYOUT_MERGE  = 1 << 15,
        MDS_CLOSE_RESYNC_DONE   = 1 << 16,
        MDS_CLOSE_LAYOUT_SWAP   = 1 << 14,
        MDS_CLOSE_LAYOUT_MERGE  = 1 << 15,
        MDS_CLOSE_RESYNC_DONE   = 1 << 16,
+       MDS_CLOSE_LAYOUT_SPLIT  = 1 << 17,
 };
 
 };
 
-#define MDS_CLOSE_INTENT (MDS_HSM_RELEASE | MDS_CLOSE_LAYOUT_SWAP |    \
-                         MDS_CLOSE_LAYOUT_MERGE | MDS_CLOSE_RESYNC_DONE)
+#define MDS_CLOSE_INTENT (MDS_HSM_RELEASE | MDS_CLOSE_LAYOUT_SWAP |         \
+                         MDS_CLOSE_LAYOUT_MERGE | MDS_CLOSE_LAYOUT_SPLIT | \
+                         MDS_CLOSE_RESYNC_DONE)
 
 /* instance of mdt_reint_rec */
 struct mdt_rec_create {
 
 /* instance of mdt_reint_rec */
 struct mdt_rec_create {
@@ -3402,6 +3404,8 @@ struct close_data {
        union {
                __u64                           cd_reserved[8];
                struct close_data_resync_done   cd_resync;
        union {
                __u64                           cd_reserved[8];
                struct close_data_resync_done   cd_resync;
+               /* split close */
+               __u16                           cd_mirror_id;
        };
 };
 
        };
 };
 
index 4aa1147..59956dc 100644 (file)
@@ -336,6 +336,7 @@ enum ll_lease_flags {
        LL_LEASE_RESYNC         = 0x1,
        LL_LEASE_RESYNC_DONE    = 0x2,
        LL_LEASE_LAYOUT_MERGE   = 0x4,
        LL_LEASE_RESYNC         = 0x1,
        LL_LEASE_RESYNC_DONE    = 0x2,
        LL_LEASE_LAYOUT_MERGE   = 0x4,
+       LL_LEASE_LAYOUT_SPLIT   = 0x8,
 };
 
 #define IOC_IDS_MAX    4096
 };
 
 #define IOC_IDS_MAX    4096
@@ -886,7 +887,7 @@ struct identity_downcall_data {
 #endif /* !__KERNEL__ */
 
 /* lustre volatile file support
 #endif /* !__KERNEL__ */
 
 /* lustre volatile file support
- * file name header: .^L^S^T^R:volatile"
+ * file name header: ".^L^S^T^R:volatile"
  */
 #define LUSTRE_VOLATILE_HDR    ".\x0c\x13\x14\x12:VOLATILE"
 #define LUSTRE_VOLATILE_HDR_LEN        14
  */
 #define LUSTRE_VOLATILE_HDR    ".\x0c\x13\x14\x12:VOLATILE"
 #define LUSTRE_VOLATILE_HDR_LEN        14
index 626615f..eb18c91 100644 (file)
 #include "llite_internal.h"
 #include "vvp_internal.h"
 
 #include "llite_internal.h"
 #include "vvp_internal.h"
 
+struct split_param {
+       struct inode    *sp_inode;
+       __u16           sp_mirror_id;
+};
+
 static int
 ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg);
 
 static int
 ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg);
 
@@ -148,13 +153,22 @@ static int ll_close_inode_openhandle(struct inode *inode,
                /* merge blocks from the victim inode */
                op_data->op_attr_blocks += ((struct inode *)data)->i_blocks;
                op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
                /* merge blocks from the victim inode */
                op_data->op_attr_blocks += ((struct inode *)data)->i_blocks;
                op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
-       case MDS_CLOSE_LAYOUT_SWAP:
+       case MDS_CLOSE_LAYOUT_SPLIT:
+       case MDS_CLOSE_LAYOUT_SWAP: {
+               struct split_param *sp = data;
+
                LASSERT(data != NULL);
                op_data->op_bias |= bias;
                op_data->op_data_version = 0;
                op_data->op_lease_handle = och->och_lease_handle;
                LASSERT(data != NULL);
                op_data->op_bias |= bias;
                op_data->op_data_version = 0;
                op_data->op_lease_handle = och->och_lease_handle;
-               op_data->op_fid2 = *ll_inode2fid(data);
+               if (bias == MDS_CLOSE_LAYOUT_SPLIT) {
+                       op_data->op_fid2 = *ll_inode2fid(sp->sp_inode);
+                       op_data->op_mirror_id = sp->sp_mirror_id;
+               } else {
+                       op_data->op_fid2 = *ll_inode2fid(data);
+               }
                break;
                break;
+       }
 
        case MDS_CLOSE_RESYNC_DONE: {
                struct ll_ioc_lease *ioc = data;
 
        case MDS_CLOSE_RESYNC_DONE: {
                struct ll_ioc_lease *ioc = data;
@@ -2885,6 +2899,7 @@ static long ll_file_unlock_lease(struct file *file, struct ll_ioc_lease *ioc,
        struct ll_file_data     *fd = LUSTRE_FPRIVATE(file);
        struct ll_inode_info    *lli = ll_i2info(inode);
        struct obd_client_handle *och = NULL;
        struct ll_file_data     *fd = LUSTRE_FPRIVATE(file);
        struct ll_inode_info    *lli = ll_i2info(inode);
        struct obd_client_handle *och = NULL;
+       struct split_param sp;
        bool lease_broken;
        fmode_t fmode = 0;
        enum mds_op_bias bias = 0;
        bool lease_broken;
        fmode_t fmode = 0;
        enum mds_op_bias bias = 0;
@@ -2943,6 +2958,32 @@ static long ll_file_unlock_lease(struct file *file, struct ll_ioc_lease *ioc,
                bias = MDS_CLOSE_LAYOUT_MERGE;
                break;
        }
                bias = MDS_CLOSE_LAYOUT_MERGE;
                break;
        }
+       case LL_LEASE_LAYOUT_SPLIT: {
+               int fdv;
+               int mirror_id;
+
+               if (ioc->lil_count != 2)
+                       GOTO(out, rc = -EINVAL);
+
+               arg += sizeof(*ioc);
+               if (copy_from_user(&fdv, (void __user *)arg, sizeof(__u32)))
+                       GOTO(out, rc = -EFAULT);
+
+               arg += sizeof(__u32);
+               if (copy_from_user(&mirror_id, (void __user *)arg,
+                                  sizeof(__u32)))
+                       GOTO(out, rc = -EFAULT);
+
+               layout_file = fget(fdv);
+               if (!layout_file)
+                       GOTO(out, rc = -EBADF);
+
+               sp.sp_inode = file_inode(layout_file);
+               sp.sp_mirror_id = (__u16)mirror_id;
+               data = &sp;
+               bias = MDS_CLOSE_LAYOUT_SPLIT;
+               break;
+       }
        default:
                /* without close intent */
                break;
        default:
                /* without close intent */
                break;
@@ -2967,6 +3008,7 @@ out:
                        OBD_FREE(data, data_size);
                break;
        case LL_LEASE_LAYOUT_MERGE:
                        OBD_FREE(data, data_size);
                break;
        case LL_LEASE_LAYOUT_MERGE:
+       case LL_LEASE_LAYOUT_SPLIT:
                if (layout_file)
                        fput(layout_file);
                break;
                if (layout_file)
                        fput(layout_file);
                break;
index e9924ce..ed9f4ad 100644 (file)
@@ -2522,11 +2522,10 @@ struct md_op_data *ll_prep_md_op_data(struct md_op_data *op_data,
        op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
        op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
        op_data->op_cap = cfs_curproc_cap_pack();
        op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
        op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
        op_data->op_cap = cfs_curproc_cap_pack();
+       op_data->op_mds = 0;
        if ((opc == LUSTRE_OPC_CREATE) && (name != NULL) &&
             filename_is_volatile(name, namelen, &op_data->op_mds)) {
                op_data->op_bias |= MDS_CREATE_VOLATILE;
        if ((opc == LUSTRE_OPC_CREATE) && (name != NULL) &&
             filename_is_volatile(name, namelen, &op_data->op_mds)) {
                op_data->op_bias |= MDS_CREATE_VOLATILE;
-       } else {
-               op_data->op_mds = 0;
        }
        op_data->op_data = data;
 
        }
        op_data->op_data = data;
 
index 38918d3..acfdd01 100644 (file)
@@ -2931,6 +2931,31 @@ out:
 }
 
 /**
 }
 
 /**
+ * Split layouts, just set the LOVEA with the layout from mbuf.
+ */
+static int lod_declare_layout_split(const struct lu_env *env,
+               struct dt_object *dt, const struct lu_buf *mbuf,
+               struct thandle *th)
+{
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct lov_comp_md_v1 *lcm = mbuf->lb_buf;
+       int rc;
+       ENTRY;
+
+       lod_obj_inc_layout_gen(lo);
+       lcm->lcm_layout_gen = cpu_to_le32(lo->ldo_layout_gen);
+
+       lod_object_free_striping(env, lo);
+       rc = lod_parse_striping(env, lo, mbuf);
+       if (rc)
+               RETURN(rc);
+
+       rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), mbuf,
+                                      XATTR_NAME_LOV, LU_XATTR_REPLACE, th);
+       RETURN(rc);
+}
+
+/**
  * Implementation of dt_object_operations::do_declare_xattr_set.
  *
  * \see dt_object_operations::do_declare_xattr_set() in the API description
  * Implementation of dt_object_operations::do_declare_xattr_set.
  *
  * \see dt_object_operations::do_declare_xattr_set() in the API description
@@ -2954,7 +2979,7 @@ static int lod_declare_xattr_set(const struct lu_env *env,
 
        mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
        if ((S_ISREG(mode) || mode == 0) &&
 
        mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
        if ((S_ISREG(mode) || mode == 0) &&
-           !(fl & (LU_XATTR_REPLACE | LU_XATTR_MERGE)) &&
+           !(fl & (LU_XATTR_REPLACE | LU_XATTR_MERGE | LU_XATTR_SPLIT)) &&
            (strcmp(name, XATTR_NAME_LOV) == 0 ||
             strcmp(name, XATTR_LUSTRE_LOV) == 0)) {
                /*
            (strcmp(name, XATTR_NAME_LOV) == 0 ||
             strcmp(name, XATTR_LUSTRE_LOV) == 0)) {
                /*
@@ -2980,6 +3005,10 @@ static int lod_declare_xattr_set(const struct lu_env *env,
                LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 ||
                        strcmp(name, XATTR_LUSTRE_LOV) == 0);
                rc = lod_declare_layout_merge(env, dt, buf, th);
                LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 ||
                        strcmp(name, XATTR_LUSTRE_LOV) == 0);
                rc = lod_declare_layout_merge(env, dt, buf, th);
+       } else if (fl & LU_XATTR_SPLIT) {
+               LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 ||
+                       strcmp(name, XATTR_LUSTRE_LOV) == 0);
+               rc = lod_declare_layout_split(env, dt, buf, th);
        } else if (S_ISREG(mode) &&
                   strlen(name) > strlen(XATTR_LUSTRE_LOV) + 1 &&
                   strncmp(name, XATTR_LUSTRE_LOV,
        } else if (S_ISREG(mode) &&
                   strlen(name) > strlen(XATTR_LUSTRE_LOV) + 1 &&
                   strncmp(name, XATTR_LUSTRE_LOV,
index 27e7da3..79cca6d 100644 (file)
@@ -460,7 +460,9 @@ static void mdc_close_intent_pack(struct ptlrpc_request *req,
        data->cd_data_version = op_data->op_data_version;
        data->cd_fid = op_data->op_fid2;
 
        data->cd_data_version = op_data->op_data_version;
        data->cd_fid = op_data->op_fid2;
 
-       if (bias & MDS_CLOSE_RESYNC_DONE) {
+       if (bias & MDS_CLOSE_LAYOUT_SPLIT) {
+               data->cd_mirror_id = op_data->op_mirror_id;
+       } else if (bias & MDS_CLOSE_RESYNC_DONE) {
                struct close_data_resync_done *sync = &data->cd_resync;
 
                CLASSERT(sizeof(data->cd_resync) <= sizeof(data->cd_reserved));
                struct close_data_resync_done *sync = &data->cd_resync;
 
                CLASSERT(sizeof(data->cd_resync) <= sizeof(data->cd_reserved));
index b2a6d5c..17642ca 100644 (file)
@@ -1261,6 +1261,238 @@ out:
        return rc;
 }
 
        return rc;
 }
 
+/**
+ * Extract the mirror with specified mirror id, and store the splitted
+ * mirror layout to @buf.
+ *
+ * \param[in] comp_v1  mirrored layout
+ * \param[in] mirror_id        the mirror with mirror_id to be extracted
+ * \param[out] buf     store the layout excluding the extracted mirror,
+ *                     caller free the buffer we allocated in this function
+ * \param[out] buf_vic store the extracted layout, caller free the buffer
+ *                     we allocated in this function
+ *
+ * \retval     0 on success; < 0 if error happens
+ */
+static int mdd_split_ea(struct lov_comp_md_v1 *comp_v1, __u16 mirror_id,
+                       struct lu_buf *buf, struct lu_buf *buf_vic)
+{
+       struct lov_comp_md_v1 *comp_rem;
+       struct lov_comp_md_v1 *comp_vic;
+       struct lov_comp_md_entry_v1 *entry;
+       struct lov_comp_md_entry_v1 *entry_rem;
+       struct lov_comp_md_entry_v1 *entry_vic;
+       __u16 mirror_cnt;
+       __u16 comp_cnt, count = 0;
+       int lmm_size, lmm_size_vic = 0;
+       int i, j, k;
+       int offset, offset_rem, offset_vic;
+
+       mirror_cnt = le16_to_cpu(comp_v1->lcm_mirror_count) + 1;
+       /* comp_v1 should contains more than 1 mirror */
+       if (mirror_cnt <= 1)
+               return -EINVAL;
+       comp_cnt = le16_to_cpu(comp_v1->lcm_entry_count);
+       lmm_size = le32_to_cpu(comp_v1->lcm_size);
+
+       for (i = 0; i < comp_cnt; i++) {
+               entry = &comp_v1->lcm_entries[i];
+               if (mirror_id_of(le32_to_cpu(entry->lcme_id)) == mirror_id) {
+                       count++;
+                       lmm_size_vic += sizeof(*entry);
+                       lmm_size_vic += le32_to_cpu(entry->lcme_size);
+               } else if (count > 0) {
+                       /* find the specified mirror */
+                       break;
+               }
+       }
+
+       if (count == 0)
+               return -EINVAL;
+
+       lu_buf_alloc(buf, lmm_size - lmm_size_vic);
+       if (!buf->lb_buf)
+               return -ENOMEM;
+
+       lu_buf_alloc(buf_vic, sizeof(*comp_vic) + lmm_size_vic);
+       if (!buf_vic->lb_buf) {
+               lu_buf_free(buf);
+               return -ENOMEM;
+       }
+
+       comp_rem = (struct lov_comp_md_v1 *)buf->lb_buf;
+       comp_vic = (struct lov_comp_md_v1 *)buf_vic->lb_buf;
+
+       memcpy(comp_rem, comp_v1, sizeof(*comp_v1));
+       comp_rem->lcm_mirror_count = cpu_to_le16(mirror_cnt - 2);
+       comp_rem->lcm_entry_count = cpu_to_le32(comp_cnt - count);
+       comp_rem->lcm_size = cpu_to_le32(lmm_size - lmm_size_vic);
+       if (!comp_rem->lcm_mirror_count)
+               comp_rem->lcm_flags = cpu_to_le16(LCM_FL_NOT_FLR);
+
+       memset(comp_vic, 0, sizeof(*comp_v1));
+       comp_vic->lcm_magic = cpu_to_le32(LOV_MAGIC_COMP_V1);
+       comp_vic->lcm_mirror_count = 0;
+       comp_vic->lcm_entry_count = cpu_to_le32(count);
+       comp_vic->lcm_size = cpu_to_le32(lmm_size_vic + sizeof(*comp_vic));
+       comp_vic->lcm_flags = cpu_to_le16(LCM_FL_NOT_FLR);
+       comp_vic->lcm_layout_gen = 0;
+
+       offset = sizeof(*comp_v1) + sizeof(*entry) * comp_cnt;
+       offset_rem = sizeof(*comp_rem) +
+                    sizeof(*entry_rem) * (comp_cnt - count);
+       offset_vic = sizeof(*comp_vic) + sizeof(*entry_vic) * count;
+       for (i = j = k = 0; i < comp_cnt; i++) {
+               struct lov_mds_md *lmm, *lmm_dst;
+               bool vic = false;
+
+               entry = &comp_v1->lcm_entries[i];
+               entry_vic = &comp_vic->lcm_entries[j];
+               entry_rem = &comp_rem->lcm_entries[k];
+
+               if (mirror_id_of(le32_to_cpu(entry->lcme_id)) == mirror_id)
+                       vic = true;
+
+               /* copy component entry */
+               if (vic) {
+                       memcpy(entry_vic, entry, sizeof(*entry));
+                       entry_vic->lcme_flags &= cpu_to_le32(LCME_FL_INIT);
+                       entry_vic->lcme_offset = cpu_to_le32(offset_vic);
+                       j++;
+               } else {
+                       memcpy(entry_rem, entry, sizeof(*entry));
+                       entry_rem->lcme_offset = cpu_to_le32(offset_rem);
+                       k++;
+               }
+
+               lmm = (struct lov_mds_md *)((char *)comp_v1 + offset);
+               if (vic)
+                       lmm_dst = (struct lov_mds_md *)
+                                       ((char *)comp_vic + offset_vic);
+               else
+                       lmm_dst = (struct lov_mds_md *)
+                                       ((char *)comp_rem + offset_rem);
+
+               /* copy component entry blob */
+               memcpy(lmm_dst, lmm, le32_to_cpu(entry->lcme_size));
+
+               /* blob offset advance */
+               offset += le32_to_cpu(entry->lcme_size);
+               if (vic)
+                       offset_vic += le32_to_cpu(entry->lcme_size);
+               else
+                       offset_rem += le32_to_cpu(entry->lcme_size);
+       }
+
+       return 0;
+}
+
+static int mdd_xattr_split(const struct lu_env *env, struct md_object *md_obj,
+                          struct md_rejig_data *mrd)
+{
+       struct mdd_device *mdd = mdo2mdd(md_obj);
+       struct mdd_object *obj = md2mdd_obj(md_obj);
+       struct mdd_object *vic = md2mdd_obj(mrd->mrd_obj);
+       struct lu_buf *buf = &mdd_env_info(env)->mti_buf[0];
+       struct lu_buf *buf_save = &mdd_env_info(env)->mti_buf[1];
+       struct lu_buf *buf_vic = &mdd_env_info(env)->mti_buf[2];
+       struct lov_comp_md_v1 *lcm;
+       struct thandle *handle;
+       int rc;
+       ENTRY;
+
+       rc = lu_fid_cmp(mdo2fid(obj), mdo2fid(vic));
+       if (rc == 0) /* same fid */
+               RETURN(-EPERM);
+
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               RETURN(PTR_ERR(handle));
+
+       if (rc > 0) {
+               mdd_write_lock(env, obj, MOR_TGT_CHILD);
+               mdd_write_lock(env, vic, MOR_TGT_CHILD);
+       } else {
+               mdd_write_lock(env, vic, MOR_TGT_CHILD);
+               mdd_write_lock(env, obj, MOR_TGT_CHILD);
+       }
+
+       /* get EA of mirrored file */
+       memset(buf_save, 0, sizeof(*buf));
+       rc = mdd_get_lov_ea(env, obj, buf_save);
+       if (rc < 0)
+               GOTO(out, rc);
+
+       lcm = buf_save->lb_buf;
+       if (le32_to_cpu(lcm->lcm_magic) != LOV_MAGIC_COMP_V1)
+               GOTO(out, rc = -EINVAL);
+
+       /**
+        * Extract the mirror with specified mirror id, and store the splitted
+        * mirror layout to the victim file.
+        */
+       memset(buf, 0, sizeof(*buf));
+       memset(buf_vic, 0, sizeof(*buf_vic));
+       rc = mdd_split_ea(lcm, mrd->mrd_mirror_id, buf, buf_vic);
+       if (rc < 0)
+               GOTO(out, rc);
+
+       rc = mdd_declare_xattr_set(env, mdd, obj, buf, XATTR_NAME_LOV,
+                                  LU_XATTR_SPLIT, handle);
+       if (rc)
+               GOTO(out, rc);
+       rc = mdd_declare_xattr_set(env, mdd, vic, buf_vic, XATTR_NAME_LOV,
+                                  LU_XATTR_SPLIT, handle);
+       if (rc)
+               GOTO(out, rc);
+
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(out, rc);
+
+       rc = mdo_xattr_set(env, obj, buf, XATTR_NAME_LOV, LU_XATTR_REPLACE,
+                          handle);
+       if (rc)
+               GOTO(out, rc);
+
+       rc = mdo_xattr_set(env, vic, buf_vic, XATTR_NAME_LOV, LU_XATTR_CREATE,
+                          handle);
+       if (rc)
+               GOTO(out_restore, rc);
+
+       rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, obj, handle);
+       if (rc)
+               GOTO(out, rc);
+
+       rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, vic, handle);
+       if (rc)
+               GOTO(out, rc);
+       EXIT;
+
+out_restore:
+       if (rc) {
+               /* restore obj's layout */
+               int rc2 = mdo_xattr_set(env, obj, buf_save, XATTR_NAME_LOV,
+                                       LU_XATTR_REPLACE, handle);
+               if (rc2)
+                       CERROR("%s: failed to rollback of layout of: "DFID
+                              ": %d, file state unkonwn.\n",
+                              mdd_obj_dev_name(obj), PFID(mdo2fid(obj)), rc2);
+       }
+out:
+       mdd_trans_stop(env, mdd, rc, handle);
+       mdd_write_unlock(env, obj);
+       mdd_write_unlock(env, vic);
+       lu_buf_free(buf_save);
+       lu_buf_free(buf);
+       lu_buf_free(buf_vic);
+
+       if (!rc)
+               (void) mdd_object_pfid_replace(env, obj);
+
+       return rc;
+}
+
 static int mdd_layout_merge_allowed(const struct lu_env *env,
                                    struct md_object *target,
                                    struct md_object *victim)
 static int mdd_layout_merge_allowed(const struct lu_env *env,
                                    struct md_object *target,
                                    struct md_object *victim)
@@ -1302,18 +1534,23 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
        if (rc)
                RETURN(rc);
 
        if (rc)
                RETURN(rc);
 
-       if (strcmp(name, XATTR_LUSTRE_LOV) == 0 && fl == LU_XATTR_MERGE) {
-               struct md_object *victim = buf->lb_buf;
+       if (strcmp(name, XATTR_LUSTRE_LOV) == 0 &&
+           (fl == LU_XATTR_MERGE || fl == LU_XATTR_SPLIT)) {
+               struct md_rejig_data *mrd = buf->lb_buf;
+               struct md_object *victim = mrd->mrd_obj;
 
 
-               if (buf->lb_len != sizeof(victim))
+               if (buf->lb_len != sizeof(*mrd))
                        RETURN(-EINVAL);
 
                rc = mdd_layout_merge_allowed(env, obj, victim);
                if (rc)
                        RETURN(rc);
 
                        RETURN(-EINVAL);
 
                rc = mdd_layout_merge_allowed(env, obj, victim);
                if (rc)
                        RETURN(rc);
 
-               /* merge layout of victim as a mirror of obj's. */
-               rc = mdd_xattr_merge(env, obj, victim);
+               if (fl == LU_XATTR_MERGE)
+                       /* merge layout of victim as a mirror of obj's. */
+                       rc = mdd_xattr_merge(env, obj, victim);
+               else
+                       rc = mdd_xattr_split(env, obj, mrd);
                RETURN(rc);
        }
 
                RETURN(rc);
        }
 
index e67f496..f477d99 100644 (file)
@@ -1994,13 +1994,21 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info,
        if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SWAP) {
                rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1),
                                     mdt_object_child(o2), 0);
        if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SWAP) {
                rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1),
                                     mdt_object_child(o2), 0);
-       } else if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_MERGE) {
+       } else if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_MERGE ||
+                  ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT) {
                struct lu_buf *buf = &info->mti_buf;
                struct lu_buf *buf = &info->mti_buf;
+               struct md_rejig_data mrd;
 
 
-               buf->lb_len = sizeof(void *);
-               buf->lb_buf = mdt_object_child(o == o1 ? o2 : o1);
+               mrd.mrd_obj = mdt_object_child(o == o1 ? o2 : o1);
+               if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT)
+                       mrd.mrd_mirror_id = data->cd_mirror_id;
+
+               buf->lb_len = sizeof(mrd);
+               buf->lb_buf = &mrd;
                rc = mo_xattr_set(info->mti_env, mdt_object_child(o), buf,
                rc = mo_xattr_set(info->mti_env, mdt_object_child(o), buf,
-                                 XATTR_LUSTRE_LOV, LU_XATTR_MERGE);
+                                 XATTR_LUSTRE_LOV,
+                                 ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT ?
+                                 LU_XATTR_SPLIT : LU_XATTR_MERGE);
                if (rc == 0 && ma->ma_attr.la_valid & (LA_SIZE | LA_BLOCKS)) {
                        int rc2;
 
                if (rc == 0 && ma->ma_attr.la_valid & (LA_SIZE | LA_BLOCKS)) {
                        int rc2;
 
@@ -2202,6 +2210,7 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
                break;
        }
        case MDS_CLOSE_LAYOUT_MERGE:
                break;
        }
        case MDS_CLOSE_LAYOUT_MERGE:
+       case MDS_CLOSE_LAYOUT_SPLIT:
        case MDS_CLOSE_LAYOUT_SWAP: {
                rc = mdt_close_handle_layouts(info, o, ma);
                if (rc < 0) {
        case MDS_CLOSE_LAYOUT_SWAP: {
                rc = mdt_close_handle_layouts(info, o, ma);
                if (rc < 0) {
index 8164ee6..5eaf52c 100644 (file)
@@ -1517,6 +1517,67 @@ test_42() {
 }
 run_test 42 "lfs mirror verify"
 
 }
 run_test 42 "lfs mirror verify"
 
+test_44() {
+       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+       rm -rf $DIR/$tdir
+       rm -rf $DIR/$tdir-1
+       local tf=$DIR/$tdir/$tfile
+       local tf1=$DIR/$tdir-1/$tfile-1
+
+       $LFS setdirstripe -i 0 -c 1 $DIR/$tdir ||
+               error "create directory failed"
+       $LFS setdirstripe -i 1 -c 1 $DIR/$tdir-1 ||
+               error "create remote directory failed"
+       rm -f $tf $tf1 $tf.mirror~2
+       # create file with 4 mirrors
+       $LFS mirror create -N -E2m -E4m -E-1 -N -E1m -E2m -E3m -E-1 -N2 $tf ||
+               error "create PFLR file $tf failed"
+
+       # file should be in ro status
+       verify_flr_state $tf "ro"
+
+       # write data in [0, 3M)
+       dd if=/dev/urandom of=$tf bs=1M count=3 conv=notrunc ||
+               error "writing $tf failed"
+
+       verify_flr_state $tf "wp"
+
+       # synchronize all mirrors of the file
+       $LFS mirror resync $tf || error "mirror resync $tf failed"
+
+       verify_flr_state $tf "ro"
+
+       # split mirror 1
+       $LFS mirror split --mirror-id 1 -f $tf1 $tf ||
+               error "split to $tf1 failed"
+
+       local idx0=$($LFS getstripe -m $tf)
+       local idx1=$($LFS getstripe -m $tf1)
+
+       [[ x$idx0 == x0 ]] || error "$tf is not on MDT0"
+       [[ x$idx1 == x1 ]] || error "$tf1 is not on MDT1"
+
+       # verify mirror count
+       verify_mirror_count $tf 3
+       verify_mirror_count $tf1 1
+
+       $LFS mirror split --mirror-id 2 $tf ||
+               error "split mirror 2 failed"
+
+       verify_mirror_count $tf 2
+       verify_mirror_count $tf.mirror~2 1
+
+       $LFS mirror split --mirror-id 3 -d $tf ||
+               error "split and delte mirror 3 failed"
+       verify_mirror_count $tf 1
+
+       # verify splitted file contains the same content as the orig file does
+       diff $tf $tf1 || error "splited file $tf1 diffs from $tf"
+       diff $tf $tf.mirror~2 ||
+               error "splited file $tf.mirror~2 diffs from $tf"
+}
+run_test 44 "lfs mirror split check"
+
 ctrl_file=$(mktemp /tmp/CTRL.XXXXXX)
 lock_file=$(mktemp /var/lock/FLR.XXXXXX)
 
 ctrl_file=$(mktemp /tmp/CTRL.XXXXXX)
 lock_file=$(mktemp /var/lock/FLR.XXXXXX)
 
index 89ade08..f247dd1 100644 (file)
@@ -124,25 +124,31 @@ enum setstripe_origin {
        SO_SETSTRIPE,
        SO_MIGRATE,
        SO_MIRROR_CREATE,
        SO_SETSTRIPE,
        SO_MIGRATE,
        SO_MIRROR_CREATE,
-       SO_MIRROR_EXTEND
+       SO_MIRROR_EXTEND,
+       SO_MIRROR_SPLIT,
 };
 };
-static int lfs_setstripe0(int argc, char **argv, enum setstripe_origin opc);
+static int lfs_setstripe_internal(int argc, char **argv,
+                                 enum setstripe_origin opc);
 
 static inline int lfs_setstripe(int argc, char **argv)
 {
 
 static inline int lfs_setstripe(int argc, char **argv)
 {
-       return lfs_setstripe0(argc, argv, SO_SETSTRIPE);
+       return lfs_setstripe_internal(argc, argv, SO_SETSTRIPE);
 }
 static inline int lfs_setstripe_migrate(int argc, char **argv)
 {
 }
 static inline int lfs_setstripe_migrate(int argc, char **argv)
 {
-       return lfs_setstripe0(argc, argv, SO_MIGRATE);
+       return lfs_setstripe_internal(argc, argv, SO_MIGRATE);
 }
 static inline int lfs_mirror_create(int argc, char **argv)
 {
 }
 static inline int lfs_mirror_create(int argc, char **argv)
 {
-       return lfs_setstripe0(argc, argv, SO_MIRROR_CREATE);
+       return lfs_setstripe_internal(argc, argv, SO_MIRROR_CREATE);
 }
 static inline int lfs_mirror_extend(int argc, char **argv)
 {
 }
 static inline int lfs_mirror_extend(int argc, char **argv)
 {
-       return lfs_setstripe0(argc, argv, SO_MIRROR_EXTEND);
+       return lfs_setstripe_internal(argc, argv, SO_MIRROR_EXTEND);
+}
+static inline int lfs_mirror_split(int argc, char **argv)
+{
+       return lfs_setstripe_internal(argc, argv, SO_MIRROR_SPLIT);
 }
 
 /* Setstripe and migrate share mostly the same parameters */
 }
 
 /* Setstripe and migrate share mostly the same parameters */
@@ -250,6 +256,16 @@ command_t mirror_cmdlist[] = {
                "<--mirror-count|-N[mirror_count]> [--no-verify] "
                "[setstripe options|--parent|-f <victim_file>] ... <filename>\n"
          MIRROR_EXTEND_HELP },
                "<--mirror-count|-N[mirror_count]> [--no-verify] "
                "[setstripe options|--parent|-f <victim_file>] ... <filename>\n"
          MIRROR_EXTEND_HELP },
+       { .pc_name = "split", .pc_func = lfs_mirror_split,
+         .pc_help = "Split a mirrored file.\n"
+       "usage: lfs mirror split <--mirror-id <mirror_id>> [--destroy|-d] "
+       "[-f <new_file>] <mirrored file>\n"
+       "\tmirror_id:    The numerical unique identifier for a mirror. It\n"
+       "\t              can be fetched by lfs getstripe command.\n"
+       "\tnew_file:     This option indicates the layout of the split\n"
+       "\t              mirror will be stored into. If not specified,\n"
+       "\t              a new file named <mirrored_file>.mirror~<mirror_id>\n"
+       "\t              will be used.\n" },
        { .pc_name = "resync", .pc_func = lfs_mirror_resync,
          .pc_help = "Resynchronizes out-of-sync mirrored file(s).\n"
                "usage: lfs mirror resync [--only <mirror_id[,...]>] "
        { .pc_name = "resync", .pc_func = lfs_mirror_resync,
          .pc_help = "Resynchronizes out-of-sync mirrored file(s).\n"
                "usage: lfs mirror resync [--only <mirror_id[,...]>] "
@@ -1173,14 +1189,16 @@ static inline int mirror_sanity_check_one(struct llapi_layout *layout)
 
 /**
  * enum mirror_flags - Flags for extending a mirrored file.
 
 /**
  * enum mirror_flags - Flags for extending a mirrored file.
- * @NO_VERIFY: Indicates not to verify the mirror(s) from victim file(s)
+ * @MF_NO_VERIFY: Indicates not to verify the mirror(s) from victim file(s)
  *            in case the victim file(s) contains the same data as the
  *            original mirrored file.
  *            in case the victim file(s) contains the same data as the
  *            original mirrored file.
+ * @MF_DESTROY: Indicates to delete the mirror from the mirrored file.
  *
  * Flags for extending a mirrored file.
  */
 enum mirror_flags {
  *
  * Flags for extending a mirrored file.
  */
 enum mirror_flags {
-       NO_VERIFY       = 0x1,
+       MF_NO_VERIFY    = 0x1,
+       MF_DESTROY      = 0x2,
 };
 
 /**
 };
 
 /**
@@ -1442,7 +1460,7 @@ static int mirror_extend_file(const char *fname, const char *victim_file,
                goto out;
        }
 
                goto out;
        }
 
-       if (!(mirror_flags & NO_VERIFY)) {
+       if (!(mirror_flags & MF_NO_VERIFY)) {
                ssize_t ret;
                /* mirrors should have the same contents */
                ret = mirror_file_compare(fd, fdv);
                ssize_t ret;
                /* mirrors should have the same contents */
                ret = mirror_file_compare(fd, fdv);
@@ -1601,6 +1619,175 @@ static int mirror_extend(char *fname, struct mirror_args *mirror_list,
        return rc;
 }
 
        return rc;
 }
 
+static int verify_id(struct llapi_layout *layout, void *cbdata)
+{
+       uint32_t id;
+       int rc;
+
+       rc = llapi_layout_mirror_id_get(layout, &id);
+       if (rc < 0)
+               return rc;
+
+       if ((__u16)id == *(__u16 *)cbdata)
+               return LLAPI_LAYOUT_ITER_STOP;
+
+       return LLAPI_LAYOUT_ITER_CONT;
+}
+
+static int mirror_split(const char *fname, __u16 mirror_id,
+                       enum mirror_flags mflags, const char *victim_file)
+{
+       struct llapi_layout *layout;
+       char parent[PATH_MAX];
+       char victim[PATH_MAX];
+       int flags = O_CREAT | O_EXCL | O_LOV_DELAY_CREATE | O_NOFOLLOW;
+       char *ptr;
+       struct ll_ioc_lease *data;
+       uint16_t mirror_count;
+       int mdt_index;
+       int fd, fdv;
+       int rc;
+
+       /* check fname contains mirror with mirror_id */
+       layout = llapi_layout_get_by_path(fname, 0);
+       if (!layout) {
+               fprintf(stderr,
+                       "error %s: file '%s' couldn't get layout\n",
+                       progname, fname);
+               return -EINVAL;
+       }
+
+       rc = mirror_sanity_check_one(layout);
+       if (rc)
+               goto free_layout;
+
+       rc = llapi_layout_mirror_count_get(layout, &mirror_count);
+       if (rc) {
+               fprintf(stderr,
+                       "error %s: file '%s' couldn't get mirror count\n",
+                       progname, fname);
+               goto free_layout;
+       }
+       if (mirror_count < 2) {
+               fprintf(stderr,
+                       "error %s: file '%s' has %d component, cannot split\n",
+                       progname, fname, mirror_count);
+               goto free_layout;
+       }
+
+       rc = llapi_layout_comp_iterate(layout, verify_id, &mirror_id);
+       if (rc < 0) {
+               fprintf(stderr, "error %s: failed to iterate layout of '%s'\n",
+                       progname, fname);
+               goto free_layout;
+       } else if (rc == LLAPI_LAYOUT_ITER_CONT) {
+               fprintf(stderr,
+                    "error %s: file '%s' does not contain mirror with id %u\n",
+                       progname, fname, mirror_id);
+               goto free_layout;
+       }
+
+       fd = open(fname, O_RDWR);
+       if (fd < 0) {
+               fprintf(stderr,
+                       "error %s: open file '%s' failed: %s\n",
+                       progname, fname, strerror(errno));
+               goto free_layout;
+       }
+
+       /* get victim file directory pathname */
+       if (strlen(fname) > sizeof(parent) - 1) {
+               fprintf(stderr, "error %s: file name of '%s' too long\n",
+                       progname, fname);
+               rc = -ERANGE;
+               goto free_layout;
+       }
+       strncpy(parent, fname, sizeof(parent));
+       ptr = strrchr(parent, '/');
+       if (ptr == NULL) {
+               if (getcwd(parent, sizeof(parent)) == NULL) {
+                       fprintf(stderr, "error %s: getcwd failed: %s\n",
+                               progname, strerror(errno));
+                       rc = -errno;
+                       goto free_layout;
+               }
+       } else {
+               if (ptr == parent)
+                       ptr = parent + 1;
+               *ptr = '\0';
+       }
+
+       rc = llapi_file_fget_mdtidx(fd, &mdt_index);
+       if (rc < 0) {
+               fprintf(stderr, "%s: cannot get MDT index of '%s'\n",
+                       progname, fname);
+               goto free_layout;
+       }
+
+       if (victim_file == NULL) {
+               /* use a temp file to store the splitted layout */
+               if (mflags & MF_DESTROY) {
+                       fdv = llapi_create_volatile_idx(parent, mdt_index,
+                                                       O_LOV_DELAY_CREATE);
+               } else {
+                       snprintf(victim, sizeof(victim), "%s.mirror~%u",
+                                fname, mirror_id);
+                       fdv = open(victim, flags, S_IRUSR | S_IWUSR);
+               }
+       } else {
+               /* user specified victim file */
+               fdv = open(victim_file, flags, S_IRUSR | S_IWUSR);
+       }
+
+       if (fdv < 0) {
+               fprintf(stderr,
+                       "error %s: create victim file failed: %s\n",
+                       progname, strerror(errno));
+               goto close_fd;
+       }
+
+       /* get lease lock of fname */
+       rc = llapi_lease_acquire(fd, LL_LEASE_WRLCK);
+       if (rc < 0) {
+               fprintf(stderr,
+                       "error %s: cannot get lease of file '%s': %d\n",
+                       progname, fname, rc);
+               goto close_victim;
+       }
+
+       /* Atomatically put lease, split layouts and close. */
+       data = malloc(offsetof(typeof(*data), lil_ids[2]));
+       if (!data) {
+               rc = -ENOMEM;
+               goto close_victim;
+       }
+
+       data->lil_mode = LL_LEASE_UNLCK;
+       data->lil_flags = LL_LEASE_LAYOUT_SPLIT;
+       data->lil_count = 2;
+       data->lil_ids[0] = fdv;
+       data->lil_ids[1] = mirror_id;
+       rc = llapi_lease_set(fd, data);
+       if (rc <= 0) {
+               if (rc == 0) /* lost lease lock */
+                       rc = -EBUSY;
+               fprintf(stderr,
+                       "error %s: cannot split '%s': %s\n",
+                       progname, fname, strerror(-rc));
+       } else {
+               rc = 0;
+       }
+       free(data);
+
+close_victim:
+       close(fdv);
+close_fd:
+       close(fd);
+free_layout:
+       llapi_layout_free(layout);
+       return rc;
+}
+
 /**
  * Parse a string containing an target index list into an array of integers.
  *
 /**
  * Parse a string containing an target index list into an array of integers.
  *
@@ -2074,10 +2261,12 @@ enum {
        LFS_COMP_NO_VERIFY_OPT,
        LFS_PROJID_OPT,
        LFS_MIRROR_FLAGS_OPT,
        LFS_COMP_NO_VERIFY_OPT,
        LFS_PROJID_OPT,
        LFS_MIRROR_FLAGS_OPT,
+       LFS_MIRROR_ID_OPT,
 };
 
 /* functions */
 };
 
 /* functions */
-static int lfs_setstripe0(int argc, char **argv, enum setstripe_origin opc)
+static int lfs_setstripe_internal(int argc, char **argv,
+                                 enum setstripe_origin opc)
 {
        struct lfs_setstripe_args        lsa;
        struct llapi_stripe_param       *param = NULL;
 {
        struct lfs_setstripe_args        lsa;
        struct llapi_stripe_param       *param = NULL;
@@ -2109,6 +2298,7 @@ static int lfs_setstripe0(int argc, char **argv, enum setstripe_origin opc)
        struct mirror_args              *mirror_list = NULL;
        struct mirror_args              *new_mirror = NULL;
        struct mirror_args              *last_mirror = NULL;
        struct mirror_args              *mirror_list = NULL;
        struct mirror_args              *new_mirror = NULL;
        struct mirror_args              *last_mirror = NULL;
+       __u16                            mirror_id = 0;
        char                             cmd[PATH_MAX];
 
        struct option long_opts[] = {
        char                             cmd[PATH_MAX];
 
        struct option long_opts[] = {
@@ -2139,10 +2329,13 @@ static int lfs_setstripe0(int argc, char **argv, enum setstripe_origin opc)
                        .name = "no-verify",    .has_arg = no_argument},
        { .val = LFS_MIRROR_FLAGS_OPT,
                        .name = "flags",        .has_arg = required_argument},
                        .name = "no-verify",    .has_arg = no_argument},
        { .val = LFS_MIRROR_FLAGS_OPT,
                        .name = "flags",        .has_arg = required_argument},
+       { .val = LFS_MIRROR_ID_OPT,
+                       .name = "mirror-id",    .has_arg = required_argument},
        { .val = 'c',   .name = "stripe-count", .has_arg = required_argument},
        { .val = 'c',   .name = "stripe_count", .has_arg = required_argument},
 /* find        { .val = 'C',   .name = "ctime",        .has_arg = required_argument }*/
        { .val = 'd',   .name = "delete",       .has_arg = no_argument},
        { .val = 'c',   .name = "stripe-count", .has_arg = required_argument},
        { .val = 'c',   .name = "stripe_count", .has_arg = required_argument},
 /* find        { .val = 'C',   .name = "ctime",        .has_arg = required_argument }*/
        { .val = 'd',   .name = "delete",       .has_arg = no_argument},
+       { .val = 'd',   .name = "destroy",      .has_arg = no_argument},
        { .val = 'E',   .name = "comp-end",     .has_arg = required_argument},
        { .val = 'E',   .name = "component-end",
                                                .has_arg = required_argument},
        { .val = 'E',   .name = "comp-end",     .has_arg = required_argument},
        { .val = 'E',   .name = "component-end",
                                                .has_arg = required_argument},
@@ -2234,7 +2427,16 @@ static int lfs_setstripe0(int argc, char **argv, enum setstripe_origin opc)
                        setstripe_args_init(&lsa);
                        break;
                case LFS_COMP_NO_VERIFY_OPT:
                        setstripe_args_init(&lsa);
                        break;
                case LFS_COMP_NO_VERIFY_OPT:
-                       mirror_flags |= NO_VERIFY;
+                       mirror_flags |= MF_NO_VERIFY;
+                       break;
+               case LFS_MIRROR_ID_OPT:
+                       mirror_id = strtoul(optarg, &end, 0);
+                       if (*end != '\0' || mirror_id == 0) {
+                               fprintf(stderr,
+                                       "%s %s: invalid mirror ID '%s'\n",
+                                       progname, argv[0], optarg);
+                               goto usage_error;
+                       }
                        break;
                case LFS_MIRROR_FLAGS_OPT: {
                        __u32 flags;
                        break;
                case LFS_MIRROR_FLAGS_OPT: {
                        __u32 flags;
@@ -2289,6 +2491,15 @@ static int lfs_setstripe0(int argc, char **argv, enum setstripe_origin opc)
                case 'd':
                        /* delete the default striping pattern */
                        delete = 1;
                case 'd':
                        /* delete the default striping pattern */
                        delete = 1;
+                       if (opc == SO_MIRROR_SPLIT) {
+                               if (has_m_file) {
+                                       fprintf(stderr,
+                                             "%s %s: -d cannot used with -f\n",
+                                               progname, argv[0]);
+                                       goto usage_error;
+                               }
+                               mirror_flags |= MF_DESTROY;
+                       }
                        break;
                case 'E':
                        if (lsa.lsa_comp_end != 0) {
                        break;
                case 'E':
                        if (lsa.lsa_comp_end != 0) {
@@ -2339,21 +2550,27 @@ static int lfs_setstripe0(int argc, char **argv, enum setstripe_origin opc)
                        }
                        break;
                case 'f':
                        }
                        break;
                case 'f':
-                       if (opc != SO_MIRROR_EXTEND) {
+                       if (opc != SO_MIRROR_EXTEND && opc != SO_MIRROR_SPLIT) {
                                fprintf(stderr,
                                        "error: %s: invalid option: %s\n",
                                        progname, argv[optopt + 1]);
                                goto usage_error;
                        }
                                fprintf(stderr,
                                        "error: %s: invalid option: %s\n",
                                        progname, argv[optopt + 1]);
                                goto usage_error;
                        }
-                       if (last_mirror == NULL) {
-                               fprintf(stderr, "error: %s: '-N' must exist "
-                                       "in front of '%s'\n",
-                                       progname, argv[optopt + 1]);
-                               goto usage_error;
+                       if (opc == SO_MIRROR_EXTEND) {
+                               if (last_mirror == NULL) {
+                                       fprintf(stderr,
+                               "error: %s: '-N' must exist in front of '%s'\n",
+                                               progname, argv[optopt + 1]);
+                                       goto usage_error;
+                               }
+                               last_mirror->m_file = optarg;
+                               last_mirror->m_count = 1;
+                       } else {
+                               /* mirror split */
+                               if (mirror_list == NULL)
+                                       mirror_list = lfs_mirror_alloc();
+                               mirror_list->m_file = optarg;
                        }
                        }
-
-                       last_mirror->m_file = optarg;
-                       last_mirror->m_count = 1;
                        has_m_file = true;
                        break;
                case 'L':
                        has_m_file = true;
                        break;
                case 'L':
@@ -2518,7 +2735,7 @@ static int lfs_setstripe0(int argc, char **argv, enum setstripe_origin opc)
                        goto error;
        }
 
                        goto error;
        }
 
-       if (mirror_flags & NO_VERIFY) {
+       if (mirror_flags & MF_NO_VERIFY) {
                if (opc != SO_MIRROR_EXTEND) {
                        fprintf(stderr,
                                "error: %s: --no-verify is valid only for lfs mirror extend command\n",
                if (opc != SO_MIRROR_EXTEND) {
                        fprintf(stderr,
                                "error: %s: --no-verify is valid only for lfs mirror extend command\n",
@@ -2704,6 +2921,16 @@ static int lfs_setstripe0(int argc, char **argv, enum setstripe_origin opc)
                } else if (opc == SO_MIRROR_EXTEND) {
                        result = mirror_extend(fname, mirror_list,
                                               mirror_flags);
                } else if (opc == SO_MIRROR_EXTEND) {
                        result = mirror_extend(fname, mirror_list,
                                               mirror_flags);
+               } else if (opc == SO_MIRROR_SPLIT) {
+                       if (mirror_id == 0) {
+                               fprintf(stderr,
+                                       "%s %s: no mirror id is specified\n",
+                                       progname, argv[0]);
+                               goto usage_error;
+                       }
+                       result = mirror_split(fname, mirror_id, mirror_flags,
+                                             has_m_file ? mirror_list->m_file :
+                                             NULL);
                } else if (layout != NULL) {
                        result = lfs_component_create(fname, O_CREAT | O_WRONLY,
                                                      0644, layout);
                } else if (layout != NULL) {
                        result = lfs_component_create(fname, O_CREAT | O_WRONLY,
                                                      0644, layout);
@@ -6913,7 +7140,6 @@ static inline
 int lfs_mirror_resync_file(const char *fname, struct ll_ioc_lease *ioc,
                           __u16 *mirror_ids, int ids_nr)
 {
 int lfs_mirror_resync_file(const char *fname, struct ll_ioc_lease *ioc,
                           __u16 *mirror_ids, int ids_nr)
 {
-       const char *progname = "lfs mirror resync";
        struct llapi_resync_comp comp_array[1024] = { { 0 } };
        struct llapi_layout *layout;
        struct stat stbuf;
        struct llapi_resync_comp comp_array[1024] = { { 0 } };
        struct llapi_layout *layout;
        struct stat stbuf;
@@ -6948,8 +7174,9 @@ int lfs_mirror_resync_file(const char *fname, struct ll_ioc_lease *ioc,
        ioc->lil_flags = LL_LEASE_RESYNC;
        rc = llapi_lease_set(fd, ioc);
        if (rc < 0) {
        ioc->lil_flags = LL_LEASE_RESYNC;
        rc = llapi_lease_set(fd, ioc);
        if (rc < 0) {
-               fprintf(stderr, "%s: '%s' llapi_lease_set resync failed: "
-                       "%s.\n", progname, fname, strerror(errno));
+               fprintf(stderr,
+                       "%s: '%s' llapi_lease_set resync failed: %s.\n",
+                       progname, fname, strerror(errno));
                goto close_fd;
        }
 
                goto close_fd;
        }
 
@@ -7108,8 +7335,9 @@ static inline int lfs_mirror_resync(int argc, char **argv)
        }
 
        if (ids_nr > 0 && argc > optind + 1) {
        }
 
        if (ids_nr > 0 && argc > optind + 1) {
-               fprintf(stderr, "%s: option '--only' cannot be used upon "
-                       "multiple files.\n", argv[0]);
+               fprintf(stderr,
+                   "%s: option '--only' cannot be used upon multiple files.\n",
+                       argv[0]);
                rc = CMD_HELP;
                goto error;
 
                rc = CMD_HELP;
                goto error;
 
@@ -7133,9 +7361,6 @@ static inline int lfs_mirror_resync(int argc, char **argv)
        for (; optind < argc; optind++) {
                rc = lfs_mirror_resync_file(argv[optind], ioc,
                                            mirror_ids, ids_nr);
        for (; optind < argc; optind++) {
                rc = lfs_mirror_resync_file(argv[optind], ioc,
                                            mirror_ids, ids_nr);
-               if (rc)
-                       fprintf(stderr, "%s: resync file '%s' failed: %d\n",
-                               argv[0], argv[optind], rc);
                /* ignore previous file's error, continue with next file */
 
                /* reset ioc */
                /* ignore previous file's error, continue with next file */
 
                /* reset ioc */
index 87dffba..ed07763 100644 (file)
@@ -2571,7 +2571,7 @@ static void lov_dump_user_lmm_header(struct lov_user_md *lum, char *path,
                        llapi_printf(LLAPI_MSG_NORMAL, "%s",
                                     layout2name(lum->lmm_pattern));
                else
                        llapi_printf(LLAPI_MSG_NORMAL, "%s",
                                     layout2name(lum->lmm_pattern));
                else
-                       llapi_printf(LLAPI_MSG_NORMAL, "%.x", lum->lmm_pattern);
+                       llapi_printf(LLAPI_MSG_NORMAL, "%x", lum->lmm_pattern);
                separator = is_dir ? " " : "\n";
        }
 
                separator = is_dir ? " " : "\n";
        }
 
@@ -2927,6 +2927,19 @@ static void lov_dump_comp_v1_entry(struct find_param *param,
                separator = "\n";
        }
 
                separator = "\n";
        }
 
+       if (verbose & VERBOSE_MIRROR_ID) {
+               llapi_printf(LLAPI_MSG_NORMAL, "%s", separator);
+               if (verbose & ~VERBOSE_MIRROR_ID)
+                       llapi_printf(LLAPI_MSG_NORMAL,
+                                    "%4slcme_mirror_id:      ", " ");
+               if (entry->lcme_id != LCME_ID_INVAL)
+                       llapi_printf(LLAPI_MSG_NORMAL, "%u",
+                                    mirror_id_of(entry->lcme_id));
+               else
+                       llapi_printf(LLAPI_MSG_NORMAL, "N/A");
+               separator = "\n";
+       }
+
        if (verbose & VERBOSE_COMP_FLAGS) {
                llapi_printf(LLAPI_MSG_NORMAL, "%s", separator);
                if (verbose & ~VERBOSE_COMP_FLAGS)
        if (verbose & VERBOSE_COMP_FLAGS) {
                llapi_printf(LLAPI_MSG_NORMAL, "%s", separator);
                if (verbose & ~VERBOSE_COMP_FLAGS)
index 531e083..f289224 100644 (file)
@@ -34,6 +34,7 @@
 #include <unistd.h>
 #include <errno.h>
 #include <limits.h>
 #include <unistd.h>
 #include <errno.h>
 #include <limits.h>
+#include <assert.h>
 #include <sys/xattr.h>
 #include <sys/param.h>
 
 #include <sys/xattr.h>
 #include <sys/param.h>
 
@@ -2231,7 +2232,13 @@ bool llapi_layout_is_composite(struct llapi_layout *layout)
 /**
  * Iterate every components in the @layout and call callback function @cb.
  *
 /**
  * Iterate every components in the @layout and call callback function @cb.
  *
- * \param[in]
+ * \param[in] layout   component layout list.
+ * \param[in] cb       callback for each component
+ * \param[in] cbdata   callback data
+ *
+ * \retval < 0                         error happens during the iteration
+ * \retval LLAPI_LAYOUT_ITER_CONT      finished the iteration w/o error
+ * \retval LLAPI_LAYOUT_ITER_STOP      got something, stop the iteration
  */
 int llapi_layout_comp_iterate(struct llapi_layout *layout,
                              llapi_layout_iter_cb cb, void *cbdata)
  */
 int llapi_layout_comp_iterate(struct llapi_layout *layout,
                              llapi_layout_iter_cb cb, void *cbdata)
@@ -2242,7 +2249,13 @@ int llapi_layout_comp_iterate(struct llapi_layout *layout,
        if (rc < 0)
                return rc;
 
        if (rc < 0)
                return rc;
 
-       while (rc == 0) {
+       /**
+        * make sure on success llapi_layout_comp_use() API returns 0 with
+        * USE_FIRST.
+        */
+       assert(rc == 0);
+
+       while (1) {
                rc = cb(layout, cbdata);
                if (rc != LLAPI_LAYOUT_ITER_CONT)
                        break;
                rc = cb(layout, cbdata);
                if (rc != LLAPI_LAYOUT_ITER_CONT)
                        break;
@@ -2250,9 +2263,11 @@ int llapi_layout_comp_iterate(struct llapi_layout *layout,
                rc = llapi_layout_comp_use(layout, LLAPI_LAYOUT_COMP_USE_NEXT);
                if (rc < 0)
                        return rc;
                rc = llapi_layout_comp_use(layout, LLAPI_LAYOUT_COMP_USE_NEXT);
                if (rc < 0)
                        return rc;
+               else if (rc == 1)       /* reached the last comp */
+                       return LLAPI_LAYOUT_ITER_CONT;
        }
 
        }
 
-       return rc >= 0 ? LLAPI_LAYOUT_ITER_CONT : rc;
+       return rc;
 }
 
 /**
 }
 
 /**
index ec17444..57c8124 100644 (file)
@@ -1006,6 +1006,27 @@ check_ll_fid(void)
 }
 
 static void
 }
 
 static void
+check_mds_op_bias(void)
+{
+       BLANK_LINE();
+       CHECK_VALUE_X(MDS_CHECK_SPLIT);
+       CHECK_VALUE_X(MDS_CROSS_REF);
+       CHECK_VALUE_X(MDS_VTX_BYPASS);
+       CHECK_VALUE_X(MDS_PERM_BYPASS);
+       CHECK_VALUE_X(MDS_QUOTA_IGNORE);
+       CHECK_VALUE_X(MDS_KEEP_ORPHAN);
+       CHECK_VALUE_X(MDS_RECOV_OPEN);
+       CHECK_VALUE_X(MDS_DATA_MODIFIED);
+       CHECK_VALUE_X(MDS_CREATE_VOLATILE);
+       CHECK_VALUE_X(MDS_OWNEROVERRIDE);
+       CHECK_VALUE_X(MDS_HSM_RELEASE);
+       CHECK_VALUE_X(MDS_CLOSE_LAYOUT_SWAP);
+       CHECK_VALUE_X(MDS_CLOSE_LAYOUT_MERGE);
+       CHECK_VALUE_X(MDS_CLOSE_RESYNC_DONE);
+       CHECK_VALUE_X(MDS_CLOSE_LAYOUT_SPLIT);
+}
+
+static void
 check_mdt_body(void)
 {
        BLANK_LINE();
 check_mdt_body(void)
 {
        BLANK_LINE();
@@ -2743,6 +2764,7 @@ main(int argc, char **argv)
        check_niobuf_remote();
        check_ost_body();
        check_ll_fid();
        check_niobuf_remote();
        check_ost_body();
        check_ll_fid();
+       check_mds_op_bias();
        check_mdt_body();
        check_mdt_ioepoch();
        check_mdt_rec_setattr();
        check_mdt_body();
        check_mdt_ioepoch();
        check_mdt_rec_setattr();
index 9518361..f22804f 100644 (file)
@@ -2220,6 +2220,37 @@ void lustre_assert_wire_constants(void)
        LASSERTF((int)sizeof(((struct ll_fid *)0)->f_type) == 4, "found %lld\n",
                 (long long)(int)sizeof(((struct ll_fid *)0)->f_type));
 
        LASSERTF((int)sizeof(((struct ll_fid *)0)->f_type) == 4, "found %lld\n",
                 (long long)(int)sizeof(((struct ll_fid *)0)->f_type));
 
+       LASSERTF(MDS_CHECK_SPLIT == 0x00000001UL, "found 0x%.8xUL\n",
+               (unsigned)MDS_CHECK_SPLIT);
+       LASSERTF(MDS_CROSS_REF == 0x00000002UL, "found 0x%.8xUL\n",
+               (unsigned)MDS_CROSS_REF);
+       LASSERTF(MDS_VTX_BYPASS == 0x00000004UL, "found 0x%.8xUL\n",
+               (unsigned)MDS_VTX_BYPASS);
+       LASSERTF(MDS_PERM_BYPASS == 0x00000008UL, "found 0x%.8xUL\n",
+               (unsigned)MDS_PERM_BYPASS);
+       LASSERTF(MDS_QUOTA_IGNORE == 0x00000020UL, "found 0x%.8xUL\n",
+               (unsigned)MDS_QUOTA_IGNORE);
+       LASSERTF(MDS_KEEP_ORPHAN == 0x00000080UL, "found 0x%.8xUL\n",
+               (unsigned)MDS_KEEP_ORPHAN);
+       LASSERTF(MDS_RECOV_OPEN == 0x00000100UL, "found 0x%.8xUL\n",
+               (unsigned)MDS_RECOV_OPEN);
+       LASSERTF(MDS_DATA_MODIFIED == 0x00000200UL, "found 0x%.8xUL\n",
+               (unsigned)MDS_DATA_MODIFIED);
+       LASSERTF(MDS_CREATE_VOLATILE == 0x00000400UL, "found 0x%.8xUL\n",
+               (unsigned)MDS_CREATE_VOLATILE);
+       LASSERTF(MDS_OWNEROVERRIDE == 0x00000800UL, "found 0x%.8xUL\n",
+               (unsigned)MDS_OWNEROVERRIDE);
+       LASSERTF(MDS_HSM_RELEASE == 0x00001000UL, "found 0x%.8xUL\n",
+               (unsigned)MDS_HSM_RELEASE);
+       LASSERTF(MDS_CLOSE_LAYOUT_SWAP == 0x00004000UL, "found 0x%.8xUL\n",
+               (unsigned)MDS_CLOSE_LAYOUT_SWAP);
+       LASSERTF(MDS_CLOSE_LAYOUT_MERGE == 0x00008000UL, "found 0x%.8xUL\n",
+               (unsigned)MDS_CLOSE_LAYOUT_MERGE);
+       LASSERTF(MDS_CLOSE_RESYNC_DONE == 0x00010000UL, "found 0x%.8xUL\n",
+               (unsigned)MDS_CLOSE_RESYNC_DONE);
+       LASSERTF(MDS_CLOSE_LAYOUT_SPLIT == 0x00020000UL, "found 0x%.8xUL\n",
+               (unsigned)MDS_CLOSE_LAYOUT_SPLIT);
+
        /* Checks for struct mdt_body */
        LASSERTF((int)sizeof(struct mdt_body) == 216, "found %lld\n",
                 (long long)(int)sizeof(struct mdt_body));
        /* Checks for struct mdt_body */
        LASSERTF((int)sizeof(struct mdt_body) == 216, "found %lld\n",
                 (long long)(int)sizeof(struct mdt_body));