+/**
+ * Extract the mirror with specified mirror id, and store the splitted
+ * mirror layout to @buf.
+ *
+ * \param[in] comp_v1 mirrored layout
+ * \param[in] mirror_id the mirror with mirror_id to be extracted
+ * \param[out] buf store the layout excluding the extracted mirror,
+ * caller free the buffer we allocated in this function
+ * \param[out] buf_vic store the extracted layout, caller free the buffer
+ * we allocated in this function
+ *
+ * \retval 0 on success; < 0 if error happens
+ */
+static int mdd_split_ea(struct lov_comp_md_v1 *comp_v1, __u16 mirror_id,
+ struct lu_buf *buf, struct lu_buf *buf_vic)
+{
+ struct lov_comp_md_v1 *comp_rem;
+ struct lov_comp_md_v1 *comp_vic;
+ struct lov_comp_md_entry_v1 *entry;
+ struct lov_comp_md_entry_v1 *entry_rem;
+ struct lov_comp_md_entry_v1 *entry_vic;
+ __u16 mirror_cnt;
+ __u16 comp_cnt, count = 0;
+ int lmm_size, lmm_size_vic = 0;
+ int i, j, k;
+ int offset, offset_rem, offset_vic;
+
+ mirror_cnt = le16_to_cpu(comp_v1->lcm_mirror_count) + 1;
+ /* comp_v1 should contains more than 1 mirror */
+ if (mirror_cnt <= 1)
+ return -EINVAL;
+ comp_cnt = le16_to_cpu(comp_v1->lcm_entry_count);
+ lmm_size = le32_to_cpu(comp_v1->lcm_size);
+
+ for (i = 0; i < comp_cnt; i++) {
+ entry = &comp_v1->lcm_entries[i];
+ if (mirror_id_of(le32_to_cpu(entry->lcme_id)) == mirror_id) {
+ count++;
+ lmm_size_vic += sizeof(*entry);
+ lmm_size_vic += le32_to_cpu(entry->lcme_size);
+ } else if (count > 0) {
+ /* find the specified mirror */
+ break;
+ }
+ }
+
+ if (count == 0)
+ return -EINVAL;
+
+ lu_buf_alloc(buf, lmm_size - lmm_size_vic);
+ if (!buf->lb_buf)
+ return -ENOMEM;
+
+ lu_buf_alloc(buf_vic, sizeof(*comp_vic) + lmm_size_vic);
+ if (!buf_vic->lb_buf) {
+ lu_buf_free(buf);
+ return -ENOMEM;
+ }
+
+ comp_rem = (struct lov_comp_md_v1 *)buf->lb_buf;
+ comp_vic = (struct lov_comp_md_v1 *)buf_vic->lb_buf;
+
+ memcpy(comp_rem, comp_v1, sizeof(*comp_v1));
+ comp_rem->lcm_mirror_count = cpu_to_le16(mirror_cnt - 2);
+ comp_rem->lcm_entry_count = cpu_to_le32(comp_cnt - count);
+ comp_rem->lcm_size = cpu_to_le32(lmm_size - lmm_size_vic);
+ if (!comp_rem->lcm_mirror_count)
+ comp_rem->lcm_flags = cpu_to_le16(LCM_FL_NOT_FLR);
+
+ memset(comp_vic, 0, sizeof(*comp_v1));
+ comp_vic->lcm_magic = cpu_to_le32(LOV_MAGIC_COMP_V1);
+ comp_vic->lcm_mirror_count = 0;
+ comp_vic->lcm_entry_count = cpu_to_le32(count);
+ comp_vic->lcm_size = cpu_to_le32(lmm_size_vic + sizeof(*comp_vic));
+ comp_vic->lcm_flags = cpu_to_le16(LCM_FL_NOT_FLR);
+ comp_vic->lcm_layout_gen = 0;
+
+ offset = sizeof(*comp_v1) + sizeof(*entry) * comp_cnt;
+ offset_rem = sizeof(*comp_rem) +
+ sizeof(*entry_rem) * (comp_cnt - count);
+ offset_vic = sizeof(*comp_vic) + sizeof(*entry_vic) * count;
+ for (i = j = k = 0; i < comp_cnt; i++) {
+ struct lov_mds_md *lmm, *lmm_dst;
+ bool vic = false;
+
+ entry = &comp_v1->lcm_entries[i];
+ entry_vic = &comp_vic->lcm_entries[j];
+ entry_rem = &comp_rem->lcm_entries[k];
+
+ if (mirror_id_of(le32_to_cpu(entry->lcme_id)) == mirror_id)
+ vic = true;
+
+ /* copy component entry */
+ if (vic) {
+ memcpy(entry_vic, entry, sizeof(*entry));
+ entry_vic->lcme_flags &= cpu_to_le32(LCME_FL_INIT);
+ entry_vic->lcme_offset = cpu_to_le32(offset_vic);
+ j++;
+ } else {
+ memcpy(entry_rem, entry, sizeof(*entry));
+ entry_rem->lcme_offset = cpu_to_le32(offset_rem);
+ k++;
+ }
+
+ lmm = (struct lov_mds_md *)((char *)comp_v1 + offset);
+ if (vic)
+ lmm_dst = (struct lov_mds_md *)
+ ((char *)comp_vic + offset_vic);
+ else
+ lmm_dst = (struct lov_mds_md *)
+ ((char *)comp_rem + offset_rem);
+
+ /* copy component entry blob */
+ memcpy(lmm_dst, lmm, le32_to_cpu(entry->lcme_size));
+
+ /* blob offset advance */
+ offset += le32_to_cpu(entry->lcme_size);
+ if (vic)
+ offset_vic += le32_to_cpu(entry->lcme_size);
+ else
+ offset_rem += le32_to_cpu(entry->lcme_size);
+ }
+
+ return 0;
+}
+
+static int mdd_xattr_split(const struct lu_env *env, struct md_object *md_obj,
+ struct md_rejig_data *mrd)
+{
+ struct mdd_device *mdd = mdo2mdd(md_obj);
+ struct mdd_object *obj = md2mdd_obj(md_obj);
+ struct mdd_object *vic = md2mdd_obj(mrd->mrd_obj);
+ struct lu_buf *buf = &mdd_env_info(env)->mti_buf[0];
+ struct lu_buf *buf_save = &mdd_env_info(env)->mti_buf[1];
+ struct lu_buf *buf_vic = &mdd_env_info(env)->mti_buf[2];
+ struct lov_comp_md_v1 *lcm;
+ struct thandle *handle;
+ int rc;
+ ENTRY;
+
+ rc = lu_fid_cmp(mdo2fid(obj), mdo2fid(vic));
+ if (rc == 0) /* same fid */
+ RETURN(-EPERM);
+
+ handle = mdd_trans_create(env, mdd);
+ if (IS_ERR(handle))
+ RETURN(PTR_ERR(handle));
+
+ if (rc > 0) {
+ mdd_write_lock(env, obj, MOR_TGT_CHILD);
+ mdd_write_lock(env, vic, MOR_TGT_CHILD);
+ } else {
+ mdd_write_lock(env, vic, MOR_TGT_CHILD);
+ mdd_write_lock(env, obj, MOR_TGT_CHILD);
+ }
+
+ /* get EA of mirrored file */
+ memset(buf_save, 0, sizeof(*buf));
+ rc = mdd_get_lov_ea(env, obj, buf_save);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ lcm = buf_save->lb_buf;
+ if (le32_to_cpu(lcm->lcm_magic) != LOV_MAGIC_COMP_V1)
+ GOTO(out, rc = -EINVAL);
+
+ /**
+ * Extract the mirror with specified mirror id, and store the splitted
+ * mirror layout to the victim file.
+ */
+ memset(buf, 0, sizeof(*buf));
+ memset(buf_vic, 0, sizeof(*buf_vic));
+ rc = mdd_split_ea(lcm, mrd->mrd_mirror_id, buf, buf_vic);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ rc = mdd_declare_xattr_set(env, mdd, obj, buf, XATTR_NAME_LOV,
+ LU_XATTR_SPLIT, handle);
+ if (rc)
+ GOTO(out, rc);
+ rc = mdd_declare_xattr_set(env, mdd, vic, buf_vic, XATTR_NAME_LOV,
+ LU_XATTR_SPLIT, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = mdo_xattr_set(env, obj, buf, XATTR_NAME_LOV, LU_XATTR_REPLACE,
+ handle);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = mdo_xattr_set(env, vic, buf_vic, XATTR_NAME_LOV, LU_XATTR_CREATE,
+ handle);
+ if (rc)
+ GOTO(out_restore, rc);
+
+ rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, obj, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, vic, handle);
+ if (rc)
+ GOTO(out, rc);
+ EXIT;
+
+out_restore:
+ if (rc) {
+ /* restore obj's layout */
+ int rc2 = mdo_xattr_set(env, obj, buf_save, XATTR_NAME_LOV,
+ LU_XATTR_REPLACE, handle);
+ if (rc2)
+ CERROR("%s: failed to rollback of layout of: "DFID
+ ": %d, file state unkonwn.\n",
+ mdd_obj_dev_name(obj), PFID(mdo2fid(obj)), rc2);
+ }
+out:
+ mdd_trans_stop(env, mdd, rc, handle);
+ mdd_write_unlock(env, obj);
+ mdd_write_unlock(env, vic);
+ lu_buf_free(buf_save);
+ lu_buf_free(buf);
+ lu_buf_free(buf_vic);
+
+ if (!rc)
+ (void) mdd_object_pfid_replace(env, obj);
+
+ return rc;
+}
+