Whamcloud - gitweb
LU-9052 lod: accept lfs mkdir from old client 91/25091/9
authorDi Wang <di.wang@intel.com>
Wed, 25 Jan 2017 16:24:28 +0000 (11:24 -0500)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 22 Nov 2017 03:54:56 +0000 (03:54 +0000)
MDS should also accept create striped directory request
from old client, but it will only honour stripe index in
this case, since the old client can only create remote
directory.

And also move the magic check from LOD to MDD to validate
LMV as early as possible.

Directory creation for echo client should go with lmv_user_md,
otherwise validation on user md will fail.

For striped directory, echo client should first choose stripe
before MD operation.

And also remove some unecessary lmv audit.

Re-enable 225b tests for DNE to test echo operations on
striped directory.

In mds-survey, if the current environment has more than one MDS,
let's skip the test. Because echo test will attach echo_client
individually on each MDSs(all of MDDs on the MDS), so each echo
client will not be able to access all of MDTs. But the test will
create a striped dir by MAX MDS count internally in mds_survey,
so echo client can not resolve the directory path in this case.

Signed-off-by: Di Wang <di.wang@intel.com>
Change-Id: I74a3712edfa36fd178f92965f8e35f5ff77ad422
Reviewed-on: https://review.whamcloud.com/25091
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Fan Yong <fan.yong@intel.com>
lustre/include/uapi/linux/lustre/lustre_user.h
lustre/lod/lod_object.c
lustre/mdd/mdd_dir.c
lustre/obdecho/echo_client.c
lustre/tests/mds-survey.sh
lustre/tests/sanity.sh

index f4adad5..a0c38c1 100644 (file)
@@ -427,7 +427,8 @@ enum ll_lease_type {
 #define LOV_USER_MAGIC_SPECIFIC 0x0BD50BD0     /* for specific OSTs */
 #define LOV_USER_MAGIC_COMP_V1 0x0BD60BD0
 
-#define LMV_USER_MAGIC    0x0CD30CD0    /*default lmv magic*/
+#define LMV_USER_MAGIC         0x0CD30CD0    /* default lmv magic */
+#define LMV_USER_MAGIC_V0      0x0CD20CD0    /* old default lmv magic*/
 
 #define LOV_PATTERN_NONE       0x000
 #define LOV_PATTERN_RAID0      0x001
index 6fbedf0..03e9f9f 100644 (file)
@@ -1954,12 +1954,10 @@ static int lod_declare_xattr_set_lmv(const struct lu_env *env,
                                     struct thandle *th)
 {
        struct lod_object       *lo = lod_dt_obj(dt);
-       struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
-       struct lmv_user_md_v1   *lum;
+       struct lmv_user_md_v1   *lum = lum_buf->lb_buf;
        int                     rc;
        ENTRY;
 
-       lum = lum_buf->lb_buf;
        LASSERT(lum != NULL);
 
        CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n",
@@ -1969,10 +1967,6 @@ static int lod_declare_xattr_set_lmv(const struct lu_env *env,
        if (le32_to_cpu(lum->lum_stripe_count) == 0)
                GOTO(out, rc = 0);
 
-       rc = lod_verify_md_striping(lod, lum);
-       if (rc != 0)
-               GOTO(out, rc);
-
        /* prepare dir striped objects */
        rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th);
        if (rc != 0) {
@@ -3882,6 +3876,8 @@ static void lod_ah_init(const struct lu_env *env,
                nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode);
 
        if (S_ISDIR(child_mode)) {
+               const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
+
                /* other default values are 0 */
                lc->ldo_dir_stripe_offset = -1;
 
@@ -3894,10 +3890,12 @@ static void lod_ah_init(const struct lu_env *env,
                        lc->ldo_def_striping = lds;
 
                /* It should always honour the specified stripes */
+               /* Note: old client (< 2.7)might also do lfs mkdir, whose EA
+                * will have old magic. In this case, we should ignore the
+                * stripe count and try to create dir by default stripe.
+                */
                if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0 &&
-                   lod_verify_md_striping(d, ah->dah_eadata) == 0) {
-                       const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
-
+                   le32_to_cpu(lum1->lum_magic) == LMV_USER_MAGIC) {
                        lc->ldo_dir_stripe_count =
                                le32_to_cpu(lum1->lum_stripe_count);
                        lc->ldo_dir_stripe_offset =
index ed18566..1d1538d 100644 (file)
@@ -1938,6 +1938,24 @@ static int mdd_create_sanity_check(const struct lu_env *env,
                check_perm = false;
        }
 
+       if (S_ISDIR(cattr->la_mode) &&
+           unlikely(spec != NULL && spec->sp_cr_flags & MDS_OPEN_HAS_EA) &&
+           spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen > 0) {
+               const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
+
+               if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC) &&
+                   le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC_V0) {
+                       rc = -EINVAL;
+                       CERROR("%s: invalid lmv_user_md: magic = %x, "
+                              "stripe_offset = %d, stripe_count = %u: "
+                              "rc = %d\n", mdd2obd_dev(m)->obd_name,
+                               le32_to_cpu(lum->lum_magic),
+                              (int)le32_to_cpu(lum->lum_stripe_offset),
+                              le32_to_cpu(lum->lum_stripe_count), rc);
+                       return rc;
+               }
+       }
+
        rc = mdd_may_create(env, obj, pattr, NULL, check_perm);
        if (rc != 0)
                RETURN(rc);
index 371e7cb..5f7812e 100644 (file)
@@ -45,6 +45,7 @@
 #include <lprocfs_status.h>
 #include <cl_object.h>
 #include <lustre_fid.h>
+#include <lustre_lmv.h>
 #include <lustre_acl.h>
 #include <uapi/linux/lustre/lustre_ioctl.h>
 #include <lustre_net.h>
@@ -1393,7 +1394,12 @@ static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
 
        LASSERT(ma->ma_lmm_size > 0);
 
-       rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV);
+       LASSERT(ma->ma_need & (MA_LOV | MA_LMV));
+       if (ma->ma_need & MA_LOV)
+               rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV);
+       else
+               rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LMV);
+
        if (rc < 0)
                RETURN(rc);
 
@@ -1419,11 +1425,18 @@ static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
 
        info->eti_buf.lb_buf = info->eti_big_lmm;
        info->eti_buf.lb_len = info->eti_big_lmmsize;
-       rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV);
+       if (ma->ma_need & MA_LOV)
+               rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV);
+       else
+               rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LMV);
        if (rc < 0)
                RETURN(rc);
 
-       ma->ma_valid |= MA_LOV;
+       if (ma->ma_need & MA_LOV)
+               ma->ma_valid |= MA_LOV;
+       else
+               ma->ma_valid |= MA_LMV;
+
        ma->ma_lmm = info->eti_big_lmm;
        ma->ma_lmm_size = rc;
 
@@ -1474,6 +1487,26 @@ static int echo_attr_get_complex(const struct lu_env *env,
                }
        }
 
+       if (need & MA_LMV && S_ISDIR(mode)) {
+               LASSERT(ma->ma_lmm_size > 0);
+               buf->lb_buf = ma->ma_lmm;
+               buf->lb_len = ma->ma_lmm_size;
+               rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LMV);
+               if (rc2 > 0) {
+                       ma->ma_lmm_size = rc2;
+                       ma->ma_valid |= MA_LMV;
+               } else if (rc2 == -ENODATA) {
+                       /* no LMV EA */
+                       ma->ma_lmm_size = 0;
+               } else if (rc2 == -ERANGE) {
+                       rc2 = echo_big_lmm_get(env, next, ma);
+                       if (rc2 < 0)
+                               GOTO(out, rc = rc2);
+               } else {
+                       GOTO(out, rc = rc2);
+               }
+       }
+
 #ifdef CONFIG_FS_POSIX_ACL
        if (need & MA_ACL_DEF && S_ISDIR(mode)) {
                buf->lb_buf = ma->ma_acl;
@@ -1569,6 +1602,81 @@ static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld,
        return 0;
 }
 
+static int
+echo_md_dir_stripe_choose(const struct lu_env *env, struct echo_device *ed,
+                         struct lu_object *obj, const char *name,
+                         unsigned int namelen, __u64 id,
+                         struct lu_object **new_parent)
+{
+       struct echo_thread_info *info = echo_env_info(env);
+       struct md_attr          *ma = &info->eti_ma;
+       struct lmv_mds_md_v1    *lmv;
+       struct lu_device        *ld = ed->ed_next;
+       unsigned int            idx;
+       struct lu_name          tmp_ln_name;
+       struct lu_fid           stripe_fid;
+       struct lu_object        *stripe_obj;
+       int                     rc;
+
+       LASSERT(obj != NULL);
+       LASSERT(S_ISDIR(obj->lo_header->loh_attr));
+
+       memset(ma, 0, sizeof(*ma));
+       echo_set_lmm_size(env, ld, ma);
+       ma->ma_need = MA_LMV;
+       rc = echo_attr_get_complex(env, lu2md(obj), ma);
+       if (rc) {
+               CERROR("Can not getattr child "DFID": rc = %d\n",
+                       PFID(lu_object_fid(obj)), rc);
+               return rc;
+       }
+
+       if (!(ma->ma_valid & MA_LMV)) {
+               *new_parent = obj;
+               return 0;
+       }
+
+       lmv = (struct lmv_mds_md_v1 *)ma->ma_lmm;
+       if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1) {
+               rc = -EINVAL;
+               CERROR("Invalid mds md magic %x "DFID": rc = %d\n",
+                      le32_to_cpu(lmv->lmv_magic), PFID(lu_object_fid(obj)),
+                      rc);
+               return rc;
+       }
+
+       if (name != NULL) {
+               tmp_ln_name.ln_name = name;
+               tmp_ln_name.ln_namelen = namelen;
+       } else {
+               LASSERT(id != -1);
+               echo_md_build_name(&tmp_ln_name, info->eti_name, id);
+       }
+
+       idx = lmv_name_to_stripe_index(LMV_HASH_TYPE_FNV_1A_64,
+                               le32_to_cpu(lmv->lmv_stripe_count),
+                               tmp_ln_name.ln_name, tmp_ln_name.ln_namelen);
+
+       LASSERT(idx < le32_to_cpu(lmv->lmv_stripe_count));
+       fid_le_to_cpu(&stripe_fid, &lmv->lmv_stripe_fids[idx]);
+
+       stripe_obj = lu_object_find_at(env, ld, &stripe_fid, NULL);
+       if (IS_ERR(stripe_obj)) {
+               rc = PTR_ERR(stripe_obj);
+               CERROR("Can not find the parent "DFID": rc = %d\n",
+                      PFID(&stripe_fid), rc);
+               return rc;
+       }
+
+       *new_parent = lu_object_locate(stripe_obj->lo_header, ld->ld_type);
+       if (*new_parent == NULL) {
+               lu_object_put(env, stripe_obj);
+               RETURN(-ENXIO);
+       }
+
+       return rc;
+}
+
 static int echo_create_md_object(const struct lu_env *env,
                                  struct echo_device *ed,
                                  struct lu_object *ec_parent,
@@ -1577,37 +1685,56 @@ static int echo_create_md_object(const struct lu_env *env,
                                  __u64 id, __u32 mode, int count,
                                  int stripe_count, int stripe_offset)
 {
-        struct lu_object        *parent;
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_name          *lname = &info->eti_lname;
-        struct md_op_spec       *spec = &info->eti_spec;
-        struct md_attr          *ma = &info->eti_ma;
-        struct lu_device        *ld = ed->ed_next;
-        int                      rc = 0;
-        int                      i;
+       struct lu_object        *parent;
+       struct lu_object        *new_parent;
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_name          *lname = &info->eti_lname;
+       struct md_op_spec       *spec = &info->eti_spec;
+       struct md_attr          *ma = &info->eti_ma;
+       struct lu_device        *ld = ed->ed_next;
+       int                      rc = 0;
+       int                      i;
 
        ENTRY;
 
        if (ec_parent == NULL)
                return -1;
-        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
+       parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
        if (parent == NULL)
                RETURN(-ENXIO);
 
+       rc = echo_md_dir_stripe_choose(env, ed, parent, name, namelen,
+                                      id, &new_parent);
+       if (rc != 0)
+               RETURN(rc);
+
+       LASSERT(new_parent != NULL);
        memset(ma, 0, sizeof(*ma));
        memset(spec, 0, sizeof(*spec));
+       echo_set_lmm_size(env, ld, ma);
        if (stripe_count != 0) {
                spec->sp_cr_flags |= FMODE_WRITE;
-               echo_set_lmm_size(env, ld, ma);
                if (stripe_count != -1) {
-                       struct lov_user_md_v3 *lum = &info->eti_lum;
-
-                       lum->lmm_magic = LOV_USER_MAGIC_V3;
-                       lum->lmm_stripe_count = stripe_count;
-                       lum->lmm_stripe_offset = stripe_offset;
-                       lum->lmm_pattern = LOV_PATTERN_NONE;
-                       spec->u.sp_ea.eadata = lum;
-                       spec->u.sp_ea.eadatalen = sizeof(*lum);
+                       if (S_ISDIR(mode)) {
+                               struct lmv_user_md *lmu;
+
+                               lmu = (struct lmv_user_md *)&info->eti_lum;
+                               lmu->lum_magic = LMV_USER_MAGIC;
+                               lmu->lum_stripe_offset = stripe_offset;
+                               lmu->lum_stripe_count = stripe_count;
+                               lmu->lum_hash_type = LMV_HASH_TYPE_FNV_1A_64;
+                               spec->u.sp_ea.eadata = lmu;
+                               spec->u.sp_ea.eadatalen = sizeof(*lmu);
+                       } else {
+                               struct lov_user_md_v3 *lum = &info->eti_lum;
+
+                               lum->lmm_magic = LOV_USER_MAGIC_V3;
+                               lum->lmm_stripe_count = stripe_count;
+                               lum->lmm_stripe_offset = stripe_offset;
+                               lum->lmm_pattern = LOV_PATTERN_NONE;
+                               spec->u.sp_ea.eadata = lum;
+                               spec->u.sp_ea.eadatalen = sizeof(*lum);
+                       }
                        spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
                }
        }
@@ -1616,31 +1743,35 @@ static int echo_create_md_object(const struct lu_env *env,
        ma->ma_attr.la_valid = LA_CTIME | LA_MODE;
         ma->ma_attr.la_ctime = cfs_time_current_64();
 
-        if (name != NULL) {
-                lname->ln_name = name;
-                lname->ln_namelen = namelen;
-                /* If name is specified, only create one object by name */
-                rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
-                                             spec, ma);
-               RETURN(rc);
-        }
+       if (name != NULL) {
+               lname->ln_name = name;
+               lname->ln_namelen = namelen;
+               /* If name is specified, only create one object by name */
+               rc = echo_md_create_internal(env, ed, lu2md(new_parent), fid,
+                                            lname, spec, ma);
+               GOTO(out_put, rc);
+       }
 
-        /* Create multiple object sequenced by id */
-        for (i = 0; i < count; i++) {
-                char *tmp_name = info->eti_name;
+       /* Create multiple object sequenced by id */
+       for (i = 0; i < count; i++) {
+               char *tmp_name = info->eti_name;
 
-                echo_md_build_name(lname, tmp_name, id);
+               echo_md_build_name(lname, tmp_name, id);
 
-                rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
-                                             spec, ma);
-                if (rc) {
-                        CERROR("Can not create child %s: rc = %d\n", tmp_name,
-                                rc);
-                        break;
-                }
-                id++;
-                fid->f_oid++;
-        }
+               rc = echo_md_create_internal(env, ed, lu2md(new_parent),
+                                            fid, lname, spec, ma);
+               if (rc) {
+                       CERROR("Can not create child %s: rc = %d\n", tmp_name,
+                               rc);
+                       break;
+               }
+               id++;
+               fid->f_oid++;
+       }
+
+out_put:
+       if (new_parent != parent)
+               lu_object_put(env, new_parent);
 
        RETURN(rc);
 }
@@ -1658,11 +1789,12 @@ static struct lu_object *echo_md_lookup(const struct lu_env *env,
 
         CDEBUG(D_INFO, "lookup %s in parent "DFID" %p\n", lname->ln_name,
                PFID(fid), parent);
-        rc = mdo_lookup(env, parent, lname, fid, NULL);
-        if (rc) {
-                CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
-                RETURN(ERR_PTR(rc));
-        }
+
+       rc = mdo_lookup(env, parent, lname, fid, NULL);
+       if (rc) {
+               CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
+               RETURN(ERR_PTR(rc));
+       }
 
        /* In the function below, .hs_keycmp resolves to
         * lu_obj_hop_keycmp() */
@@ -1677,14 +1809,15 @@ static int echo_setattr_object(const struct lu_env *env,
                                struct lu_object *ec_parent,
                                __u64 id, int count)
 {
-        struct lu_object        *parent;
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_name          *lname = &info->eti_lname;
-        char                    *name = info->eti_name;
-        struct lu_device        *ld = ed->ed_next;
-        struct lu_buf           *buf = &info->eti_buf;
-        int                      rc = 0;
-        int                      i;
+       struct lu_object        *parent;
+       struct lu_object        *new_parent;
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_name          *lname = &info->eti_lname;
+       char                    *name = info->eti_name;
+       struct lu_device        *ld = ed->ed_next;
+       struct lu_buf           *buf = &info->eti_buf;
+       int                      rc = 0;
+       int                      i;
 
        ENTRY;
 
@@ -1694,17 +1827,23 @@ static int echo_setattr_object(const struct lu_env *env,
        if (parent == NULL)
                RETURN(-ENXIO);
 
+       rc = echo_md_dir_stripe_choose(env, ed, parent, NULL, 0, id,
+                                      &new_parent);
+       if (rc != 0)
+               RETURN(rc);
+
         for (i = 0; i < count; i++) {
                 struct lu_object *ec_child, *child;
 
                 echo_md_build_name(lname, name, id);
 
-                ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
-                if (IS_ERR(ec_child)) {
-                        CERROR("Can't find child %s: rc = %ld\n",
-                                lname->ln_name, PTR_ERR(ec_child));
-                        RETURN(PTR_ERR(ec_child));
-                }
+               ec_child = echo_md_lookup(env, ed, lu2md(new_parent), lname);
+               if (IS_ERR(ec_child)) {
+                       rc = PTR_ERR(ec_child);
+                       CERROR("Can't find child %s: rc = %d\n",
+                               lname->ln_name, rc);
+                       break;
+               }
 
                 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
                 if (child == NULL) {
@@ -1734,6 +1873,10 @@ static int echo_setattr_object(const struct lu_env *env,
                 id++;
                 lu_object_put(env, ec_child);
         }
+
+       if (new_parent != parent)
+               lu_object_put(env, new_parent);
+
        RETURN(rc);
 }
 
@@ -1742,14 +1885,15 @@ static int echo_getattr_object(const struct lu_env *env,
                                struct lu_object *ec_parent,
                                __u64 id, int count)
 {
-        struct lu_object        *parent;
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_name          *lname = &info->eti_lname;
-        char                    *name = info->eti_name;
-        struct md_attr          *ma = &info->eti_ma;
-        struct lu_device        *ld = ed->ed_next;
-        int                      rc = 0;
-        int                      i;
+       struct lu_object        *parent;
+       struct lu_object        *new_parent;
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_name          *lname = &info->eti_lname;
+       char                    *name = info->eti_name;
+       struct md_attr          *ma = &info->eti_ma;
+       struct lu_device        *ld = ed->ed_next;
+       int                      rc = 0;
+       int                      i;
 
        ENTRY;
 
@@ -1759,6 +1903,11 @@ static int echo_getattr_object(const struct lu_env *env,
        if (parent == NULL)
                RETURN(-ENXIO);
 
+       rc = echo_md_dir_stripe_choose(env, ed, parent, NULL, 0, id,
+                                      &new_parent);
+       if (rc != 0)
+               RETURN(rc);
+
         memset(ma, 0, sizeof(*ma));
         ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
         ma->ma_acl = info->eti_xattr_buf;
@@ -1771,12 +1920,12 @@ static int echo_getattr_object(const struct lu_env *env,
                 echo_md_build_name(lname, name, id);
                echo_set_lmm_size(env, ld, ma);
 
-                ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
-                if (IS_ERR(ec_child)) {
-                        CERROR("Can't find child %s: rc = %ld\n",
-                               lname->ln_name, PTR_ERR(ec_child));
-                        RETURN(PTR_ERR(ec_child));
-                }
+               ec_child = echo_md_lookup(env, ed, lu2md(new_parent), lname);
+               if (IS_ERR(ec_child)) {
+                       CERROR("Can't find child %s: rc = %ld\n",
+                              lname->ln_name, PTR_ERR(ec_child));
+                       RETURN(PTR_ERR(ec_child));
+               }
 
                 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
                 if (child == NULL) {
@@ -1800,6 +1949,9 @@ static int echo_getattr_object(const struct lu_env *env,
                 lu_object_put(env, ec_child);
         }
 
+       if (new_parent != parent)
+               lu_object_put(env, new_parent);
+
        RETURN(rc);
 }
 
@@ -1808,14 +1960,15 @@ static int echo_lookup_object(const struct lu_env *env,
                               struct lu_object *ec_parent,
                               __u64 id, int count)
 {
-        struct lu_object        *parent;
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_name          *lname = &info->eti_lname;
-        char                    *name = info->eti_name;
-        struct lu_fid           *fid = &info->eti_fid;
-        struct lu_device        *ld = ed->ed_next;
-        int                      rc = 0;
-        int                      i;
+       struct lu_object        *parent;
+       struct lu_object        *new_parent;
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_name          *lname = &info->eti_lname;
+       char                    *name = info->eti_name;
+       struct lu_fid           *fid = &info->eti_fid;
+       struct lu_device        *ld = ed->ed_next;
+       int                      rc = 0;
+       int                      i;
 
        if (ec_parent == NULL)
                return -1;
@@ -1823,24 +1976,36 @@ static int echo_lookup_object(const struct lu_env *env,
        if (parent == NULL)
                return -ENXIO;
 
+       rc = echo_md_dir_stripe_choose(env, ed, parent, NULL, 0, id,
+                                      &new_parent);
+       if (rc != 0)
+               RETURN(rc);
+
         /*prepare the requests*/
         for (i = 0; i < count; i++) {
-                echo_md_build_name(lname, name, id);
+               echo_md_build_name(lname, name, id);
 
-                CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
-                       PFID(lu_object_fid(parent)), lname->ln_name, parent);
+               CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
+                      PFID(lu_object_fid(new_parent)), lname->ln_name,
+                      new_parent);
 
-                rc = mdo_lookup(env, lu2md(parent), lname, fid, NULL);
-                if (rc) {
-                        CERROR("Can not lookup child %s: rc = %d\n", name, rc);
-                        break;
-                }
-                CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
-                       PFID(lu_object_fid(parent)), lname->ln_name, parent);
+               rc = mdo_lookup(env, lu2md(new_parent), lname, fid, NULL);
+               if (rc) {
+                       CERROR("Can not lookup child %s: rc = %d\n", name, rc);
+                       break;
+               }
 
-                id++;
-        }
-        return rc;
+               CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
+                      PFID(lu_object_fid(new_parent)), lname->ln_name,
+                      new_parent);
+
+               id++;
+       }
+
+       if (new_parent != parent)
+               lu_object_put(env, new_parent);
+
+       return rc;
 }
 
 static int echo_md_destroy_internal(const struct lu_env *env,
@@ -1897,19 +2062,25 @@ static int echo_destroy_object(const struct lu_env *env,
                                __u64 id, __u32 mode,
                                int count)
 {
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_name          *lname = &info->eti_lname;
-        struct md_attr          *ma = &info->eti_ma;
-        struct lu_device        *ld = ed->ed_next;
-        struct lu_object        *parent;
-        int                      rc = 0;
-        int                      i;
-        ENTRY;
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_name          *lname = &info->eti_lname;
+       struct md_attr          *ma = &info->eti_ma;
+       struct lu_device        *ld = ed->ed_next;
+       struct lu_object        *parent;
+       struct lu_object        *new_parent;
+       int                      rc = 0;
+       int                      i;
+       ENTRY;
 
         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
         if (parent == NULL)
                 RETURN(-EINVAL);
 
+       rc = echo_md_dir_stripe_choose(env, ed, parent, name, namelen,
+                                      id, &new_parent);
+       if (rc != 0)
+               RETURN(rc);
+
         memset(ma, 0, sizeof(*ma));
         ma->ma_attr.la_mode = mode;
         ma->ma_attr.la_valid = LA_CTIME;
@@ -1920,26 +2091,30 @@ static int echo_destroy_object(const struct lu_env *env,
         if (name != NULL) {
                 lname->ln_name = name;
                 lname->ln_namelen = namelen;
-                rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
-                                              ma);
-               RETURN(rc);
-        }
+               rc = echo_md_destroy_internal(env, ed, lu2md(new_parent), lname,
+                                             ma);
+               GOTO(out_put, rc);
+       }
 
-        /*prepare the requests*/
-        for (i = 0; i < count; i++) {
-                char *tmp_name = info->eti_name;
+       /*prepare the requests*/
+       for (i = 0; i < count; i++) {
+               char *tmp_name = info->eti_name;
 
-                ma->ma_valid = 0;
-                echo_md_build_name(lname, tmp_name, id);
+               ma->ma_valid = 0;
+               echo_md_build_name(lname, tmp_name, id);
 
-                rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
-                                              ma);
-                if (rc) {
-                        CERROR("Can not unlink child %s: rc = %d\n", name, rc);
-                        break;
-                }
-                id++;
-        }
+               rc = echo_md_destroy_internal(env, ed, lu2md(new_parent), lname,
+                                             ma);
+               if (rc) {
+                       CERROR("Can not unlink child %s: rc = %d\n", name, rc);
+                       break;
+               }
+               id++;
+       }
+
+out_put:
+       if (new_parent != parent)
+               lu_object_put(env, new_parent);
 
        RETURN(rc);
 }
index a408567..5945711 100644 (file)
@@ -114,15 +114,21 @@ mds_survey_run() {
 }
 
 test_1() {
-    mds_survey_run "mdd" "0"
+       mds_survey_run "mdd" "0"
 }
 run_test 1 "Metadata survey with zero-stripe"
 
 test_2() {
-    if [ $ost_count -eq 0 ]; then
-        skip_env "Need to mount OST to test" && return
-    fi
-    mds_survey_run "mdd" "1"
+       local mdscount=$(get_node_count "$(mdts_nodes)")
+
+       if [ $mdscount -gt 1 ]; then
+               skip_env "Only run this test on single MDS" && return
+       fi
+
+       if [ $ost_count -eq 0 ]; then
+               skip_env "Need to mount OST to test" && return
+       fi
+       mds_survey_run "mdd" "1"
 }
 run_test 2 "Metadata survey with stripe_count = 1"
 
index c9101d4..7c3285f 100755 (executable)
@@ -13209,8 +13209,6 @@ test_225b () {
              skip_env "Need to mount OST to test" && return
        fi
 
-       [ $MDSCOUNT -ge 2 ] &&
-               skip "skipping now for more than one MDT" && return
        local mds=$(facet_host $SINGLEMDS)
        local target=$(do_nodes $mds 'lctl dl' | \
                       awk "{if (\$2 == \"UP\" && \$3 == \"mdt\") {print \$4}}")