int mirror_count, int comp_count)
{
LASSERT(comp_count != 0);
- LASSERT(lo->ldo_comp_cnt == 0 && lo->ldo_comp_entries == NULL);
+ LASSERTF(lo->ldo_comp_cnt == 0 && lo->ldo_comp_entries == NULL,
+ "comp_cnt %d comp_entries %p\n",
+ lo->ldo_comp_cnt, lo->ldo_comp_entries);
if (mirror_count > 0) {
OBD_ALLOC_PTR_ARRAY(lo->ldo_mirrors, mirror_count);
rc = lod_sub_xattr_set(env, next, buf, name, fl, th);
} else if (fl & LU_XATTR_SPLIT) {
rc = lod_sub_xattr_set(env, next, buf, name, fl, th);
- if (rc)
- RETURN(rc);
-
- rc = lod_striping_reload(env, lo, buf, LVF_ALL_STALE);
- if (rc)
- RETURN(rc);
+ if (!rc)
+ rc = lod_striping_reload(env, lo, buf,
+ LVF_ALL_STALE);
} else if (fl & LU_XATTR_PURGE) {
rc = lod_layout_purge(env, dt, buf, th);
} else if (dt_object_remote(dt)) {
const struct lu_env *env = info->mti_env;
int rc, rc2;
+ mutex_lock(&o->mot_lov_mutex);
rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
XATTR_NAME_LOV);
+ mutex_unlock(&o->mot_lov_mutex);
if (rc == -ENODATA)
rc = 0;
}
int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
- const char *name)
+ const char *name, bool lov_mutex_taken)
{
const struct lu_env *env = info->mti_env;
int rc;
ENTRY;
LASSERT(info->mti_big_lmm_used == 0);
+
+ if (strcmp(name, XATTR_NAME_LOV) == 0 && !lov_mutex_taken)
+ mutex_lock(&o->mot_lov_mutex);
+
rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL, name);
if (rc < 0)
- RETURN(rc);
+ GOTO(out, rc);
/* big_lmm may need to be grown */
if (info->mti_big_lmmsize < rc) {
OBD_ALLOC_LARGE(info->mti_big_lmm, size);
if (info->mti_big_lmm == NULL)
- RETURN(-ENOMEM);
+ GOTO(out, rc = -ENOMEM);
+
info->mti_big_lmmsize = size;
}
LASSERT(info->mti_big_lmmsize >= rc);
info->mti_buf.lb_len = info->mti_big_lmmsize;
rc = mo_xattr_get(env, mdt_object_child(o), &info->mti_buf, name);
+out:
+ if (strcmp(name, XATTR_NAME_LOV) == 0 && !lov_mutex_taken)
+ mutex_unlock(&o->mot_lov_mutex);
+
RETURN(rc);
}
int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
- struct md_attr *ma, const char *name)
+ struct md_attr *ma, const char *name, bool lov_mutex_taken)
{
struct md_object *next = mdt_object_child(o);
struct lu_buf *buf = &info->mti_buf;
int rc;
if (strcmp(name, XATTR_NAME_LOV) == 0) {
+ if (!lov_mutex_taken)
+ mutex_lock(&o->mot_lov_mutex);
+
buf->lb_buf = ma->ma_lmm;
buf->lb_len = ma->ma_lmm_size;
LASSERT(!(ma->ma_valid & MA_LOV));
LASSERT(buf->lb_buf);
rc = mo_xattr_get(info->mti_env, next, buf, name);
- if (rc > 0) {
+ if (rc > 0) {
got:
if (strcmp(name, XATTR_NAME_LOV) == 0) {
if (info->mti_big_lmm_used)
LOV_PATTERN_F_HOLE) &&
!(exp_connect_flags(info->mti_exp) &
OBD_CONNECT_LFSCK)) {
- return -EIO;
+ rc = -EIO;
+ goto out;
} else {
ma->ma_lmm_size = rc;
ma->ma_valid |= MA_LOV;
/* Default LMV has fixed size, so it must be able to fit
* in the original buffer */
if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0)
- return rc;
- rc = mdt_big_xattr_get(info, o, name);
+ goto out;
+
+ rc = mdt_big_xattr_get(info, o, name, true);
if (rc > 0) {
info->mti_big_lmm_used = 1;
goto got;
}
}
+out:
+ if (strcmp(name, XATTR_NAME_LOV) == 0 && !lov_mutex_taken)
+ mutex_unlock(&o->mot_lov_mutex);
+
return rc;
}
}
LASSERT(!info->mti_big_lmm_used);
- rc = __mdt_stripe_get(info, o, ma, name);
+ rc = __mdt_stripe_get(info, o, ma, name, false);
/* since big_lmm is always used here, clear 'used' flag to avoid
* assertion in mdt_big_xattr_get().
*/
/* ignore errors, MA_PFID won't be set and it is
* up to the caller to treat this as an error */
if (rc == -ERANGE || buf->lb_len == 0) {
- rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
+ rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK, false);
buf->lb_buf = info->mti_big_lmm;
buf->lb_len = info->mti_big_lmmsize;
}
rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf,
XATTR_NAME_LINK);
if (rc == -ERANGE) {
- rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
+ rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK, false);
buf->lb_buf = info->mti_big_lmm;
buf->lb_len = info->mti_big_lmmsize;
}
int mdt_attr_get_complex(struct mdt_thread_info *info,
struct mdt_object *o, struct md_attr *ma)
{
+ return __mdt_attr_get_complex(info, o, ma, false);
+}
+
+int __mdt_attr_get_complex(struct mdt_thread_info *info, struct mdt_object *o,
+ struct md_attr *ma, bool lov_mutex_taken)
+{
const struct lu_env *env = info->mti_env;
struct md_object *next = mdt_object_child(o);
struct lu_buf *buf = &info->mti_buf;
}
if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) {
- rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
+ rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV,
+ lov_mutex_taken);
if (rc)
GOTO(out, rc);
}
if (need & MA_LMV && S_ISDIR(mode)) {
- rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
+ rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV,
+ lov_mutex_taken);
if (rc != 0)
GOTO(out, rc);
}
if (need & MA_LMV_DEF && S_ISDIR(mode)) {
- rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
+ rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV,
+ lov_mutex_taken);
if (rc != 0)
GOTO(out, rc);
}
mot_auto_split_disabled:1;
int mot_write_count;
spinlock_t mot_write_lock;
- /* Lock to protect create_data */
+ /* Lock to protect create_data and LOVEA */
struct mutex mot_lov_mutex;
/* Lock to protect object's SOM update. */
struct mutex mot_som_mutex;
struct mdt_object *obj);
int mdt_get_info(struct tgt_session_info *tsi);
+int __mdt_attr_get_complex(struct mdt_thread_info *info,
+ struct mdt_object *o, struct md_attr *ma,
+ bool lov_mutex_taken);
int mdt_attr_get_complex(struct mdt_thread_info *info,
struct mdt_object *o, struct md_attr *ma);
int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
- const char *name);
+ const char *name, bool lov_mutex_taken);
int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
- struct md_attr *ma, const char *name);
+ struct md_attr *ma, const char *name,
+ bool lov_mutex_taken);
int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
struct md_attr *ma, const char *name);
int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
}
/* Read layout to get its version */
- rc = mdt_big_xattr_get(mti, mo, XATTR_NAME_LOV);
+ rc = mdt_big_xattr_get(mti, mo, XATTR_NAME_LOV, false);
if (rc == -ENODATA) /* File has no layout yet */
GOTO(out, rc = 0);
else if (rc < 0)
child = mdt_object_child(obj);
/* get the length of lsm */
+ mutex_lock(&obj->mot_lov_mutex);
+
rc = mo_xattr_get(env, child, &LU_BUF_NULL, XATTR_NAME_LOV);
if (rc < 0)
- GOTO(out_put, rc);
+ GOTO(unlock, rc);
if (rc > 0) {
struct lu_buf *lmm = NULL;
if (*lvblen < rc) {
mdt_obd_name(mdt), *lvblen, rc,
mdt->mdt_max_mdsize, -ERANGE);
*lvblen = rc;
- GOTO(out_put, rc = -ERANGE);
+ GOTO(unlock, rc = -ERANGE);
}
lmm = &info->mti_buf;
lmm->lb_buf = lvb;
lmm->lb_len = rc;
rc = mo_xattr_get(env, child, lmm, XATTR_NAME_LOV);
if (rc < 0)
- GOTO(out_put, rc);
+ GOTO(unlock, rc);
}
+unlock:
+ mutex_unlock(&obj->mot_lov_mutex);
out_put:
if (obj != NULL && !IS_ERR(obj))
mdt_object_put(env, obj);
p ? mdt_object_child(p) : NULL,
mdt_object_child(o), spec, ma);
if (rc == 0)
- rc = mdt_attr_get_complex(info, o, ma);
+ rc = __mdt_attr_get_complex(info, o, ma, true);
if (rc == 0 && ma->ma_valid & MA_LOV)
o->mot_lov_created = 1;
buf->lb_len = sizeof(mrd);
buf->lb_buf = &mrd;
+ mutex_lock(&o->mot_lov_mutex);
rc = mo_xattr_set(info->mti_env, mdt_object_child(o), buf,
XATTR_LUSTRE_LOV,
ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT ?
LU_XATTR_SPLIT : LU_XATTR_MERGE);
+ mutex_unlock(&o->mot_lov_mutex);
if (rc == 0 && ma->ma_attr.la_valid & (LA_SIZE | LA_BLOCKS |
LA_LSIZE | LA_LBLOCKS)) {
int rc2;
*/
if (!exp_connect_flr(info->mti_exp) ||
!exp_connect_overstriping(info->mti_exp)) {
- rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
+ rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV, false);
if (rc < 0 && rc != -ENODATA)
GOTO(out_put, rc);
if (rc != 0)
GOTO(out_put, rc);
+ if (strcmp(name, XATTR_NAME_LOV) == 0)
+ mutex_lock(&mo->mot_lov_mutex);
rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf,
name, 0);
+ if (strcmp(name, XATTR_NAME_LOV) == 0)
+ mutex_unlock(&mo->mot_lov_mutex);
mdt_object_unlock(info, mo, lh, rc);
+
if (rc)
GOTO(out_put, rc);
} else {
GOTO(out, rc = PTR_ERR(stripe));
ma->ma_valid = 0;
- rc = __mdt_stripe_get(info, stripe, ma, XATTR_NAME_LMV);
+ rc = __mdt_stripe_get(info, stripe, ma, XATTR_NAME_LMV, false);
/* LMV is checked without lock, don't cache it */
mo_invalidate(env, mdt_object_child(stripe));
mdt_object_put(env, stripe);
* should contain valid LOV EA data, and can be used directly.
*/
if (!info->mti_big_lmm_used) {
- rc = mdt_big_xattr_get(info, o, XATTR_NAME_LOV);
+ rc = mdt_big_xattr_get(info, o, XATTR_NAME_LOV, false);
if (rc < 0 && rc != -ENODATA)
GOTO(out_lock, rc);
!strncmp(xattr_name, user_string, sizeof(user_string) - 1))
RETURN(-EOPNOTSUPP);
+ if (strcmp(xattr_name, XATTR_NAME_LOV) == 0)
+ mutex_lock(&info->mti_object->mot_lov_mutex);
size = mo_xattr_get(info->mti_env,
mdt_object_child(info->mti_object),
&LU_BUF_NULL, xattr_name);
+ if (strcmp(xattr_name, XATTR_NAME_LOV) == 0)
+ mutex_unlock(&info->mti_object->mot_lov_mutex);
+
if (size == -ENODATA) {
/* XXX: Some client code will not handle -ENODATA
* for XATTR_NAME_LOV (trusted.lov) properly.
if (valid == OBD_MD_FLXATTR) {
const char *xattr_name = req_capsule_client_get(info->mti_pill,
&RMF_NAME);
+
+ if (strcmp(xattr_name, XATTR_NAME_LOV) == 0)
+ mutex_lock(&info->mti_object->mot_lov_mutex);
rc = mo_xattr_get(info->mti_env, next, buf, xattr_name);
+ if (strcmp(xattr_name, XATTR_NAME_LOV) == 0)
+ mutex_unlock(&info->mti_object->mot_lov_mutex);
if (rc < 0)
GOTO(out, rc);
buf->lb_buf = rr->rr_eadata;
buf->lb_len = xattr_len;
+ if (lockpart & MDS_INODELOCK_LAYOUT)
+ mutex_lock(&obj->mot_lov_mutex);
rc = mo_xattr_set(env, child, buf, xattr_name, flags);
+ if (lockpart & MDS_INODELOCK_LAYOUT)
+ mutex_unlock(&obj->mot_lov_mutex);
/* update ctime after xattr changed */
if (rc == 0) {
ma->ma_attr_flags |= MDS_PERM_BYPASS;