Whamcloud - gitweb
LU-14579 flr: mirror unlink and split race 69/43369/5
authorBobi Jam <bobijam@whamcloud.com>
Fri, 2 Apr 2021 04:47:32 +0000 (12:47 +0800)
committerOleg Drokin <green@whamcloud.com>
Wed, 28 Apr 2021 02:11:26 +0000 (02:11 +0000)
- protect lod_object::ldo_comp_entries during
  lod_obj_for_each_stripe(), since other thread could change the
  ldo_comp_entries at the same time.

- protect LOD in-memory layout during layout change
  layout_{add|set|del} and purge_mirror.

- fix lock-tx order in mdd_unlink: start the transaction and then
  take locks. (introduced in commit
  55d5235354d49aee0a330ad64beef4ed9004a27f)

- Add test case for mirror split and unlink race.

Fixes: 55d5235354 ("LU-14579 flr: GPF in lod_sub_declare_destroy")
Signed-off-by: Bobi Jam <bobijam@whamcloud.com>
Change-Id: Ic54245c8755f660087fce46d1cad0ef7fa091245
Reviewed-on: https://review.whamcloud.com/43369
Tested-by: jenkins <devops@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Mike Pershin <mpershin@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/lod/lod_object.c
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_object.c
lustre/tests/sanity-flr.sh

index a00dc3b..f4829a3 100644 (file)
@@ -1176,10 +1176,10 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo,
                            struct lod_obj_stripe_cb_data *data)
 {
        struct lod_layout_component *lod_comp;
                            struct lod_obj_stripe_cb_data *data)
 {
        struct lod_layout_component *lod_comp;
-       int i, j, rc;
+       int i, j, rc = 0;
        ENTRY;
 
        ENTRY;
 
-       LASSERT(lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL);
+       mutex_lock(&lo->ldo_layout_mutex);
        for (i = 0; i < lo->ldo_comp_cnt; i++) {
                lod_comp = &lo->ldo_comp_entries[i];
 
        for (i = 0; i < lo->ldo_comp_cnt; i++) {
                lod_comp = &lo->ldo_comp_entries[i];
 
@@ -1199,7 +1199,7 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo,
                if (data->locd_comp_cb) {
                        rc = data->locd_comp_cb(env, lo, i, data);
                        if (rc)
                if (data->locd_comp_cb) {
                        rc = data->locd_comp_cb(env, lo, i, data);
                        if (rc)
-                               RETURN(rc);
+                               GOTO(unlock, rc);
                }
 
                /* could used just to do sth about component, not each
                }
 
                /* could used just to do sth about component, not each
@@ -1216,10 +1216,12 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo,
                                continue;
                        rc = data->locd_stripe_cb(env, lo, dt, th, i, j, data);
                        if (rc != 0)
                                continue;
                        rc = data->locd_stripe_cb(env, lo, dt, th, i, j, data);
                        if (rc != 0)
-                               RETURN(rc);
+                               GOTO(unlock, rc);
                }
        }
                }
        }
-       RETURN(0);
+unlock:
+       mutex_unlock(&lo->ldo_layout_mutex);
+       RETURN(rc);
 }
 
 static bool lod_obj_attr_set_comp_skip_cb(const struct lu_env *env,
 }
 
 static bool lod_obj_attr_set_comp_skip_cb(const struct lu_env *env,
@@ -2747,10 +2749,15 @@ static int lod_declare_layout_add(const struct lu_env *env,
        if (magic != LOV_USER_MAGIC_COMP_V1)
                RETURN(-EINVAL);
 
        if (magic != LOV_USER_MAGIC_COMP_V1)
                RETURN(-EINVAL);
 
+       mutex_lock(&lo->ldo_layout_mutex);
+
        array_cnt = lo->ldo_comp_cnt + comp_v1->lcm_entry_count;
        OBD_ALLOC_PTR_ARRAY(comp_array, array_cnt);
        array_cnt = lo->ldo_comp_cnt + comp_v1->lcm_entry_count;
        OBD_ALLOC_PTR_ARRAY(comp_array, array_cnt);
-       if (comp_array == NULL)
+       if (comp_array == NULL) {
+               mutex_unlock(&lo->ldo_layout_mutex);
                RETURN(-ENOMEM);
                RETURN(-ENOMEM);
+       }
+
 
        memcpy(comp_array, lo->ldo_comp_entries,
               sizeof(*comp_array) * lo->ldo_comp_cnt);
 
        memcpy(comp_array, lo->ldo_comp_entries,
               sizeof(*comp_array) * lo->ldo_comp_cnt);
@@ -2807,6 +2814,8 @@ static int lod_declare_layout_add(const struct lu_env *env,
        LASSERT(lo->ldo_mirror_count == 1);
        lo->ldo_mirrors[0].lme_end = array_cnt - 1;
 
        LASSERT(lo->ldo_mirror_count == 1);
        lo->ldo_mirrors[0].lme_end = array_cnt - 1;
 
+       mutex_unlock(&lo->ldo_layout_mutex);
+
        RETURN(0);
 
 error:
        RETURN(0);
 
 error:
@@ -2819,6 +2828,8 @@ error:
                }
        }
        OBD_FREE_PTR_ARRAY(comp_array, array_cnt);
                }
        }
        OBD_FREE_PTR_ARRAY(comp_array, array_cnt);
+       mutex_unlock(&lo->ldo_layout_mutex);
+
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
@@ -2914,6 +2925,7 @@ static int lod_declare_layout_set(const struct lu_env *env,
                RETURN(-EINVAL);
        }
 
                RETURN(-EINVAL);
        }
 
+       mutex_lock(&lo->ldo_layout_mutex);
        for (i = 0; i < comp_v1->lcm_entry_count; i++) {
                __u32 id = comp_v1->lcm_entries[i].lcme_id;
                __u32 flags = comp_v1->lcm_entries[i].lcme_flags;
        for (i = 0; i < comp_v1->lcm_entry_count; i++) {
                __u32 id = comp_v1->lcm_entries[i].lcme_id;
                __u32 flags = comp_v1->lcm_entries[i].lcme_flags;
@@ -2923,7 +2935,8 @@ static int lod_declare_layout_set(const struct lu_env *env,
 
                if (flags & LCME_FL_INIT) {
                        if (changed)
 
                if (flags & LCME_FL_INIT) {
                        if (changed)
-                               lod_striping_free(env, lo);
+                               lod_striping_free_nolock(env, lo);
+                       mutex_unlock(&lo->ldo_layout_mutex);
                        RETURN(-EINVAL);
                }
 
                        RETURN(-EINVAL);
                }
 
@@ -2946,8 +2959,11 @@ static int lod_declare_layout_set(const struct lu_env *env,
                                if (flags) {
                                        if ((flags & LCME_FL_STALE) &&
                                            lod_last_non_stale_mirror(mirror_id,
                                if (flags) {
                                        if ((flags & LCME_FL_STALE) &&
                                            lod_last_non_stale_mirror(mirror_id,
-                                                                     lo))
+                                                                     lo)) {
+                                               mutex_unlock(
+                                                       &lo->ldo_layout_mutex);
                                                RETURN(-EUCLEAN);
                                                RETURN(-EUCLEAN);
+                                       }
                                        lod_comp->llc_flags |= flags;
                                }
                                if (mirror_flag) {
                                        lod_comp->llc_flags |= flags;
                                }
                                if (mirror_flag) {
@@ -2960,6 +2976,7 @@ static int lod_declare_layout_set(const struct lu_env *env,
                        changed = true;
                }
        }
                        changed = true;
                }
        }
+       mutex_unlock(&lo->ldo_layout_mutex);
 
        if (!changed) {
                CDEBUG(D_LAYOUT, "%s: requested component(s) not found.\n",
 
        if (!changed) {
                CDEBUG(D_LAYOUT, "%s: requested component(s) not found.\n",
@@ -3042,9 +3059,13 @@ static int lod_declare_layout_del(const struct lu_env *env,
                flags = 0;
        }
 
                flags = 0;
        }
 
+       mutex_lock(&lo->ldo_layout_mutex);
+
        left = lo->ldo_comp_cnt;
        left = lo->ldo_comp_cnt;
-       if (left <= 0)
+       if (left <= 0) {
+               mutex_unlock(&lo->ldo_layout_mutex);
                RETURN(-EINVAL);
                RETURN(-EINVAL);
+       }
 
        for (i = (lo->ldo_comp_cnt - 1); i >= 0; i--) {
                struct lod_layout_component *lod_comp;
 
        for (i = (lo->ldo_comp_cnt - 1); i >= 0; i--) {
                struct lod_layout_component *lod_comp;
@@ -3061,6 +3082,7 @@ static int lod_declare_layout_del(const struct lu_env *env,
                if (left != (i + 1)) {
                        CDEBUG(D_LAYOUT, "%s: this deletion will create "
                               "a hole.\n", lod2obd(d)->obd_name);
                if (left != (i + 1)) {
                        CDEBUG(D_LAYOUT, "%s: this deletion will create "
                               "a hole.\n", lod2obd(d)->obd_name);
+                       mutex_unlock(&lo->ldo_layout_mutex);
                        RETURN(-EINVAL);
                }
                left--;
                        RETURN(-EINVAL);
                }
                left--;
@@ -3079,8 +3101,10 @@ static int lod_declare_layout_del(const struct lu_env *env,
                        if (obj == NULL)
                                continue;
                        rc = lod_sub_declare_destroy(env, obj, th);
                        if (obj == NULL)
                                continue;
                        rc = lod_sub_declare_destroy(env, obj, th);
-                       if (rc)
+                       if (rc) {
+                               mutex_unlock(&lo->ldo_layout_mutex);
                                RETURN(rc);
                                RETURN(rc);
+                       }
                }
        }
 
                }
        }
 
@@ -3088,9 +3112,12 @@ static int lod_declare_layout_del(const struct lu_env *env,
        if (left == lo->ldo_comp_cnt) {
                CDEBUG(D_LAYOUT, "%s: requested component id:%#x not found\n",
                       lod2obd(d)->obd_name, id);
        if (left == lo->ldo_comp_cnt) {
                CDEBUG(D_LAYOUT, "%s: requested component id:%#x not found\n",
                       lod2obd(d)->obd_name, id);
+               mutex_unlock(&lo->ldo_layout_mutex);
                RETURN(-EINVAL);
        }
 
                RETURN(-EINVAL);
        }
 
+       mutex_unlock(&lo->ldo_layout_mutex);
+
        memset(attr, 0, sizeof(*attr));
        attr->la_valid = LA_SIZE;
        rc = lod_sub_declare_attr_set(env, next, attr, th);
        memset(attr, 0, sizeof(*attr));
        attr->la_valid = LA_SIZE;
        rc = lod_sub_declare_attr_set(env, next, attr, th);
@@ -3428,6 +3455,7 @@ static int lod_layout_declare_or_purge_mirror(const struct lu_env *env,
 {
        struct lod_thread_info *info = lod_env_info(env);
        struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
 {
        struct lod_thread_info *info = lod_env_info(env);
        struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
+       struct lod_object *lo = lod_dt_obj(dt);
        struct lov_comp_md_v1 *comp_v1 = buf->lb_buf;
        struct lov_comp_md_entry_v1 *entry;
        struct lov_mds_md_v1 *lmm;
        struct lov_comp_md_v1 *comp_v1 = buf->lb_buf;
        struct lov_comp_md_entry_v1 *entry;
        struct lov_mds_md_v1 *lmm;
@@ -3436,6 +3464,12 @@ static int lod_layout_declare_or_purge_mirror(const struct lu_env *env,
 
        ENTRY;
 
 
        ENTRY;
 
+       /**
+        * other ops (like lod_declare_destroy) could destroying sub objects
+        * as well.
+        */
+       mutex_lock(&lo->ldo_layout_mutex);
+
        if (!declare) {
                /* prepare sub-objects array */
                for (i = 0; i < comp_v1->lcm_entry_count; i++) {
        if (!declare) {
                /* prepare sub-objects array */
                for (i = 0; i < comp_v1->lcm_entry_count; i++) {
@@ -3449,8 +3483,10 @@ static int lod_layout_declare_or_purge_mirror(const struct lu_env *env,
                        array_count += lmm->lmm_stripe_count;
                }
                OBD_ALLOC_PTR_ARRAY(sub_objs, array_count);
                        array_count += lmm->lmm_stripe_count;
                }
                OBD_ALLOC_PTR_ARRAY(sub_objs, array_count);
-               if (sub_objs == NULL)
+               if (sub_objs == NULL) {
+                       mutex_unlock(&lo->ldo_layout_mutex);
                        RETURN(-ENOMEM);
                        RETURN(-ENOMEM);
+               }
        }
 
        k = 0;  /* sub_objs index */
        }
 
        k = 0;  /* sub_objs index */
@@ -3550,6 +3586,7 @@ out:
 
                OBD_FREE_PTR_ARRAY(sub_objs, array_count);
        }
 
                OBD_FREE_PTR_ARRAY(sub_objs, array_count);
        }
+       mutex_unlock(&lo->ldo_layout_mutex);
 
        RETURN(rc);
 }
 
        RETURN(rc);
 }
@@ -4335,7 +4372,7 @@ static int lod_generate_and_set_lovea(const struct lu_env *env,
        LASSERT(lo);
 
        if (lo->ldo_comp_cnt == 0 && !lo->ldo_is_foreign) {
        LASSERT(lo);
 
        if (lo->ldo_comp_cnt == 0 && !lo->ldo_is_foreign) {
-               lod_striping_free(env, lo);
+               lod_striping_free_nolock(env, lo);
                rc = lod_sub_xattr_del(env, next, XATTR_NAME_LOV, th);
                RETURN(rc);
        }
                rc = lod_sub_xattr_del(env, next, XATTR_NAME_LOV, th);
                RETURN(rc);
        }
@@ -4631,6 +4668,8 @@ static int lod_layout_del(const struct lu_env *env, struct dt_object *dt,
 
        LASSERT(lo->ldo_mirror_count == 1);
 
 
        LASSERT(lo->ldo_mirror_count == 1);
 
+       mutex_lock(&lo->ldo_layout_mutex);
+
        rc = lod_layout_del_prep_layout(env, lo, th);
        if (rc < 0)
                GOTO(out, rc);
        rc = lod_layout_del_prep_layout(env, lo, th);
        if (rc < 0)
                GOTO(out, rc);
@@ -4658,7 +4697,10 @@ static int lod_layout_del(const struct lu_env *env, struct dt_object *dt,
        EXIT;
 out:
        if (rc)
        EXIT;
 out:
        if (rc)
-               lod_striping_free(env, lo);
+               lod_striping_free_nolock(env, lo);
+
+       mutex_unlock(&lo->ldo_layout_mutex);
+
        return rc;
 }
 
        return rc;
 }
 
@@ -5928,6 +5970,8 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt,
        int     rc = 0, i, j;
        ENTRY;
 
        int     rc = 0, i, j;
        ENTRY;
 
+       mutex_lock(&lo->ldo_layout_mutex);
+
        LASSERT((lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL) ||
                lo->ldo_is_foreign);
 
        LASSERT((lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL) ||
                lo->ldo_is_foreign);
 
@@ -5986,15 +6030,20 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt,
        if (rc)
                GOTO(out, rc);
 
        if (rc)
                GOTO(out, rc);
 
+       lo->ldo_comp_cached = 1;
+
        rc = lod_generate_and_set_lovea(env, lo, th);
        if (rc)
                GOTO(out, rc);
 
        rc = lod_generate_and_set_lovea(env, lo, th);
        if (rc)
                GOTO(out, rc);
 
-       lo->ldo_comp_cached = 1;
+       mutex_unlock(&lo->ldo_layout_mutex);
+
        RETURN(0);
 
 out:
        RETURN(0);
 
 out:
-       lod_striping_free(env, lo);
+       lod_striping_free_nolock(env, lo);
+       mutex_unlock(&lo->ldo_layout_mutex);
+
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
@@ -6054,8 +6103,9 @@ lod_obj_stripe_destroy_cb(const struct lu_env *env, struct lod_object *lo,
 {
        if (data->locd_declare)
                return lod_sub_declare_destroy(env, dt, th);
 {
        if (data->locd_declare)
                return lod_sub_declare_destroy(env, dt, th);
-       else if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) ||
-                stripe_idx == cfs_fail_val)
+
+       if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) ||
+           stripe_idx == cfs_fail_val)
                return lod_sub_destroy(env, dt, th);
 
        return 0;
                return lod_sub_destroy(env, dt, th);
 
        return 0;
index a81de64..2f69cc7 100644 (file)
@@ -1739,17 +1739,17 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));
 
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));
 
-       if (likely(mdd_cobj != NULL))
-               mdd_write_lock(env, mdd_cobj, DT_TGT_CHILD);
-
        rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
                                lname, ma, handle, no_name, is_dir);
        if (rc)
        rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
                                lname, ma, handle, no_name, is_dir);
        if (rc)
-               GOTO(cleanup, rc);
+               GOTO(stop, rc);
 
        rc = mdd_trans_start(env, mdd, handle);
        if (rc)
 
        rc = mdd_trans_start(env, mdd, handle);
        if (rc)
-               GOTO(cleanup, rc);
+               GOTO(stop, rc);
+
+       if (likely(mdd_cobj != NULL))
+               mdd_write_lock(env, mdd_cobj, DT_TGT_CHILD);
 
        if (likely(no_name == 0) && !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
                rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
 
        if (likely(no_name == 0) && !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
                rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
@@ -1840,6 +1840,7 @@ cleanup:
                        lname, NULL, handle);
        }
 
                        lname, NULL, handle);
        }
 
+stop:
        rc = mdd_trans_stop(env, mdd, rc, handle);
 
        return rc;
        rc = mdd_trans_stop(env, mdd, rc, handle);
 
        return rc;
index a4ed6b1..1553946 100644 (file)
@@ -1787,36 +1787,19 @@ static int mdd_xattr_split(const struct lu_env *env, struct md_object *md_obj,
        if (mrd->mrd_obj)
                vic = md2mdd_obj(mrd->mrd_obj);
 
        if (mrd->mrd_obj)
                vic = md2mdd_obj(mrd->mrd_obj);
 
-       if (vic) {
-               /* don't use the same file to save the splitted mirror */
-               rc = lu_fid_cmp(mdd_object_fid(obj), mdd_object_fid(vic));
-               if (rc == 0)
-                       RETURN(-EPERM);
-
-               if (rc > 0) {
-                       mdd_write_lock(env, obj, DT_TGT_CHILD);
-                       mdd_write_lock(env, vic, DT_TGT_CHILD);
-               } else {
-                       mdd_write_lock(env, vic, DT_TGT_CHILD);
-                       mdd_write_lock(env, obj, DT_TGT_CHILD);
-               }
-       } else {
-               mdd_write_lock(env, obj, DT_TGT_CHILD);
-       }
-
        handle = mdd_trans_create(env, mdd);
        if (IS_ERR(handle))
        handle = mdd_trans_create(env, mdd);
        if (IS_ERR(handle))
-               GOTO(unlock, rc = PTR_ERR(handle));
+               RETURN(PTR_ERR(handle));
 
        /* get EA of mirrored file */
        memset(buf_save, 0, sizeof(*buf));
        rc = mdd_stripe_get(env, obj, buf_save, XATTR_NAME_LOV);
        if (rc < 0)
 
        /* get EA of mirrored file */
        memset(buf_save, 0, sizeof(*buf));
        rc = mdd_stripe_get(env, obj, buf_save, XATTR_NAME_LOV);
        if (rc < 0)
-               GOTO(out, rc);
+               GOTO(stop, rc);
 
        lcm = buf_save->lb_buf;
        if (le32_to_cpu(lcm->lcm_magic) != LOV_MAGIC_COMP_V1)
 
        lcm = buf_save->lb_buf;
        if (le32_to_cpu(lcm->lcm_magic) != LOV_MAGIC_COMP_V1)
-               GOTO(out, rc = -EINVAL);
+               GOTO(stop, rc = -EINVAL);
 
        /**
         * Extract the mirror with specified mirror id, and store the splitted
 
        /**
         * Extract the mirror with specified mirror id, and store the splitted
@@ -1826,7 +1809,7 @@ static int mdd_xattr_split(const struct lu_env *env, struct md_object *md_obj,
        memset(buf_vic, 0, sizeof(*buf_vic));
        rc = mdd_split_ea(lcm, mrd->mrd_mirror_id, buf, buf_vic);
        if (rc < 0)
        memset(buf_vic, 0, sizeof(*buf_vic));
        rc = mdd_split_ea(lcm, mrd->mrd_mirror_id, buf, buf_vic);
        if (rc < 0)
-               GOTO(out, rc);
+               GOTO(stop, rc);
        /**
         * @buf stores layout w/o the specified mirror, @buf_vic stores the
         * splitted mirror
        /**
         * @buf stores layout w/o the specified mirror, @buf_vic stores the
         * splitted mirror
@@ -1844,14 +1827,14 @@ static int mdd_xattr_split(const struct lu_env *env, struct md_object *md_obj,
                rc = mdd_declare_xattr_set(env, mdd, obj, buf, XATTR_NAME_LOV,
                                           LU_XATTR_SPLIT, handle);
                if (rc)
                rc = mdd_declare_xattr_set(env, mdd, obj, buf, XATTR_NAME_LOV,
                                           LU_XATTR_SPLIT, handle);
                if (rc)
-                       GOTO(out_restore, rc);
+                       GOTO(stop, rc);
 
                /* declare vic set splitted layout in @buf_vic */
                rc = mdd_declare_xattr_set(env, mdd, vic, buf_vic,
                                           XATTR_NAME_LOV, LU_XATTR_SPLIT,
                                           handle);
                if (rc)
 
                /* declare vic set splitted layout in @buf_vic */
                rc = mdd_declare_xattr_set(env, mdd, vic, buf_vic,
                                           XATTR_NAME_LOV, LU_XATTR_SPLIT,
                                           handle);
                if (rc)
-                       GOTO(out_restore, rc);
+                       GOTO(stop, rc);
        } else {
                /**
                 * declare delete mirror objects in @buf_vic, will change obj's
        } else {
                /**
                 * declare delete mirror objects in @buf_vic, will change obj's
@@ -1861,25 +1844,42 @@ static int mdd_xattr_split(const struct lu_env *env, struct md_object *md_obj,
                                           XATTR_NAME_LOV, LU_XATTR_PURGE,
                                           handle);
                if (rc)
                                           XATTR_NAME_LOV, LU_XATTR_PURGE,
                                           handle);
                if (rc)
-                       GOTO(out_restore, rc);
+                       GOTO(stop, rc);
 
                /* declare obj set remaining layout in @buf */
                rc = mdd_declare_xattr_set(env, mdd, obj, buf,
                                           XATTR_NAME_LOV, LU_XATTR_SPLIT,
                                           handle);
                if (rc)
 
                /* declare obj set remaining layout in @buf */
                rc = mdd_declare_xattr_set(env, mdd, obj, buf,
                                           XATTR_NAME_LOV, LU_XATTR_SPLIT,
                                           handle);
                if (rc)
-                       GOTO(out_restore, rc);
+                       GOTO(stop, rc);
        }
 
        rc = mdd_trans_start(env, mdd, handle);
        if (rc)
        }
 
        rc = mdd_trans_start(env, mdd, handle);
        if (rc)
-               GOTO(out_restore, rc);
+               GOTO(stop, rc);
+
+       if (vic) {
+               /* don't use the same file to save the splitted mirror */
+               rc = lu_fid_cmp(mdd_object_fid(obj), mdd_object_fid(vic));
+               if (rc == 0)
+                       GOTO(stop, rc = -EPERM);
+
+               if (rc > 0) {
+                       mdd_write_lock(env, obj, DT_TGT_CHILD);
+                       mdd_write_lock(env, vic, DT_TGT_CHILD);
+               } else {
+                       mdd_write_lock(env, vic, DT_TGT_CHILD);
+                       mdd_write_lock(env, obj, DT_TGT_CHILD);
+               }
+       } else {
+               mdd_write_lock(env, obj, DT_TGT_CHILD);
+       }
 
        /* set obj's layout in @buf */
        rc = mdo_xattr_set(env, obj, buf, XATTR_NAME_LOV, LU_XATTR_REPLACE,
                           handle);
        if (rc)
 
        /* set obj's layout in @buf */
        rc = mdo_xattr_set(env, obj, buf, XATTR_NAME_LOV, LU_XATTR_REPLACE,
                           handle);
        if (rc)
-               GOTO(out_restore, rc);
+               GOTO(unlock, rc);
 
        if (vic) {
                /* set vic's layout in @buf_vic */
 
        if (vic) {
                /* set vic's layout in @buf_vic */
@@ -1898,13 +1898,13 @@ static int mdd_xattr_split(const struct lu_env *env, struct md_object *md_obj,
        rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, obj, handle,
                                      NULL);
        if (rc)
        rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, obj, handle,
                                      NULL);
        if (rc)
-               GOTO(out, rc);
+               GOTO(out_restore, rc);
 
        if (vic) {
                rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, vic,
                                              handle, NULL);
                if (rc)
 
        if (vic) {
                rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, vic,
                                              handle, NULL);
                if (rc)
-                       GOTO(out, rc);
+                       GOTO(out_restore, rc);
        }
 
 out_restore:
        }
 
 out_restore:
@@ -1919,17 +1919,17 @@ out_restore:
                               PFID(mdd_object_fid(obj)), rc);
        }
 
                               PFID(mdd_object_fid(obj)), rc);
        }
 
-out:
+unlock:
+       mdd_write_unlock(env, obj);
+       if (vic)
+               mdd_write_unlock(env, vic);
+stop:
        rc = mdd_trans_stop(env, mdd, rc, handle);
 
        /* Truncate local DOM data if all went well */
        if (!rc && dom_stripe)
                mdd_dom_data_truncate(env, mdd, obj);
 
        rc = mdd_trans_stop(env, mdd, rc, handle);
 
        /* Truncate local DOM data if all went well */
        if (!rc && dom_stripe)
                mdd_dom_data_truncate(env, mdd, obj);
 
-unlock:
-       mdd_write_unlock(env, obj);
-       if (vic)
-               mdd_write_unlock(env, vic);
        lu_buf_free(buf_save);
        lu_buf_free(buf);
        lu_buf_free(buf_vic);
        lu_buf_free(buf_save);
        lu_buf_free(buf);
        lu_buf_free(buf_vic);
index 531b5a0..671be84 100644 (file)
@@ -2597,6 +2597,35 @@ test_60a() {
 }
 run_test 60a "mirror extend sets correct size on sparse file"
 
 }
 run_test 60a "mirror extend sets correct size on sparse file"
 
+test_70() {
+       local tf=$DIR/$tdir/$tfile
+
+       test_mkdir $DIR/$tdir
+
+       while true; do
+               rm -f $tf
+               $LFS mirror create -N -E 1M -c -1 -E eof -N $tf
+               echo xxxx > $tf
+       done &
+       c_pid=$!
+       echo "mirror create pid $c_pid"
+
+       while true; do
+               $LFS mirror split -d --mirror-id=1 $tf &> /dev/null
+       done &
+       s_pid=$!
+       echo "mirror split pid $s_pid"
+
+       echo "mirror create and split race for 60 seconds, should not crash"
+       sleep 60
+       kill -9 $c_pid &> /dev/null
+       kill -9 $s_pid &> /dev/null
+
+       rm -f $tf
+       true
+}
+run_test 70 "mirror create and split race"
+
 ctrl_file=$(mktemp /tmp/CTRL.XXXXXX)
 lock_file=$(mktemp /var/lock/FLR.XXXXXX)
 
 ctrl_file=$(mktemp /tmp/CTRL.XXXXXX)
 lock_file=$(mktemp /var/lock/FLR.XXXXXX)