- protect lod_object::ldo_comp_entries during
lod_obj_for_each_stripe(), since another thread could change
ldo_comp_entries at the same time.
- protect the LOD in-memory layout during the layout changes
layout_{add|set|del} and purge_mirror.
- fix the lock/transaction order in mdd_unlink: start the transaction
first and then take the locks. (The wrong order was introduced in
commit 55d5235354d49aee0a330ad64beef4ed9004a27f.)
- Add test case for mirror split and unlink race.
Fixes:
55d5235354 ("LU-14579 flr: GPF in lod_sub_declare_destroy")
Signed-off-by: Bobi Jam <bobijam@whamcloud.com>
Change-Id: Ic54245c8755f660087fce46d1cad0ef7fa091245
Reviewed-on: https://review.whamcloud.com/43369
Tested-by: jenkins <devops@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Mike Pershin <mpershin@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
struct lod_obj_stripe_cb_data *data)
{
struct lod_layout_component *lod_comp;
struct lod_obj_stripe_cb_data *data)
{
struct lod_layout_component *lod_comp;
- LASSERT(lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL);
+ mutex_lock(&lo->ldo_layout_mutex);
for (i = 0; i < lo->ldo_comp_cnt; i++) {
lod_comp = &lo->ldo_comp_entries[i];
for (i = 0; i < lo->ldo_comp_cnt; i++) {
lod_comp = &lo->ldo_comp_entries[i];
if (data->locd_comp_cb) {
rc = data->locd_comp_cb(env, lo, i, data);
if (rc)
if (data->locd_comp_cb) {
rc = data->locd_comp_cb(env, lo, i, data);
if (rc)
}
/* could used just to do sth about component, not each
}
/* could used just to do sth about component, not each
continue;
rc = data->locd_stripe_cb(env, lo, dt, th, i, j, data);
if (rc != 0)
continue;
rc = data->locd_stripe_cb(env, lo, dt, th, i, j, data);
if (rc != 0)
+unlock:
+ mutex_unlock(&lo->ldo_layout_mutex);
+ RETURN(rc);
}
static bool lod_obj_attr_set_comp_skip_cb(const struct lu_env *env,
}
static bool lod_obj_attr_set_comp_skip_cb(const struct lu_env *env,
if (magic != LOV_USER_MAGIC_COMP_V1)
RETURN(-EINVAL);
if (magic != LOV_USER_MAGIC_COMP_V1)
RETURN(-EINVAL);
+ mutex_lock(&lo->ldo_layout_mutex);
+
array_cnt = lo->ldo_comp_cnt + comp_v1->lcm_entry_count;
OBD_ALLOC_PTR_ARRAY(comp_array, array_cnt);
array_cnt = lo->ldo_comp_cnt + comp_v1->lcm_entry_count;
OBD_ALLOC_PTR_ARRAY(comp_array, array_cnt);
- if (comp_array == NULL)
+ if (comp_array == NULL) {
+ mutex_unlock(&lo->ldo_layout_mutex);
memcpy(comp_array, lo->ldo_comp_entries,
sizeof(*comp_array) * lo->ldo_comp_cnt);
memcpy(comp_array, lo->ldo_comp_entries,
sizeof(*comp_array) * lo->ldo_comp_cnt);
LASSERT(lo->ldo_mirror_count == 1);
lo->ldo_mirrors[0].lme_end = array_cnt - 1;
LASSERT(lo->ldo_mirror_count == 1);
lo->ldo_mirrors[0].lme_end = array_cnt - 1;
+ mutex_unlock(&lo->ldo_layout_mutex);
+
}
}
OBD_FREE_PTR_ARRAY(comp_array, array_cnt);
}
}
OBD_FREE_PTR_ARRAY(comp_array, array_cnt);
+ mutex_unlock(&lo->ldo_layout_mutex);
+
+ mutex_lock(&lo->ldo_layout_mutex);
for (i = 0; i < comp_v1->lcm_entry_count; i++) {
__u32 id = comp_v1->lcm_entries[i].lcme_id;
__u32 flags = comp_v1->lcm_entries[i].lcme_flags;
for (i = 0; i < comp_v1->lcm_entry_count; i++) {
__u32 id = comp_v1->lcm_entries[i].lcme_id;
__u32 flags = comp_v1->lcm_entries[i].lcme_flags;
if (flags & LCME_FL_INIT) {
if (changed)
if (flags & LCME_FL_INIT) {
if (changed)
- lod_striping_free(env, lo);
+ lod_striping_free_nolock(env, lo);
+ mutex_unlock(&lo->ldo_layout_mutex);
if (flags) {
if ((flags & LCME_FL_STALE) &&
lod_last_non_stale_mirror(mirror_id,
if (flags) {
if ((flags & LCME_FL_STALE) &&
lod_last_non_stale_mirror(mirror_id,
+ lo)) {
+ mutex_unlock(
+ &lo->ldo_layout_mutex);
lod_comp->llc_flags |= flags;
}
if (mirror_flag) {
lod_comp->llc_flags |= flags;
}
if (mirror_flag) {
+ mutex_unlock(&lo->ldo_layout_mutex);
if (!changed) {
CDEBUG(D_LAYOUT, "%s: requested component(s) not found.\n",
if (!changed) {
CDEBUG(D_LAYOUT, "%s: requested component(s) not found.\n",
+ mutex_lock(&lo->ldo_layout_mutex);
+
+ if (left <= 0) {
+ mutex_unlock(&lo->ldo_layout_mutex);
for (i = (lo->ldo_comp_cnt - 1); i >= 0; i--) {
struct lod_layout_component *lod_comp;
for (i = (lo->ldo_comp_cnt - 1); i >= 0; i--) {
struct lod_layout_component *lod_comp;
if (left != (i + 1)) {
CDEBUG(D_LAYOUT, "%s: this deletion will create "
"a hole.\n", lod2obd(d)->obd_name);
if (left != (i + 1)) {
CDEBUG(D_LAYOUT, "%s: this deletion will create "
"a hole.\n", lod2obd(d)->obd_name);
+ mutex_unlock(&lo->ldo_layout_mutex);
RETURN(-EINVAL);
}
left--;
RETURN(-EINVAL);
}
left--;
if (obj == NULL)
continue;
rc = lod_sub_declare_destroy(env, obj, th);
if (obj == NULL)
continue;
rc = lod_sub_declare_destroy(env, obj, th);
+ if (rc) {
+ mutex_unlock(&lo->ldo_layout_mutex);
if (left == lo->ldo_comp_cnt) {
CDEBUG(D_LAYOUT, "%s: requested component id:%#x not found\n",
lod2obd(d)->obd_name, id);
if (left == lo->ldo_comp_cnt) {
CDEBUG(D_LAYOUT, "%s: requested component id:%#x not found\n",
lod2obd(d)->obd_name, id);
+ mutex_unlock(&lo->ldo_layout_mutex);
+ mutex_unlock(&lo->ldo_layout_mutex);
+
memset(attr, 0, sizeof(*attr));
attr->la_valid = LA_SIZE;
rc = lod_sub_declare_attr_set(env, next, attr, th);
memset(attr, 0, sizeof(*attr));
attr->la_valid = LA_SIZE;
rc = lod_sub_declare_attr_set(env, next, attr, th);
{
struct lod_thread_info *info = lod_env_info(env);
struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
{
struct lod_thread_info *info = lod_env_info(env);
struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
+ struct lod_object *lo = lod_dt_obj(dt);
struct lov_comp_md_v1 *comp_v1 = buf->lb_buf;
struct lov_comp_md_entry_v1 *entry;
struct lov_mds_md_v1 *lmm;
struct lov_comp_md_v1 *comp_v1 = buf->lb_buf;
struct lov_comp_md_entry_v1 *entry;
struct lov_mds_md_v1 *lmm;
+ /**
+ * Other operations (e.g. lod_declare_destroy) could be destroying the
+ * sub-objects at the same time.
+ */
+ mutex_lock(&lo->ldo_layout_mutex);
+
if (!declare) {
/* prepare sub-objects array */
for (i = 0; i < comp_v1->lcm_entry_count; i++) {
if (!declare) {
/* prepare sub-objects array */
for (i = 0; i < comp_v1->lcm_entry_count; i++) {
array_count += lmm->lmm_stripe_count;
}
OBD_ALLOC_PTR_ARRAY(sub_objs, array_count);
array_count += lmm->lmm_stripe_count;
}
OBD_ALLOC_PTR_ARRAY(sub_objs, array_count);
+ if (sub_objs == NULL) {
+ mutex_unlock(&lo->ldo_layout_mutex);
}
k = 0; /* sub_objs index */
}
k = 0; /* sub_objs index */
OBD_FREE_PTR_ARRAY(sub_objs, array_count);
}
OBD_FREE_PTR_ARRAY(sub_objs, array_count);
}
+ mutex_unlock(&lo->ldo_layout_mutex);
LASSERT(lo);
if (lo->ldo_comp_cnt == 0 && !lo->ldo_is_foreign) {
LASSERT(lo);
if (lo->ldo_comp_cnt == 0 && !lo->ldo_is_foreign) {
- lod_striping_free(env, lo);
+ lod_striping_free_nolock(env, lo);
rc = lod_sub_xattr_del(env, next, XATTR_NAME_LOV, th);
RETURN(rc);
}
rc = lod_sub_xattr_del(env, next, XATTR_NAME_LOV, th);
RETURN(rc);
}
LASSERT(lo->ldo_mirror_count == 1);
LASSERT(lo->ldo_mirror_count == 1);
+ mutex_lock(&lo->ldo_layout_mutex);
+
rc = lod_layout_del_prep_layout(env, lo, th);
if (rc < 0)
GOTO(out, rc);
rc = lod_layout_del_prep_layout(env, lo, th);
if (rc < 0)
GOTO(out, rc);
- lod_striping_free(env, lo);
+ lod_striping_free_nolock(env, lo);
+
+ mutex_unlock(&lo->ldo_layout_mutex);
+
+ mutex_lock(&lo->ldo_layout_mutex);
+
LASSERT((lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL) ||
lo->ldo_is_foreign);
LASSERT((lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL) ||
lo->ldo_is_foreign);
+ lo->ldo_comp_cached = 1;
+
rc = lod_generate_and_set_lovea(env, lo, th);
if (rc)
GOTO(out, rc);
rc = lod_generate_and_set_lovea(env, lo, th);
if (rc)
GOTO(out, rc);
- lo->ldo_comp_cached = 1;
+ mutex_unlock(&lo->ldo_layout_mutex);
+
- lod_striping_free(env, lo);
+ lod_striping_free_nolock(env, lo);
+ mutex_unlock(&lo->ldo_layout_mutex);
+
{
if (data->locd_declare)
return lod_sub_declare_destroy(env, dt, th);
{
if (data->locd_declare)
return lod_sub_declare_destroy(env, dt, th);
- else if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) ||
- stripe_idx == cfs_fail_val)
+
+ if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) ||
+ stripe_idx == cfs_fail_val)
return lod_sub_destroy(env, dt, th);
return 0;
return lod_sub_destroy(env, dt, th);
return 0;
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
- if (likely(mdd_cobj != NULL))
- mdd_write_lock(env, mdd_cobj, DT_TGT_CHILD);
-
rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
lname, ma, handle, no_name, is_dir);
if (rc)
rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
lname, ma, handle, no_name, is_dir);
if (rc)
rc = mdd_trans_start(env, mdd, handle);
if (rc)
rc = mdd_trans_start(env, mdd, handle);
if (rc)
+ GOTO(stop, rc);
+
+ if (likely(mdd_cobj != NULL))
+ mdd_write_lock(env, mdd_cobj, DT_TGT_CHILD);
if (likely(no_name == 0) && !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
if (likely(no_name == 0) && !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
rc = mdd_trans_stop(env, mdd, rc, handle);
return rc;
rc = mdd_trans_stop(env, mdd, rc, handle);
return rc;
if (mrd->mrd_obj)
vic = md2mdd_obj(mrd->mrd_obj);
if (mrd->mrd_obj)
vic = md2mdd_obj(mrd->mrd_obj);
- if (vic) {
- /* don't use the same file to save the splitted mirror */
- rc = lu_fid_cmp(mdd_object_fid(obj), mdd_object_fid(vic));
- if (rc == 0)
- RETURN(-EPERM);
-
- if (rc > 0) {
- mdd_write_lock(env, obj, DT_TGT_CHILD);
- mdd_write_lock(env, vic, DT_TGT_CHILD);
- } else {
- mdd_write_lock(env, vic, DT_TGT_CHILD);
- mdd_write_lock(env, obj, DT_TGT_CHILD);
- }
- } else {
- mdd_write_lock(env, obj, DT_TGT_CHILD);
- }
-
handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
- GOTO(unlock, rc = PTR_ERR(handle));
+ RETURN(PTR_ERR(handle));
/* get EA of mirrored file */
memset(buf_save, 0, sizeof(*buf));
rc = mdd_stripe_get(env, obj, buf_save, XATTR_NAME_LOV);
if (rc < 0)
/* get EA of mirrored file */
memset(buf_save, 0, sizeof(*buf));
rc = mdd_stripe_get(env, obj, buf_save, XATTR_NAME_LOV);
if (rc < 0)
lcm = buf_save->lb_buf;
if (le32_to_cpu(lcm->lcm_magic) != LOV_MAGIC_COMP_V1)
lcm = buf_save->lb_buf;
if (le32_to_cpu(lcm->lcm_magic) != LOV_MAGIC_COMP_V1)
- GOTO(out, rc = -EINVAL);
+ GOTO(stop, rc = -EINVAL);
/**
* Extract the mirror with specified mirror id, and store the splitted
/**
* Extract the mirror with specified mirror id, and store the splitted
memset(buf_vic, 0, sizeof(*buf_vic));
rc = mdd_split_ea(lcm, mrd->mrd_mirror_id, buf, buf_vic);
if (rc < 0)
memset(buf_vic, 0, sizeof(*buf_vic));
rc = mdd_split_ea(lcm, mrd->mrd_mirror_id, buf, buf_vic);
if (rc < 0)
/**
* @buf stores layout w/o the specified mirror, @buf_vic stores the
* splitted mirror
/**
* @buf stores layout w/o the specified mirror, @buf_vic stores the
* splitted mirror
rc = mdd_declare_xattr_set(env, mdd, obj, buf, XATTR_NAME_LOV,
LU_XATTR_SPLIT, handle);
if (rc)
rc = mdd_declare_xattr_set(env, mdd, obj, buf, XATTR_NAME_LOV,
LU_XATTR_SPLIT, handle);
if (rc)
/* declare vic set splitted layout in @buf_vic */
rc = mdd_declare_xattr_set(env, mdd, vic, buf_vic,
XATTR_NAME_LOV, LU_XATTR_SPLIT,
handle);
if (rc)
/* declare vic set splitted layout in @buf_vic */
rc = mdd_declare_xattr_set(env, mdd, vic, buf_vic,
XATTR_NAME_LOV, LU_XATTR_SPLIT,
handle);
if (rc)
} else {
/**
* declare delete mirror objects in @buf_vic, will change obj's
} else {
/**
* declare delete mirror objects in @buf_vic, will change obj's
XATTR_NAME_LOV, LU_XATTR_PURGE,
handle);
if (rc)
XATTR_NAME_LOV, LU_XATTR_PURGE,
handle);
if (rc)
/* declare obj set remaining layout in @buf */
rc = mdd_declare_xattr_set(env, mdd, obj, buf,
XATTR_NAME_LOV, LU_XATTR_SPLIT,
handle);
if (rc)
/* declare obj set remaining layout in @buf */
rc = mdd_declare_xattr_set(env, mdd, obj, buf,
XATTR_NAME_LOV, LU_XATTR_SPLIT,
handle);
if (rc)
}
rc = mdd_trans_start(env, mdd, handle);
if (rc)
}
rc = mdd_trans_start(env, mdd, handle);
if (rc)
+ GOTO(stop, rc);
+
+ if (vic) {
+ /* don't use the same file to save the splitted mirror */
+ rc = lu_fid_cmp(mdd_object_fid(obj), mdd_object_fid(vic));
+ if (rc == 0)
+ GOTO(stop, rc = -EPERM);
+
+ if (rc > 0) {
+ mdd_write_lock(env, obj, DT_TGT_CHILD);
+ mdd_write_lock(env, vic, DT_TGT_CHILD);
+ } else {
+ mdd_write_lock(env, vic, DT_TGT_CHILD);
+ mdd_write_lock(env, obj, DT_TGT_CHILD);
+ }
+ } else {
+ mdd_write_lock(env, obj, DT_TGT_CHILD);
+ }
/* set obj's layout in @buf */
rc = mdo_xattr_set(env, obj, buf, XATTR_NAME_LOV, LU_XATTR_REPLACE,
handle);
if (rc)
/* set obj's layout in @buf */
rc = mdo_xattr_set(env, obj, buf, XATTR_NAME_LOV, LU_XATTR_REPLACE,
handle);
if (rc)
if (vic) {
/* set vic's layout in @buf_vic */
if (vic) {
/* set vic's layout in @buf_vic */
rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, obj, handle,
NULL);
if (rc)
rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, obj, handle,
NULL);
if (rc)
if (vic) {
rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, vic,
handle, NULL);
if (rc)
if (vic) {
rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, vic,
handle, NULL);
if (rc)
PFID(mdd_object_fid(obj)), rc);
}
PFID(mdd_object_fid(obj)), rc);
}
+unlock:
+ mdd_write_unlock(env, obj);
+ if (vic)
+ mdd_write_unlock(env, vic);
+stop:
rc = mdd_trans_stop(env, mdd, rc, handle);
/* Truncate local DOM data if all went well */
if (!rc && dom_stripe)
mdd_dom_data_truncate(env, mdd, obj);
rc = mdd_trans_stop(env, mdd, rc, handle);
/* Truncate local DOM data if all went well */
if (!rc && dom_stripe)
mdd_dom_data_truncate(env, mdd, obj);
-unlock:
- mdd_write_unlock(env, obj);
- if (vic)
- mdd_write_unlock(env, vic);
lu_buf_free(buf_save);
lu_buf_free(buf);
lu_buf_free(buf_vic);
lu_buf_free(buf_save);
lu_buf_free(buf);
lu_buf_free(buf_vic);
}
run_test 60a "mirror extend sets correct size on sparse file"
}
run_test 60a "mirror extend sets correct size on sparse file"
+# Run a "mirror create" loop and a "mirror split -d" loop against the same
+# file for 60 seconds. The function always returns success; per the message
+# below, the only expectation is that the system does not crash.
+test_70() {
+	local tf=$DIR/$tdir/$tfile
+	local c_pid
+	local s_pid
+
+	test_mkdir $DIR/$tdir
+
+	# creator loop: unlink the file, re-create it as a mirrored file,
+	# then write a few bytes into it
+	while true; do
+		rm -f $tf
+		$LFS mirror create -N -E 1M -c -1 -E eof -N $tf
+		echo xxxx > $tf
+	done &
+	c_pid=$!
+	echo "mirror create pid $c_pid"
+
+	# splitter loop: repeatedly split off mirror 1 with -d; the command's
+	# output and exit status are deliberately ignored
+	while true; do
+		$LFS mirror split -d --mirror-id=1 $tf &> /dev/null
+	done &
+	s_pid=$!
+	echo "mirror split pid $s_pid"
+
+	echo "mirror create and split race for 60 seconds, should not crash"
+	sleep 60
+	kill -9 $c_pid &> /dev/null
+	kill -9 $s_pid &> /dev/null
+	# kill only sends the signal; reap both background loops before the
+	# final cleanup so they have really exited when the file is removed
+	wait $c_pid $s_pid &> /dev/null
+
+	rm -f $tf
+	true
+}
+run_test 70 "mirror create and split race"
+
ctrl_file=$(mktemp /tmp/CTRL.XXXXXX)
lock_file=$(mktemp /var/lock/FLR.XXXXXX)
ctrl_file=$(mktemp /tmp/CTRL.XXXXXX)
lock_file=$(mktemp /var/lock/FLR.XXXXXX)