Whamcloud - gitweb
LU-15278 mdt: protect layout xattr 63/45663/8
authorBobi Jam <bobijam@whamcloud.com>
Fri, 26 Nov 2021 04:12:25 +0000 (12:12 +0800)
committerBobi Jam <bobijam@whamcloud.com>
Sat, 25 Dec 2021 04:47:01 +0000 (12:47 +0800)
During layout change, the layout xattr could be simultaneously
used by others.

This patch put the server LOVEA access in the protection of
mdt_object::mot_lov_mutex.

Signed-off-by: Bobi Jam <bobijam@whamcloud.com>
Change-Id: I4a749c9c3a53550d81d9fb5961be3094b215af92

lustre/lod/lod_lov.c
lustre/lod/lod_object.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_io.c
lustre/mdt/mdt_lvb.c
lustre/mdt/mdt_open.c
lustre/mdt/mdt_reint.c
lustre/mdt/mdt_restripe.c
lustre/mdt/mdt_som.c
lustre/mdt/mdt_xattr.c

index 3b0f01a..5169d25 100644 (file)
@@ -565,7 +565,9 @@ int lod_alloc_comp_entries(struct lod_object *lo,
                           int mirror_count, int comp_count)
 {
        LASSERT(comp_count != 0);
-       LASSERT(lo->ldo_comp_cnt == 0 && lo->ldo_comp_entries == NULL);
+       LASSERTF(lo->ldo_comp_cnt == 0 && lo->ldo_comp_entries == NULL,
+                "comp_cnt %d comp_entries %p\n",
+                lo->ldo_comp_cnt, lo->ldo_comp_entries);
 
        if (mirror_count > 0) {
                OBD_ALLOC_PTR_ARRAY(lo->ldo_mirrors, mirror_count);
index 8b4633c..9d057fe 100644 (file)
@@ -4842,12 +4842,9 @@ static int lod_xattr_set(const struct lu_env *env,
                        rc = lod_sub_xattr_set(env, next, buf, name, fl, th);
                } else if (fl & LU_XATTR_SPLIT) {
                        rc = lod_sub_xattr_set(env, next, buf, name, fl, th);
-                       if (rc)
-                               RETURN(rc);
-
-                       rc = lod_striping_reload(env, lo, buf, LVF_ALL_STALE);
-                       if (rc)
-                               RETURN(rc);
+                       if (!rc)
+                               rc = lod_striping_reload(env, lo, buf,
+                                                        LVF_ALL_STALE);
                } else if (fl & LU_XATTR_PURGE) {
                        rc = lod_layout_purge(env, dt, buf, th);
                } else if (dt_object_remote(dt)) {
index da4e5c1..17638d5 100644 (file)
@@ -954,8 +954,10 @@ static int mdt_attr_get_eabuf_size(struct mdt_thread_info *info,
        const struct lu_env *env = info->mti_env;
        int rc, rc2;
 
+       mutex_lock(&o->mot_lov_mutex);
        rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
                          XATTR_NAME_LOV);
+       mutex_unlock(&o->mot_lov_mutex);
 
        if (rc == -ENODATA)
                rc = 0;
@@ -982,16 +984,20 @@ out:
 }
 
 int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
-                     const char *name)
+                     const char *name, bool lov_mutex_taken)
 {
        const struct lu_env *env = info->mti_env;
        int rc;
        ENTRY;
 
        LASSERT(info->mti_big_lmm_used == 0);
+
+       if (strcmp(name, XATTR_NAME_LOV) == 0 && !lov_mutex_taken)
+               mutex_lock(&o->mot_lov_mutex);
+
        rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL, name);
        if (rc < 0)
-               RETURN(rc);
+               GOTO(out, rc);
 
        /* big_lmm may need to be grown */
        if (info->mti_big_lmmsize < rc) {
@@ -1008,7 +1014,8 @@ int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
 
                OBD_ALLOC_LARGE(info->mti_big_lmm, size);
                if (info->mti_big_lmm == NULL)
-                       RETURN(-ENOMEM);
+                       GOTO(out, rc = -ENOMEM);
+
                info->mti_big_lmmsize = size;
        }
        LASSERT(info->mti_big_lmmsize >= rc);
@@ -1017,17 +1024,24 @@ int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
        info->mti_buf.lb_len = info->mti_big_lmmsize;
        rc = mo_xattr_get(env, mdt_object_child(o), &info->mti_buf, name);
 
+out:
+       if (strcmp(name, XATTR_NAME_LOV) == 0 && !lov_mutex_taken)
+               mutex_unlock(&o->mot_lov_mutex);
+
        RETURN(rc);
 }
 
 int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
-                    struct md_attr *ma, const char *name)
+                    struct md_attr *ma, const char *name, bool lov_mutex_taken)
 {
        struct md_object *next = mdt_object_child(o);
        struct lu_buf    *buf = &info->mti_buf;
        int rc;
 
        if (strcmp(name, XATTR_NAME_LOV) == 0) {
+               if (!lov_mutex_taken)
+                       mutex_lock(&o->mot_lov_mutex);
+
                buf->lb_buf = ma->ma_lmm;
                buf->lb_len = ma->ma_lmm_size;
                LASSERT(!(ma->ma_valid & MA_LOV));
@@ -1046,8 +1060,8 @@ int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
        LASSERT(buf->lb_buf);
 
        rc = mo_xattr_get(info->mti_env, next, buf, name);
-       if (rc > 0) {
 
+       if (rc > 0) {
 got:
                if (strcmp(name, XATTR_NAME_LOV) == 0) {
                        if (info->mti_big_lmm_used)
@@ -1058,7 +1072,8 @@ got:
                                     LOV_PATTERN_F_HOLE) &&
                            !(exp_connect_flags(info->mti_exp) &
                              OBD_CONNECT_LFSCK)) {
-                               return -EIO;
+                               rc = -EIO;
+                               goto out;
                        } else {
                                ma->ma_lmm_size = rc;
                                ma->ma_valid |= MA_LOV;
@@ -1086,14 +1101,19 @@ got:
                /* Default LMV has fixed size, so it must be able to fit
                 * in the original buffer */
                if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0)
-                       return rc;
-               rc = mdt_big_xattr_get(info, o, name);
+                       goto out;
+
+               rc = mdt_big_xattr_get(info, o, name, true);
                if (rc > 0) {
                        info->mti_big_lmm_used = 1;
                        goto got;
                }
        }
 
+out:
+       if (strcmp(name, XATTR_NAME_LOV) == 0 && !lov_mutex_taken)
+               mutex_unlock(&o->mot_lov_mutex);
+
        return rc;
 }
 
@@ -1122,7 +1142,7 @@ int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
        }
 
        LASSERT(!info->mti_big_lmm_used);
-       rc = __mdt_stripe_get(info, o, ma, name);
+       rc = __mdt_stripe_get(info, o, ma, name, false);
        /* since big_lmm is always used here, clear 'used' flag to avoid
         * assertion in mdt_big_xattr_get().
         */
@@ -1147,7 +1167,7 @@ int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
        /* ignore errors, MA_PFID won't be set and it is
         * up to the caller to treat this as an error */
        if (rc == -ERANGE || buf->lb_len == 0) {
-               rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
+               rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK, false);
                buf->lb_buf = info->mti_big_lmm;
                buf->lb_len = info->mti_big_lmmsize;
        }
@@ -1192,7 +1212,7 @@ int mdt_attr_get_pfid_name(struct mdt_thread_info *info, struct mdt_object *o,
        rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf,
                          XATTR_NAME_LINK);
        if (rc == -ERANGE) {
-               rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
+               rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK, false);
                buf->lb_buf = info->mti_big_lmm;
                buf->lb_len = info->mti_big_lmmsize;
        }
@@ -1226,6 +1246,12 @@ int mdt_attr_get_pfid_name(struct mdt_thread_info *info, struct mdt_object *o,
 int mdt_attr_get_complex(struct mdt_thread_info *info,
                         struct mdt_object *o, struct md_attr *ma)
 {
+       return __mdt_attr_get_complex(info, o, ma, false);
+}
+
+int __mdt_attr_get_complex(struct mdt_thread_info *info, struct mdt_object *o,
+                          struct md_attr *ma, bool lov_mutex_taken)
+{
        const struct lu_env *env = info->mti_env;
        struct md_object    *next = mdt_object_child(o);
        struct lu_buf       *buf = &info->mti_buf;
@@ -1260,19 +1286,22 @@ int mdt_attr_get_complex(struct mdt_thread_info *info,
        }
 
        if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) {
-               rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
+               rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV,
+                                     lov_mutex_taken);
                if (rc)
                        GOTO(out, rc);
        }
 
        if (need & MA_LMV && S_ISDIR(mode)) {
-               rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
+               rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV,
+                                     lov_mutex_taken);
                if (rc != 0)
                        GOTO(out, rc);
        }
 
        if (need & MA_LMV_DEF && S_ISDIR(mode)) {
-               rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
+               rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV,
+                                     lov_mutex_taken);
                if (rc != 0)
                        GOTO(out, rc);
        }
index 7ada2f8..f78e249 100644 (file)
@@ -348,7 +348,7 @@ struct mdt_object {
                                mot_auto_split_disabled:1;
        int                     mot_write_count;
        spinlock_t              mot_write_lock;
-        /* Lock to protect create_data */
+       /* Lock to protect create_data and LOVEA */
        struct mutex            mot_lov_mutex;
        /* Lock to protect object's SOM update. */
        struct mutex            mot_som_mutex;
@@ -893,12 +893,16 @@ int mdt_revoke_remote_lookup_lock(struct mdt_thread_info *info,
                                  struct mdt_object *obj);
 
 int mdt_get_info(struct tgt_session_info *tsi);
+int __mdt_attr_get_complex(struct mdt_thread_info *info,
+                          struct mdt_object *o, struct md_attr *ma,
+                          bool lov_mutex_taken);
 int mdt_attr_get_complex(struct mdt_thread_info *info,
                         struct mdt_object *o, struct md_attr *ma);
 int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
-                     const char *name);
+                     const char *name, bool lov_mutex_taken);
 int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
-                    struct md_attr *ma, const char *name);
+                    struct md_attr *ma, const char *name,
+                    bool lov_mutex_taken);
 int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
                   struct md_attr *ma, const char *name);
 int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
index bf80be4..595bd3b 100644 (file)
@@ -1624,7 +1624,7 @@ int mdt_data_version_get(struct tgt_session_info *tsi)
        }
 
        /* Read layout to get its version */
-       rc = mdt_big_xattr_get(mti, mo, XATTR_NAME_LOV);
+       rc = mdt_big_xattr_get(mti, mo, XATTR_NAME_LOV, false);
        if (rc == -ENODATA) /* File has no layout yet */
                GOTO(out, rc = 0);
        else if (rc < 0)
index 729cb8e..d6616a4 100644 (file)
@@ -395,9 +395,11 @@ static int mdt_lvbo_fill(struct ldlm_lock *lock,
        child = mdt_object_child(obj);
 
        /* get the length of lsm */
+       mutex_lock(&obj->mot_lov_mutex);
+
        rc = mo_xattr_get(env, child, &LU_BUF_NULL, XATTR_NAME_LOV);
        if (rc < 0)
-               GOTO(out_put, rc);
+               GOTO(unlock, rc);
        if (rc > 0) {
                struct lu_buf *lmm = NULL;
                if (*lvblen < rc) {
@@ -423,16 +425,18 @@ static int mdt_lvbo_fill(struct ldlm_lock *lock,
                                     mdt_obd_name(mdt), *lvblen, rc,
                                     mdt->mdt_max_mdsize, -ERANGE);
                        *lvblen = rc;
-                       GOTO(out_put, rc = -ERANGE);
+                       GOTO(unlock, rc = -ERANGE);
                }
                lmm = &info->mti_buf;
                lmm->lb_buf = lvb;
                lmm->lb_len = rc;
                rc = mo_xattr_get(env, child, lmm, XATTR_NAME_LOV);
                if (rc < 0)
-                       GOTO(out_put, rc);
+                       GOTO(unlock, rc);
        }
 
+unlock:
+       mutex_unlock(&obj->mot_lov_mutex);
 out_put:
        if (obj != NULL && !IS_ERR(obj))
                mdt_object_put(env, obj);
index cc1afe1..80f34bd 100644 (file)
@@ -126,7 +126,7 @@ static int mdt_create_data(struct mdt_thread_info *info,
                                     p ? mdt_object_child(p) : NULL,
                                     mdt_object_child(o), spec, ma);
                if (rc == 0)
-                       rc = mdt_attr_get_complex(info, o, ma);
+                       rc = __mdt_attr_get_complex(info, o, ma, true);
 
                if (rc == 0 && ma->ma_valid & MA_LOV)
                        o->mot_lov_created = 1;
@@ -2143,10 +2143,12 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info,
 
                buf->lb_len = sizeof(mrd);
                buf->lb_buf = &mrd;
+               mutex_lock(&o->mot_lov_mutex);
                rc = mo_xattr_set(info->mti_env, mdt_object_child(o), buf,
                                  XATTR_LUSTRE_LOV,
                                  ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT ?
                                  LU_XATTR_SPLIT : LU_XATTR_MERGE);
+               mutex_unlock(&o->mot_lov_mutex);
                if (rc == 0 && ma->ma_attr.la_valid & (LA_SIZE | LA_BLOCKS |
                                                       LA_LSIZE | LA_LBLOCKS)) {
                        int rc2;
index 0e662cf..4eebc8f 100644 (file)
@@ -863,7 +863,7 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                 */
                if (!exp_connect_flr(info->mti_exp) ||
                    !exp_connect_overstriping(info->mti_exp)) {
-                       rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
+                       rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV, false);
                        if (rc < 0 && rc != -ENODATA)
                                GOTO(out_put, rc);
 
@@ -991,10 +991,15 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                if (rc != 0)
                        GOTO(out_put, rc);
 
+               if (strcmp(name, XATTR_NAME_LOV) == 0)
+                       mutex_lock(&mo->mot_lov_mutex);
                rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf,
                                  name, 0);
+               if (strcmp(name, XATTR_NAME_LOV) == 0)
+                       mutex_unlock(&mo->mot_lov_mutex);
 
                mdt_object_unlock(info, mo, lh, rc);
+
                if (rc)
                        GOTO(out_put, rc);
        } else {
index 22559f5..2107e56 100644 (file)
@@ -810,7 +810,7 @@ static int mdt_restripe_layout_update(struct mdt_thread_info *info)
                        GOTO(out, rc = PTR_ERR(stripe));
 
                ma->ma_valid = 0;
-               rc = __mdt_stripe_get(info, stripe, ma, XATTR_NAME_LMV);
+               rc = __mdt_stripe_get(info, stripe, ma, XATTR_NAME_LMV, false);
                /* LMV is checked without lock, don't cache it */
                mo_invalidate(env, mdt_object_child(stripe));
                mdt_object_put(env, stripe);
index 62c52a5..0968707 100644 (file)
@@ -201,7 +201,7 @@ int mdt_lsom_update(struct mdt_thread_info *info,
         * should contain valid LOV EA data, and can be used directly.
         */
        if (!info->mti_big_lmm_used) {
-               rc = mdt_big_xattr_get(info, o, XATTR_NAME_LOV);
+               rc = mdt_big_xattr_get(info, o, XATTR_NAME_LOV, false);
                if (rc < 0 && rc != -ENODATA)
                        GOTO(out_lock, rc);
 
index 819086b..c0e812a 100644 (file)
@@ -73,9 +73,14 @@ static int mdt_getxattr_pack_reply(struct mdt_thread_info *info)
                    !strncmp(xattr_name, user_string, sizeof(user_string) - 1))
                        RETURN(-EOPNOTSUPP);
 
+               if (strcmp(xattr_name, XATTR_NAME_LOV) == 0)
+                       mutex_lock(&info->mti_object->mot_lov_mutex);
                size = mo_xattr_get(info->mti_env,
                                    mdt_object_child(info->mti_object),
                                    &LU_BUF_NULL, xattr_name);
+               if (strcmp(xattr_name, XATTR_NAME_LOV) == 0)
+                       mutex_unlock(&info->mti_object->mot_lov_mutex);
+
                if (size == -ENODATA) {
                        /* XXX: Some client code will not handle -ENODATA
                         * for XATTR_NAME_LOV (trusted.lov) properly.
@@ -286,7 +291,12 @@ int mdt_getxattr(struct mdt_thread_info *info)
        if (valid == OBD_MD_FLXATTR) {
                const char *xattr_name = req_capsule_client_get(info->mti_pill,
                                                                &RMF_NAME);
+
+               if (strcmp(xattr_name, XATTR_NAME_LOV) == 0)
+                       mutex_lock(&info->mti_object->mot_lov_mutex);
                rc = mo_xattr_get(info->mti_env, next, buf, xattr_name);
+               if (strcmp(xattr_name, XATTR_NAME_LOV) == 0)
+                       mutex_unlock(&info->mti_object->mot_lov_mutex);
                if (rc < 0)
                        GOTO(out, rc);
 
@@ -666,7 +676,11 @@ int mdt_reint_setxattr(struct mdt_thread_info *info,
 
                buf->lb_buf = rr->rr_eadata;
                buf->lb_len = xattr_len;
+               if (lockpart & MDS_INODELOCK_LAYOUT)
+                       mutex_lock(&obj->mot_lov_mutex);
                rc = mo_xattr_set(env, child, buf, xattr_name, flags);
+               if (lockpart & MDS_INODELOCK_LAYOUT)
+                       mutex_unlock(&obj->mot_lov_mutex);
                /* update ctime after xattr changed */
                if (rc == 0) {
                        ma->ma_attr_flags |= MDS_PERM_BYPASS;