Whamcloud - gitweb
LU-14521 flr: delete mirror without volatile file 16/42116/4
author Bobi Jam <bobijam@whamcloud.com>
Fri, 19 Mar 2021 10:22:10 +0000 (18:22 +0800)
committer Oleg Drokin <green@whamcloud.com>
Sat, 10 Apr 2021 17:40:46 +0000 (17:40 +0000)
Rather than opening a volatile file to delete an FLR mirror, this
patch deletes the sub-objects of the specified mirror directly while
handling the mirror deletion.

Signed-off-by: Bobi Jam <bobijam@whamcloud.com>
Change-Id: I7a5e7488dbc820fdfa312218f363955a35752034
Reviewed-on: https://review.whamcloud.com/42116
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: John L. Hammond <jhammond@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lu_object.h
lustre/llite/file.c
lustre/lod/lod_internal.h
lustre/lod/lod_lov.c
lustre/lod/lod_object.c
lustre/mdd/mdd_object.c
lustre/mdt/mdt_open.c
lustre/utils/lfs.c

index 9d5437d..1ec8dd6 100644 (file)
@@ -933,6 +933,7 @@ enum lu_xattr_flags {
        LU_XATTR_CREATE  = BIT(1),
        LU_XATTR_MERGE   = BIT(2),
        LU_XATTR_SPLIT   = BIT(3),
+       LU_XATTR_PURGE   = BIT(4),
 };
 
 /** @} helpers */
index 0149146..de918fb 100644 (file)
@@ -3481,6 +3481,7 @@ static long ll_file_unlock_lease(struct file *file, struct ll_ioc_lease *ioc,
                if (!layout_file)
                        GOTO(out_lease_close, rc = -EBADF);
 
+               /* if layout_file == file, it means to destroy the mirror */
                sp.sp_inode = file_inode(layout_file);
                sp.sp_mirror_id = (__u16)mirror_id;
                data = &sp;
index e6a819d..e75d2f1 100644 (file)
@@ -548,6 +548,7 @@ int lod_add_device(const struct lu_env *env, struct lod_device *lod,
 int lod_del_device(const struct lu_env *env, struct lod_device *lod,
                   struct lod_tgt_descs *ltd, char *osp, unsigned int idx,
                   unsigned int gen);
+int validate_lod_and_idx(struct lod_device *lod, __u32 idx);
 int lod_fini_tgt(const struct lu_env *env, struct lod_device *lod,
                 struct lod_tgt_descs *ltd);
 int lod_striping_load(const struct lu_env *env, struct lod_object *lo);
index de18c92..4376eaa 100644 (file)
@@ -958,7 +958,7 @@ repeat:
  * \retval                     0 if the index is present
  * \retval                     -EINVAL if not
  */
-static int validate_lod_and_idx(struct lod_device *md, __u32 idx)
+int validate_lod_and_idx(struct lod_device *md, __u32 idx)
 {
        if (unlikely(idx >= md->lod_ost_descs.ltd_tgts_size ||
                     !test_bit(idx, md->lod_ost_bitmap))) {
index caa32fb..e861921 100644 (file)
@@ -3422,6 +3422,186 @@ static int lod_declare_layout_split(const struct lu_env *env,
        RETURN(rc);
 }
 
+/**
+ * Declare or perform the destruction of every OST sub-object referenced by
+ * the mirror layout held in \a buf.
+ *
+ * Only components flagged LCME_FL_INIT are visited. For each stripe the OST
+ * index is validated and the sub-object is looked up on that target. In
+ * declare mode the destroy is declared and the reference dropped at once;
+ * otherwise the sub-objects are first gathered in an array and destroyed
+ * only if all of them were found.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       object whose device identifies the LOD
+ * \param[in] buf      composite layout (struct lov_comp_md_v1) of the mirror;
+ *                     its fields are read without byte swapping
+ * \param[in] th       transaction handle
+ * \param[in] declare  true to declare the destroys, false to execute them
+ *
+ * \retval             0 on success
+ * \retval             -ENOMEM if the sub-object array cannot be allocated
+ * \retval             -EINVAL if a stripe has an insane FID or an OST index
+ *                     that fails validation
+ * \retval             -ENOENT if the sub-object has no slice on the target
+ * \retval             negative errno returned by a helper otherwise
+ */
+static int lod_layout_declare_or_purge_mirror(const struct lu_env *env,
+                       struct dt_object *dt, const struct lu_buf *buf,
+                       struct thandle *th, bool declare)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
+       struct lov_comp_md_v1 *comp_v1 = buf->lb_buf;
+       struct lov_comp_md_entry_v1 *entry;
+       struct lov_mds_md_v1 *lmm;
+       struct dt_object **sub_objs = NULL;
+       int rc = 0, i, k, array_count = 0;
+
+       ENTRY;
+
+       if (!declare) {
+               /* size the sub-objects array: one slot per stripe */
+               for (i = 0; i < comp_v1->lcm_entry_count; i++) {
+                       entry = &comp_v1->lcm_entries[i];
+
+                       if (!(entry->lcme_flags & LCME_FL_INIT))
+                               continue;
+
+                       lmm = (struct lov_mds_md_v1 *)
+                                       ((char *)comp_v1 + entry->lcme_offset);
+                       array_count += lmm->lmm_stripe_count;
+               }
+               OBD_ALLOC_PTR_ARRAY(sub_objs, array_count);
+               if (sub_objs == NULL)
+                       RETURN(-ENOMEM);
+       }
+
+       k = 0;  /* sub_objs index */
+       for (i = 0; i < comp_v1->lcm_entry_count; i++) {
+               struct lov_ost_data_v1 *objs;
+               struct lu_object *o, *n;
+               struct dt_object *dto;
+               struct lu_device *nd;
+               struct lov_mds_md_v3 *v3;
+               __u32 idx;
+               int j;
+
+               entry = &comp_v1->lcm_entries[i];
+
+               if (!(entry->lcme_flags & LCME_FL_INIT))
+                       continue;
+
+               lmm = (struct lov_mds_md_v1 *)
+                               ((char *)comp_v1 + entry->lcme_offset);
+               /* the stripe array is read through the V3 struct for V3 */
+               v3 = (struct lov_mds_md_v3 *)lmm;
+               if (lmm->lmm_magic == LOV_MAGIC_V3)
+                       objs = &v3->lmm_objects[0];
+               else
+                       objs = &lmm->lmm_objects[0];
+
+               for (j = 0; j < lmm->lmm_stripe_count; j++) {
+                       idx = objs[j].l_ost_idx;
+                       rc = ostid_to_fid(&info->lti_fid, &objs[j].l_ost_oi,
+                                         idx);
+                       if (rc)
+                               GOTO(out, rc);
+
+                       if (!fid_is_sane(&info->lti_fid)) {
+                               CERROR("%s: sub-object insane fid "DFID"\n",
+                                      lod2obd(d)->obd_name,
+                                      PFID(&info->lti_fid));
+                               GOTO(out, rc = -EINVAL);
+                       }
+
+                       lod_getref(&d->lod_ost_descs);
+
+                       rc = validate_lod_and_idx(d, idx);
+                       if (unlikely(rc)) {
+                               lod_putref(d, &d->lod_ost_descs);
+                               GOTO(out, rc);
+                       }
+
+                       nd = &OST_TGT(d, idx)->ltd_tgt->dd_lu_dev;
+                       lod_putref(d, &d->lod_ost_descs);
+
+                       o = lu_object_find_at(env, nd, &info->lti_fid, NULL);
+                       if (IS_ERR(o))
+                               GOTO(out, rc = PTR_ERR(o));
+
+                       n = lu_object_locate(o->lo_header, nd->ld_type);
+                       if (unlikely(!n)) {
+                               /*
+                                * @n is NULL here and must not be passed to
+                                * lu_object_put(); release the object found
+                                * above through @o instead.
+                                */
+                               lu_object_put(env, o);
+                               GOTO(out, rc = -ENOENT);
+                       }
+
+                       dto = container_of(n, struct dt_object, do_lu);
+
+                       if (declare) {
+                               rc = lod_sub_declare_destroy(env, dto, th);
+                               dt_object_put(env, dto);
+                               if (rc)
+                                       GOTO(out, rc);
+                       } else {
+                               /**
+                                * collect to-be-destroyed sub objects, the
+                                * reference would be released after actual
+                                * deletion.
+                                */
+                               sub_objs[k] = dto;
+                               k++;
+                       }
+               } /* for each stripe */
+       } /* for each component in the mirror */
+out:
+       if (!declare) {
+               i = 0;
+               if (!rc) {
+                       /* destroy the sub objects */
+                       for (; i < k; i++) {
+                               rc = lod_sub_destroy(env, sub_objs[i], th);
+                               if (rc)
+                                       break;
+                               dt_object_put(env, sub_objs[i]);
+                       }
+               }
+               /**
+                * if collection or a sub object destroy failed, release the
+                * references still held in sub_objs[i..k-1].
+                */
+               for (; i < k; i++)
+                       dt_object_put(env, sub_objs[i]);
+
+               OBD_FREE_PTR_ARRAY(sub_objs, array_count);
+       }
+
+       RETURN(rc);
+}
+
+/**
+ * Declare the purge of a mirror: validate the composite layout carried in
+ * \a buf and declare the destruction of the sub-objects it references.
+ *
+ * \a buf is byte-swapped in place when the host is not little-endian, so it
+ * holds CPU-endian data once the magic check has passed.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       object the mirror is purged from
+ * \param[in] buf      layout (struct lov_comp_md_v1) of the mirror to purge
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 on success
+ * \retval             -EINVAL if the magic is not LOV_MAGIC_COMP_V1 or
+ *                     lcm_mirror_count is not zero
+ * \retval             negative errno from declaring the sub-object destroys
+ */
+static int lod_declare_layout_purge(const struct lu_env *env,
+               struct dt_object *dt, const struct lu_buf *buf,
+               struct thandle *th)
+{
+       struct lov_comp_md_v1 *lcm = buf->lb_buf;
+       struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
+       __u32 magic;
+
+       ENTRY;
+
+       magic = le32_to_cpu(lcm->lcm_magic);
+       if (magic != LOV_MAGIC_COMP_V1) {
+               CERROR("%s: invalid layout magic %#x != %#x\n",
+                      lod2obd(lod)->obd_name, magic, LOV_MAGIC_COMP_V1);
+               RETURN(-EINVAL);
+       }
+
+       /* swab in place on hosts where cpu_to_le32() is not the identity */
+       if (cpu_to_le32(LOV_MAGIC_COMP_V1) != LOV_MAGIC_COMP_V1)
+               lustre_swab_lov_comp_md_v1(lcm);
+
+       /* @buf is in CPU endianness from this point on */
+
+       if (lcm->lcm_mirror_count != 0) {
+               CERROR("%s: can only purge one mirror from "DFID"\n",
+                      lod2obd(lod)->obd_name,
+                      PFID(lu_object_fid(&dt->do_lu)));
+               RETURN(-EINVAL);
+       }
+
+       /* declare the deletion of the sub objects of the mirror in @buf */
+       RETURN(lod_layout_declare_or_purge_mirror(env, dt, buf, th, true));
+}
+
+/**
+ * Delete the sub objects of the mirror whose layout is stored in \a buf.
+ *
+ * Thin wrapper that runs lod_layout_declare_or_purge_mirror() in execute
+ * (non-declare) mode and hands back its return code unchanged.
+ */
+static int lod_layout_purge(const struct lu_env *env, struct dt_object *dt,
+                           const struct lu_buf *buf, struct thandle *th)
+{
+       ENTRY;
+       RETURN(lod_layout_declare_or_purge_mirror(env, dt, buf, th, false));
+}
+
 /**
  * Implementation of dt_object_operations::do_declare_xattr_set.
  *
@@ -3446,7 +3626,8 @@ static int lod_declare_xattr_set(const struct lu_env *env,
 
        mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
        if ((S_ISREG(mode) || mode == 0) &&
-           !(fl & (LU_XATTR_REPLACE | LU_XATTR_MERGE | LU_XATTR_SPLIT)) &&
+           !(fl & (LU_XATTR_REPLACE | LU_XATTR_MERGE | LU_XATTR_SPLIT |
+                   LU_XATTR_PURGE)) &&
            (strcmp(name, XATTR_NAME_LOV) == 0 ||
             strcmp(name, XATTR_LUSTRE_LOV) == 0)) {
                /*
@@ -3476,6 +3657,10 @@ static int lod_declare_xattr_set(const struct lu_env *env,
                LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 ||
                        strcmp(name, XATTR_LUSTRE_LOV) == 0);
                rc = lod_declare_layout_split(env, dt, buf, th);
+       } else if (fl & LU_XATTR_PURGE) {
+               LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 ||
+                       strcmp(name, XATTR_LUSTRE_LOV) == 0);
+               rc = lod_declare_layout_purge(env, dt, buf, th);
        } else if (S_ISREG(mode) &&
                   strlen(name) >= sizeof(XATTR_LUSTRE_LOV) + 3 &&
                   allowed_lustre_lov(name)) {
@@ -4599,6 +4784,8 @@ static int lod_xattr_set(const struct lu_env *env,
                        lod_striping_free(env, lod_dt_obj(dt));
 
                        rc = lod_sub_xattr_set(env, next, buf, name, fl, th);
+               } else if (fl & LU_XATTR_PURGE) {
+                       rc = lod_layout_purge(env, dt, buf, th);
                } else if (dt_object_remote(dt)) {
                        /* This only happens during migration, see
                         * mdd_migrate_create(), in which Master MDT will
index 4e9a1a0..0a4cbd4 100644 (file)
@@ -1770,7 +1770,7 @@ static int mdd_xattr_split(const struct lu_env *env, struct md_object *md_obj,
 {
        struct mdd_device *mdd = mdo2mdd(md_obj);
        struct mdd_object *obj = md2mdd_obj(md_obj);
-       struct mdd_object *vic = md2mdd_obj(mrd->mrd_obj);
+       struct mdd_object *vic = NULL;
        struct lu_buf *buf = &mdd_env_info(env)->mti_buf[0];
        struct lu_buf *buf_save = &mdd_env_info(env)->mti_buf[1];
        struct lu_buf *buf_vic = &mdd_env_info(env)->mti_buf[2];
@@ -1781,22 +1781,34 @@ static int mdd_xattr_split(const struct lu_env *env, struct md_object *md_obj,
 
        ENTRY;
 
-       rc = lu_fid_cmp(mdd_object_fid(obj), mdd_object_fid(vic));
-       if (rc == 0) /* same fid */
-               RETURN(-EPERM);
+       /**
+        * NULL @mrd_obj means mirror deleting, and use NULL vic to indicate
+        * mirror deleting
+        */
+       if (mrd->mrd_obj)
+               vic = md2mdd_obj(mrd->mrd_obj);
 
-       handle = mdd_trans_create(env, mdd);
-       if (IS_ERR(handle))
-               RETURN(PTR_ERR(handle));
+       if (vic) {
+               /* don't use the same file to save the splitted mirror */
+               rc = lu_fid_cmp(mdd_object_fid(obj), mdd_object_fid(vic));
+               if (rc == 0)
+                       RETURN(-EPERM);
 
-       if (rc > 0) {
-               mdd_write_lock(env, obj, DT_TGT_CHILD);
-               mdd_write_lock(env, vic, DT_TGT_CHILD);
+               if (rc > 0) {
+                       mdd_write_lock(env, obj, DT_TGT_CHILD);
+                       mdd_write_lock(env, vic, DT_TGT_CHILD);
+               } else {
+                       mdd_write_lock(env, vic, DT_TGT_CHILD);
+                       mdd_write_lock(env, obj, DT_TGT_CHILD);
+               }
        } else {
-               mdd_write_lock(env, vic, DT_TGT_CHILD);
                mdd_write_lock(env, obj, DT_TGT_CHILD);
        }
 
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               GOTO(unlock, rc = PTR_ERR(handle));
+
        /* get EA of mirrored file */
        memset(buf_save, 0, sizeof(*buf));
        rc = mdd_stripe_get(env, obj, buf_save, XATTR_NAME_LOV);
@@ -1809,60 +1821,105 @@ static int mdd_xattr_split(const struct lu_env *env, struct md_object *md_obj,
 
        /**
         * Extract the mirror with specified mirror id, and store the splitted
-        * mirror layout to the victim file.
+        * mirror layout to the victim buffer.
         */
        memset(buf, 0, sizeof(*buf));
        memset(buf_vic, 0, sizeof(*buf_vic));
        rc = mdd_split_ea(lcm, mrd->mrd_mirror_id, buf, buf_vic);
        if (rc < 0)
                GOTO(out, rc);
+       /**
+        * @buf stores layout w/o the specified mirror, @buf_vic stores the
+        * splitted mirror
+        */
 
        dom_stripe = mdd_lmm_dom_size(buf_vic->lb_buf) > 0;
 
-       rc = mdd_declare_xattr_set(env, mdd, obj, buf, XATTR_NAME_LOV,
-                                  LU_XATTR_SPLIT, handle);
-       if (rc)
-               GOTO(out, rc);
-       rc = mdd_declare_xattr_set(env, mdd, vic, buf_vic, XATTR_NAME_LOV,
-                                  LU_XATTR_SPLIT, handle);
-       if (rc)
-               GOTO(out, rc);
+       if (vic) {
+               /**
+                * non delete mirror split
+                *
+                * declare obj set remaining layout in @buf, will set obj's
+                * in-memory layout
+                */
+               rc = mdd_declare_xattr_set(env, mdd, obj, buf, XATTR_NAME_LOV,
+                                          LU_XATTR_SPLIT, handle);
+               if (rc)
+                       GOTO(out_restore, rc);
+
+               /* declare vic set splitted layout in @buf_vic */
+               rc = mdd_declare_xattr_set(env, mdd, vic, buf_vic,
+                                          XATTR_NAME_LOV, LU_XATTR_SPLIT,
+                                          handle);
+               if (rc)
+                       GOTO(out_restore, rc);
+       } else {
+               /**
+                * declare delete mirror objects in @buf_vic, will change obj's
+                * in-memory layout
+                */
+               rc = mdd_declare_xattr_set(env, mdd, obj, buf_vic,
+                                          XATTR_NAME_LOV, LU_XATTR_PURGE,
+                                          handle);
+               if (rc)
+                       GOTO(out_restore, rc);
+
+               /* declare obj set remaining layout in @buf */
+               rc = mdd_declare_xattr_set(env, mdd, obj, buf,
+                                          XATTR_NAME_LOV, LU_XATTR_SPLIT,
+                                          handle);
+               if (rc)
+                       GOTO(out_restore, rc);
+       }
 
        rc = mdd_trans_start(env, mdd, handle);
        if (rc)
-               GOTO(out, rc);
+               GOTO(out_restore, rc);
 
+       /* set obj's layout in @buf */
        rc = mdo_xattr_set(env, obj, buf, XATTR_NAME_LOV, LU_XATTR_REPLACE,
                           handle);
        if (rc)
-               GOTO(out, rc);
-
-       rc = mdo_xattr_set(env, vic, buf_vic, XATTR_NAME_LOV, LU_XATTR_CREATE,
-                          handle);
-       if (rc)
                GOTO(out_restore, rc);
 
+       if (vic) {
+               /* set vic's layout in @buf_vic */
+               rc = mdo_xattr_set(env, vic, buf_vic, XATTR_NAME_LOV,
+                                  LU_XATTR_CREATE, handle);
+               if (rc)
+                       GOTO(out_restore, rc);
+       } else {
+               /* delete mirror objects */
+               rc = mdo_xattr_set(env, obj, buf_vic, XATTR_NAME_LOV,
+                                  LU_XATTR_PURGE, handle);
+               if (rc)
+                       GOTO(out_restore, rc);
+       }
+
        rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, obj, handle,
                                      NULL);
        if (rc)
                GOTO(out, rc);
 
-       rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, vic, handle,
-                                     NULL);
-       if (rc)
-               GOTO(out, rc);
-       EXIT;
+       if (vic) {
+               rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, vic,
+                                             handle, NULL);
+               if (rc)
+                       GOTO(out, rc);
+       }
 
 out_restore:
        if (rc) {
-               /* restore obj's layout */
+               /* restore obj's in-memory and on-disk layout */
                int rc2 = mdo_xattr_set(env, obj, buf_save, XATTR_NAME_LOV,
                                        LU_XATTR_REPLACE, handle);
                if (rc2)
-                       CERROR("%s: failed rollback "DFID" layout: file state unkonwn: rc = %d\n",
+                       CERROR("%s: failed rollback "DFID
+                              " layout: file state unknown: rc = %d\n",
                               mdd_obj_dev_name(obj),
-                              PFID(mdd_object_fid(obj)), rc2);
+                              PFID(mdd_object_fid(obj)), rc);
        }
+
 out:
        rc = mdd_trans_stop(env, mdd, rc, handle);
 
@@ -1870,8 +1927,10 @@ out:
        if (!rc && dom_stripe)
                mdd_dom_data_truncate(env, mdd, obj);
 
+unlock:
        mdd_write_unlock(env, obj);
-       mdd_write_unlock(env, vic);
+       if (vic)
+               mdd_write_unlock(env, vic);
        lu_buf_free(buf_save);
        lu_buf_free(buf);
        lu_buf_free(buf_vic);
@@ -1931,15 +1990,16 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
                if (buf->lb_len != sizeof(*mrd))
                        RETURN(-EINVAL);
 
-               rc = mdd_layout_merge_allowed(env, obj, victim);
-               if (rc)
-                       RETURN(rc);
 
-               if (fl == LU_XATTR_MERGE)
+               if (fl == LU_XATTR_MERGE) {
+                       rc = mdd_layout_merge_allowed(env, obj, victim);
+                       if (rc)
+                               RETURN(rc);
                        /* merge layout of victim as a mirror of obj's. */
                        rc = mdd_xattr_merge(env, obj, victim);
-               else
+               } else {
                        rc = mdd_xattr_split(env, obj, mrd);
+               }
                RETURN(rc);
        }
 
index b839eaa..040d261 100644 (file)
@@ -2038,7 +2038,7 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info,
        struct ldlm_lock        *lease;
        struct mdt_object       *o1 = o, *o2 = NULL;
        bool                     lease_broken;
-       bool                     swap_objects;
+       bool                     swap_objects = false;
        int                      rc;
        ENTRY;
 
@@ -2056,38 +2056,52 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info,
                RETURN(-EINVAL);
 
        rc = lu_fid_cmp(&data->cd_fid, mdt_object_fid(o));
-       if (unlikely(rc == 0))
-               RETURN(-EINVAL);
+       if (rc == 0) {
+               /**
+                * only MDS_CLOSE_LAYOUT_SPLIT use the same fid to indicate
+                * mirror deletion, so we'd zero cd_fid, and keeps o2 be NULL.
+                */
+               if (!(ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT))
+                       RETURN(-EINVAL);
 
-       /* Exchange o1 and o2, to enforce locking order */
-       swap_objects = (rc < 0);
+               /* zero cd_fid to keeps o2 be NULL */
+               fid_zero(&data->cd_fid);
+       } else if (rc < 0) {
+               /* Exchange o1 and o2, to enforce locking order */
+               swap_objects = true;
+       }
 
        lease = ldlm_handle2lock(&data->cd_handle);
        if (lease == NULL)
                RETURN(-ESTALE);
 
-       o2 = mdt_object_find(info->mti_env, info->mti_mdt, &data->cd_fid);
-       if (IS_ERR(o2))
-               GOTO(out_lease, rc = PTR_ERR(o2));
+       if (!fid_is_zero(&data->cd_fid)) {
+               o2 = mdt_object_find(info->mti_env, info->mti_mdt,
+                                    &data->cd_fid);
+               if (IS_ERR(o2))
+                       GOTO(out_lease, rc = PTR_ERR(o2));
 
-       if (!mdt_object_exists(o2))
-               GOTO(out_obj, rc = -ENOENT);
+               if (!mdt_object_exists(o2))
+                       GOTO(out_obj, rc = -ENOENT);
 
-       if (!S_ISREG(lu_object_attr(&o2->mot_obj)))
-               GOTO(out_obj, rc = -EINVAL);
+               if (!S_ISREG(lu_object_attr(&o2->mot_obj)))
+                       GOTO(out_obj, rc = -EINVAL);
 
-       if (swap_objects)
-               swap(o1, o2);
+               if (swap_objects)
+                       swap(o1, o2);
+       }
 
        rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
                           MAY_WRITE);
        if (rc < 0)
                GOTO(out_obj, rc);
 
-       rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
-                          MAY_WRITE);
-       if (rc < 0)
-               GOTO(out_obj, rc);
+       if (o2) {
+               rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2),
+                                  NULL, MAY_WRITE);
+               if (rc < 0)
+                       GOTO(out_obj, rc);
+       }
 
        /* try to hold open_sem so that nobody else can open the file */
        if (!down_write_trylock(&o->mot_open_sem)) {
@@ -2117,11 +2131,13 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info,
        if (rc < 0)
                GOTO(out_unlock_sem, rc);
 
-       mdt_lock_reg_init(lh2, LCK_EX);
-       rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
-                            MDS_INODELOCK_XATTR);
-       if (rc < 0)
-               GOTO(out_unlock1, rc);
+       if (o2) {
+               mdt_lock_reg_init(lh2, LCK_EX);
+               rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
+                                    MDS_INODELOCK_XATTR);
+               if (rc < 0)
+                       GOTO(out_unlock1, rc);
+       }
 
        /* Swap layout with orphan object */
        if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SWAP) {
@@ -2132,7 +2148,21 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info,
                struct lu_buf *buf = &info->mti_buf;
                struct md_rejig_data mrd;
 
-               mrd.mrd_obj = mdt_object_child(o == o1 ? o2 : o1);
+               if (o2) {
+                       mrd.mrd_obj = mdt_object_child(o == o1 ? o2 : o1);
+               } else {
+                       if (!(ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT)) {
+                               /* paranoid check again */
+                               CERROR(DFID
+                                 ":only mirror split support NULL o2 object\n",
+                                       PFID(mdt_object_fid(o)));
+                               GOTO(out_unlock1, rc = -EINVAL);
+                       }
+
+                       /* set NULL mrd_obj for deleting mirror objects */
+                       mrd.mrd_obj = NULL;
+               }
+
                if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT)
                        mrd.mrd_mirror_id = data->cd_mirror_id;
 
@@ -2164,7 +2194,8 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info,
 
 out_unlock2:
        /* Release exclusive LL */
-       mdt_object_unlock(info, o2, lh2, 1);
+       if (o2)
+               mdt_object_unlock(info, o2, lh2, 1);
 
 out_unlock1:
        mdt_object_unlock(info, o1, lh1, 1);
@@ -2182,9 +2213,12 @@ out_unlock_sem:
        }
 
 out_obj:
-       /* Callee takes care of o, we must put the other one. We know
-        * that o1 != o2 from check of lu_fid_cmp() above. */
-       mdt_object_put(info->mti_env, o1 != o ? o1 : o2);
+       if (o1 != o)
+               /* the 2nd object has been used, and swapped to o1 */
+               mdt_object_put(info->mti_env, o1);
+       else if (o2)
+               /* the 2nd object has been used, and not swapped */
+               mdt_object_put(info->mti_env, o2);
 
        ldlm_reprocess_all(lease->l_resource, lease);
 
index f9ca763..2ad1dc0 100644 (file)
@@ -2206,8 +2206,16 @@ static int mirror_split(const char *fname, __u32 id, const char *pool,
        __u32 mirror_id;
        int mdt_index;
        int fd, fdv;
+       bool purge = true; /* delete mirror by setting fdv=fd */
        int rc;
 
+       if (victim_file && (strcmp(fname, victim_file) == 0)) {
+               fprintf(stderr,
+                       "error %s: the source file '%s' and -f file are the same\n",
+                       progname, fname);
+               return -EINVAL;
+       }
+
        /* check fname contains mirror with mirror_id/comp_id */
        layout = llapi_layout_get_by_path(fname, 0);
        if (!layout) {
@@ -2310,6 +2318,7 @@ static int mirror_split(const char *fname, __u32 id, const char *pool,
                goto close_fd;
        }
 
+again:
        if (!victim_file) {
                /* use a temp file to store the splitted layout */
                if (mflags & MF_DESTROY) {
@@ -2321,8 +2330,17 @@ static int mirror_split(const char *fname, __u32 id, const char *pool,
                                goto close_fd;
                        }
 
-                       fdv = llapi_create_volatile_idx(parent, mdt_index,
-                                                       O_LOV_DELAY_CREATE);
+                       if (purge) {
+                               /* don't use volatile file for mirror destroy */
+                               fdv = fd;
+                       } else {
+                               /**
+                                * try the old way to delete mirror using
+                                * volatile file.
+                                */
+                               fdv = llapi_create_volatile_idx(parent,
+                                               mdt_index, O_LOV_DELAY_CREATE);
+                       }
                } else {
                        snprintf(victim, sizeof(victim), "%s.mirror~%u",
                                 fname, mirror_id);
@@ -2363,6 +2381,12 @@ static int mirror_split(const char *fname, __u32 id, const char *pool,
        data->lil_ids[1] = mirror_id;
        rc = llapi_lease_set(fd, data);
        if (rc <= 0) {
+               if (rc == -EINVAL && purge) {
+                       /* could be old MDS which prohibit fd==fdv */
+                       purge = false;
+                       goto again;
+
+               }
                if (rc == 0) /* lost lease lock */
                        rc = -EBUSY;
                fprintf(stderr,
@@ -2374,7 +2398,8 @@ static int mirror_split(const char *fname, __u32 id, const char *pool,
        free(data);
 
 close_victim:
-       close(fdv);
+       if (!purge)
+               close(fdv);
 close_fd:
        close(fd);
 free_layout:
@@ -4313,6 +4338,12 @@ static int lfs_setstripe_internal(int argc, char **argv,
                                comp_id = mirror_id;
                        else
                                mirror_flags |= MF_COMP_ID;
+                       if (has_m_file && !strcmp(fname, mirror_list->m_file)) {
+                               fprintf(stderr,
+                                       "%s: the file specified by -f cannot be same as the source file '%s'\n",
+                                       progname, fname);
+                               goto usage_error;
+                       }
                        result = mirror_split(fname, comp_id, lsa.lsa_pool_name,
                                              mirror_flags,
                                              has_m_file ? mirror_list->m_file :