Whamcloud - gitweb
LU-9052 lod: accept lfs mkdir from old client
[fs/lustre-release.git] / lustre / obdecho / echo_client.c
index 371e7cb..5f7812e 100644 (file)
@@ -45,6 +45,7 @@
 #include <lprocfs_status.h>
 #include <cl_object.h>
 #include <lustre_fid.h>
+#include <lustre_lmv.h>
 #include <lustre_acl.h>
 #include <uapi/linux/lustre/lustre_ioctl.h>
 #include <lustre_net.h>
@@ -1393,7 +1394,12 @@ static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
 
        LASSERT(ma->ma_lmm_size > 0);
 
-       rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV);
+       LASSERT(ma->ma_need & (MA_LOV | MA_LMV));
+       if (ma->ma_need & MA_LOV)
+               rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV);
+       else
+               rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LMV);
+
        if (rc < 0)
                RETURN(rc);
 
@@ -1419,11 +1425,18 @@ static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
 
        info->eti_buf.lb_buf = info->eti_big_lmm;
        info->eti_buf.lb_len = info->eti_big_lmmsize;
-       rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV);
+       if (ma->ma_need & MA_LOV)
+               rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV);
+       else
+               rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LMV);
        if (rc < 0)
                RETURN(rc);
 
-       ma->ma_valid |= MA_LOV;
+       if (ma->ma_need & MA_LOV)
+               ma->ma_valid |= MA_LOV;
+       else
+               ma->ma_valid |= MA_LMV;
+
        ma->ma_lmm = info->eti_big_lmm;
        ma->ma_lmm_size = rc;
 
@@ -1474,6 +1487,26 @@ static int echo_attr_get_complex(const struct lu_env *env,
                }
        }
 
+       if (need & MA_LMV && S_ISDIR(mode)) {
+               LASSERT(ma->ma_lmm_size > 0);
+               buf->lb_buf = ma->ma_lmm;
+               buf->lb_len = ma->ma_lmm_size;
+               rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LMV);
+               if (rc2 > 0) {
+                       ma->ma_lmm_size = rc2;
+                       ma->ma_valid |= MA_LMV;
+               } else if (rc2 == -ENODATA) {
+                       /* no LMV EA */
+                       ma->ma_lmm_size = 0;
+               } else if (rc2 == -ERANGE) {
+                       rc2 = echo_big_lmm_get(env, next, ma);
+                       if (rc2 < 0)
+                               GOTO(out, rc = rc2);
+               } else {
+                       GOTO(out, rc = rc2);
+               }
+       }
+
 #ifdef CONFIG_FS_POSIX_ACL
        if (need & MA_ACL_DEF && S_ISDIR(mode)) {
                buf->lb_buf = ma->ma_acl;
@@ -1569,6 +1602,81 @@ static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld,
        return 0;
 }
 
+static int
+echo_md_dir_stripe_choose(const struct lu_env *env, struct echo_device *ed,
+                         struct lu_object *obj, const char *name,
+                         unsigned int namelen, __u64 id,
+                         struct lu_object **new_parent)
+{
+       struct echo_thread_info *info = echo_env_info(env);
+       struct md_attr          *ma = &info->eti_ma;
+       struct lmv_mds_md_v1    *lmv;
+       struct lu_device        *ld = ed->ed_next;
+       unsigned int            idx;
+       struct lu_name          tmp_ln_name;
+       struct lu_fid           stripe_fid;
+       struct lu_object        *stripe_obj;
+       int                     rc;
+
+       LASSERT(obj != NULL);
+       LASSERT(S_ISDIR(obj->lo_header->loh_attr));
+
+       memset(ma, 0, sizeof(*ma));
+       echo_set_lmm_size(env, ld, ma);
+       ma->ma_need = MA_LMV;
+       rc = echo_attr_get_complex(env, lu2md(obj), ma);
+       if (rc) {
+               CERROR("Can not getattr child "DFID": rc = %d\n",
+                       PFID(lu_object_fid(obj)), rc);
+               return rc;
+       }
+
+       if (!(ma->ma_valid & MA_LMV)) {
+               *new_parent = obj;
+               return 0;
+       }
+
+       lmv = (struct lmv_mds_md_v1 *)ma->ma_lmm;
+       if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1) {
+               rc = -EINVAL;
+               CERROR("Invalid mds md magic %x "DFID": rc = %d\n",
+                      le32_to_cpu(lmv->lmv_magic), PFID(lu_object_fid(obj)),
+                      rc);
+               return rc;
+       }
+
+       if (name != NULL) {
+               tmp_ln_name.ln_name = name;
+               tmp_ln_name.ln_namelen = namelen;
+       } else {
+               LASSERT(id != -1);
+               echo_md_build_name(&tmp_ln_name, info->eti_name, id);
+       }
+
+       idx = lmv_name_to_stripe_index(LMV_HASH_TYPE_FNV_1A_64,
+                               le32_to_cpu(lmv->lmv_stripe_count),
+                               tmp_ln_name.ln_name, tmp_ln_name.ln_namelen);
+
+       LASSERT(idx < le32_to_cpu(lmv->lmv_stripe_count));
+       fid_le_to_cpu(&stripe_fid, &lmv->lmv_stripe_fids[idx]);
+
+       stripe_obj = lu_object_find_at(env, ld, &stripe_fid, NULL);
+       if (IS_ERR(stripe_obj)) {
+               rc = PTR_ERR(stripe_obj);
+               CERROR("Can not find the parent "DFID": rc = %d\n",
+                      PFID(&stripe_fid), rc);
+               return rc;
+       }
+
+       *new_parent = lu_object_locate(stripe_obj->lo_header, ld->ld_type);
+       if (*new_parent == NULL) {
+               lu_object_put(env, stripe_obj);
+               RETURN(-ENXIO);
+       }
+
+       return rc;
+}
+
 static int echo_create_md_object(const struct lu_env *env,
                                  struct echo_device *ed,
                                  struct lu_object *ec_parent,
@@ -1577,37 +1685,56 @@ static int echo_create_md_object(const struct lu_env *env,
                                  __u64 id, __u32 mode, int count,
                                  int stripe_count, int stripe_offset)
 {
-        struct lu_object        *parent;
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_name          *lname = &info->eti_lname;
-        struct md_op_spec       *spec = &info->eti_spec;
-        struct md_attr          *ma = &info->eti_ma;
-        struct lu_device        *ld = ed->ed_next;
-        int                      rc = 0;
-        int                      i;
+       struct lu_object        *parent;
+       struct lu_object        *new_parent;
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_name          *lname = &info->eti_lname;
+       struct md_op_spec       *spec = &info->eti_spec;
+       struct md_attr          *ma = &info->eti_ma;
+       struct lu_device        *ld = ed->ed_next;
+       int                      rc = 0;
+       int                      i;
 
        ENTRY;
 
        if (ec_parent == NULL)
                return -1;
-        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
+       parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
        if (parent == NULL)
                RETURN(-ENXIO);
 
+       rc = echo_md_dir_stripe_choose(env, ed, parent, name, namelen,
+                                      id, &new_parent);
+       if (rc != 0)
+               RETURN(rc);
+
+       LASSERT(new_parent != NULL);
        memset(ma, 0, sizeof(*ma));
        memset(spec, 0, sizeof(*spec));
+       echo_set_lmm_size(env, ld, ma);
        if (stripe_count != 0) {
                spec->sp_cr_flags |= FMODE_WRITE;
-               echo_set_lmm_size(env, ld, ma);
                if (stripe_count != -1) {
-                       struct lov_user_md_v3 *lum = &info->eti_lum;
-
-                       lum->lmm_magic = LOV_USER_MAGIC_V3;
-                       lum->lmm_stripe_count = stripe_count;
-                       lum->lmm_stripe_offset = stripe_offset;
-                       lum->lmm_pattern = LOV_PATTERN_NONE;
-                       spec->u.sp_ea.eadata = lum;
-                       spec->u.sp_ea.eadatalen = sizeof(*lum);
+                       if (S_ISDIR(mode)) {
+                               struct lmv_user_md *lmu;
+
+                               lmu = (struct lmv_user_md *)&info->eti_lum;
+                               lmu->lum_magic = LMV_USER_MAGIC;
+                               lmu->lum_stripe_offset = stripe_offset;
+                               lmu->lum_stripe_count = stripe_count;
+                               lmu->lum_hash_type = LMV_HASH_TYPE_FNV_1A_64;
+                               spec->u.sp_ea.eadata = lmu;
+                               spec->u.sp_ea.eadatalen = sizeof(*lmu);
+                       } else {
+                               struct lov_user_md_v3 *lum = &info->eti_lum;
+
+                               lum->lmm_magic = LOV_USER_MAGIC_V3;
+                               lum->lmm_stripe_count = stripe_count;
+                               lum->lmm_stripe_offset = stripe_offset;
+                               lum->lmm_pattern = LOV_PATTERN_NONE;
+                               spec->u.sp_ea.eadata = lum;
+                               spec->u.sp_ea.eadatalen = sizeof(*lum);
+                       }
                        spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
                }
        }
@@ -1616,31 +1743,35 @@ static int echo_create_md_object(const struct lu_env *env,
        ma->ma_attr.la_valid = LA_CTIME | LA_MODE;
         ma->ma_attr.la_ctime = cfs_time_current_64();
 
-        if (name != NULL) {
-                lname->ln_name = name;
-                lname->ln_namelen = namelen;
-                /* If name is specified, only create one object by name */
-                rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
-                                             spec, ma);
-               RETURN(rc);
-        }
+       if (name != NULL) {
+               lname->ln_name = name;
+               lname->ln_namelen = namelen;
+               /* If name is specified, only create one object by name */
+               rc = echo_md_create_internal(env, ed, lu2md(new_parent), fid,
+                                            lname, spec, ma);
+               GOTO(out_put, rc);
+       }
 
-        /* Create multiple object sequenced by id */
-        for (i = 0; i < count; i++) {
-                char *tmp_name = info->eti_name;
+       /* Create multiple object sequenced by id */
+       for (i = 0; i < count; i++) {
+               char *tmp_name = info->eti_name;
 
-                echo_md_build_name(lname, tmp_name, id);
+               echo_md_build_name(lname, tmp_name, id);
 
-                rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
-                                             spec, ma);
-                if (rc) {
-                        CERROR("Can not create child %s: rc = %d\n", tmp_name,
-                                rc);
-                        break;
-                }
-                id++;
-                fid->f_oid++;
-        }
+               rc = echo_md_create_internal(env, ed, lu2md(new_parent),
+                                            fid, lname, spec, ma);
+               if (rc) {
+                       CERROR("Can not create child %s: rc = %d\n", tmp_name,
+                               rc);
+                       break;
+               }
+               id++;
+               fid->f_oid++;
+       }
+
+out_put:
+       if (new_parent != parent)
+               lu_object_put(env, new_parent);
 
        RETURN(rc);
 }
@@ -1658,11 +1789,12 @@ static struct lu_object *echo_md_lookup(const struct lu_env *env,
 
         CDEBUG(D_INFO, "lookup %s in parent "DFID" %p\n", lname->ln_name,
                PFID(fid), parent);
-        rc = mdo_lookup(env, parent, lname, fid, NULL);
-        if (rc) {
-                CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
-                RETURN(ERR_PTR(rc));
-        }
+
+       rc = mdo_lookup(env, parent, lname, fid, NULL);
+       if (rc) {
+               CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
+               RETURN(ERR_PTR(rc));
+       }
 
        /* In the function below, .hs_keycmp resolves to
         * lu_obj_hop_keycmp() */
@@ -1677,14 +1809,15 @@ static int echo_setattr_object(const struct lu_env *env,
                                struct lu_object *ec_parent,
                                __u64 id, int count)
 {
-        struct lu_object        *parent;
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_name          *lname = &info->eti_lname;
-        char                    *name = info->eti_name;
-        struct lu_device        *ld = ed->ed_next;
-        struct lu_buf           *buf = &info->eti_buf;
-        int                      rc = 0;
-        int                      i;
+       struct lu_object        *parent;
+       struct lu_object        *new_parent;
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_name          *lname = &info->eti_lname;
+       char                    *name = info->eti_name;
+       struct lu_device        *ld = ed->ed_next;
+       struct lu_buf           *buf = &info->eti_buf;
+       int                      rc = 0;
+       int                      i;
 
        ENTRY;
 
@@ -1694,17 +1827,23 @@ static int echo_setattr_object(const struct lu_env *env,
        if (parent == NULL)
                RETURN(-ENXIO);
 
+       rc = echo_md_dir_stripe_choose(env, ed, parent, NULL, 0, id,
+                                      &new_parent);
+       if (rc != 0)
+               RETURN(rc);
+
         for (i = 0; i < count; i++) {
                 struct lu_object *ec_child, *child;
 
                 echo_md_build_name(lname, name, id);
 
-                ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
-                if (IS_ERR(ec_child)) {
-                        CERROR("Can't find child %s: rc = %ld\n",
-                                lname->ln_name, PTR_ERR(ec_child));
-                        RETURN(PTR_ERR(ec_child));
-                }
+               ec_child = echo_md_lookup(env, ed, lu2md(new_parent), lname);
+               if (IS_ERR(ec_child)) {
+                       rc = PTR_ERR(ec_child);
+                       CERROR("Can't find child %s: rc = %d\n",
+                               lname->ln_name, rc);
+                       break;
+               }
 
                 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
                 if (child == NULL) {
@@ -1734,6 +1873,10 @@ static int echo_setattr_object(const struct lu_env *env,
                 id++;
                 lu_object_put(env, ec_child);
         }
+
+       if (new_parent != parent)
+               lu_object_put(env, new_parent);
+
        RETURN(rc);
 }
 
@@ -1742,14 +1885,15 @@ static int echo_getattr_object(const struct lu_env *env,
                                struct lu_object *ec_parent,
                                __u64 id, int count)
 {
-        struct lu_object        *parent;
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_name          *lname = &info->eti_lname;
-        char                    *name = info->eti_name;
-        struct md_attr          *ma = &info->eti_ma;
-        struct lu_device        *ld = ed->ed_next;
-        int                      rc = 0;
-        int                      i;
+       struct lu_object        *parent;
+       struct lu_object        *new_parent;
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_name          *lname = &info->eti_lname;
+       char                    *name = info->eti_name;
+       struct md_attr          *ma = &info->eti_ma;
+       struct lu_device        *ld = ed->ed_next;
+       int                      rc = 0;
+       int                      i;
 
        ENTRY;
 
@@ -1759,6 +1903,11 @@ static int echo_getattr_object(const struct lu_env *env,
        if (parent == NULL)
                RETURN(-ENXIO);
 
+       rc = echo_md_dir_stripe_choose(env, ed, parent, NULL, 0, id,
+                                      &new_parent);
+       if (rc != 0)
+               RETURN(rc);
+
         memset(ma, 0, sizeof(*ma));
         ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
         ma->ma_acl = info->eti_xattr_buf;
@@ -1771,12 +1920,12 @@ static int echo_getattr_object(const struct lu_env *env,
                 echo_md_build_name(lname, name, id);
                echo_set_lmm_size(env, ld, ma);
 
-                ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
-                if (IS_ERR(ec_child)) {
-                        CERROR("Can't find child %s: rc = %ld\n",
-                               lname->ln_name, PTR_ERR(ec_child));
-                        RETURN(PTR_ERR(ec_child));
-                }
+               ec_child = echo_md_lookup(env, ed, lu2md(new_parent), lname);
+               if (IS_ERR(ec_child)) {
+                       CERROR("Can't find child %s: rc = %ld\n",
+                              lname->ln_name, PTR_ERR(ec_child));
+                       RETURN(PTR_ERR(ec_child));
+               }
 
                 child = lu_object_locate(ec_child->lo_header, ld->ld_type);
                 if (child == NULL) {
@@ -1800,6 +1949,9 @@ static int echo_getattr_object(const struct lu_env *env,
                 lu_object_put(env, ec_child);
         }
 
+       if (new_parent != parent)
+               lu_object_put(env, new_parent);
+
        RETURN(rc);
 }
 
@@ -1808,14 +1960,15 @@ static int echo_lookup_object(const struct lu_env *env,
                               struct lu_object *ec_parent,
                               __u64 id, int count)
 {
-        struct lu_object        *parent;
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_name          *lname = &info->eti_lname;
-        char                    *name = info->eti_name;
-        struct lu_fid           *fid = &info->eti_fid;
-        struct lu_device        *ld = ed->ed_next;
-        int                      rc = 0;
-        int                      i;
+       struct lu_object        *parent;
+       struct lu_object        *new_parent;
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_name          *lname = &info->eti_lname;
+       char                    *name = info->eti_name;
+       struct lu_fid           *fid = &info->eti_fid;
+       struct lu_device        *ld = ed->ed_next;
+       int                      rc = 0;
+       int                      i;
 
        if (ec_parent == NULL)
                return -1;
@@ -1823,24 +1976,36 @@ static int echo_lookup_object(const struct lu_env *env,
        if (parent == NULL)
                return -ENXIO;
 
+       rc = echo_md_dir_stripe_choose(env, ed, parent, NULL, 0, id,
+                                      &new_parent);
+       if (rc != 0)
+               RETURN(rc);
+
         /*prepare the requests*/
         for (i = 0; i < count; i++) {
-                echo_md_build_name(lname, name, id);
+               echo_md_build_name(lname, name, id);
 
-                CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
-                       PFID(lu_object_fid(parent)), lname->ln_name, parent);
+               CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
+                      PFID(lu_object_fid(new_parent)), lname->ln_name,
+                      new_parent);
 
-                rc = mdo_lookup(env, lu2md(parent), lname, fid, NULL);
-                if (rc) {
-                        CERROR("Can not lookup child %s: rc = %d\n", name, rc);
-                        break;
-                }
-                CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
-                       PFID(lu_object_fid(parent)), lname->ln_name, parent);
+               rc = mdo_lookup(env, lu2md(new_parent), lname, fid, NULL);
+               if (rc) {
+                       CERROR("Can not lookup child %s: rc = %d\n", name, rc);
+                       break;
+               }
 
-                id++;
-        }
-        return rc;
+               CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
+                      PFID(lu_object_fid(new_parent)), lname->ln_name,
+                      new_parent);
+
+               id++;
+       }
+
+       if (new_parent != parent)
+               lu_object_put(env, new_parent);
+
+       return rc;
 }
 
 static int echo_md_destroy_internal(const struct lu_env *env,
@@ -1897,19 +2062,25 @@ static int echo_destroy_object(const struct lu_env *env,
                                __u64 id, __u32 mode,
                                int count)
 {
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_name          *lname = &info->eti_lname;
-        struct md_attr          *ma = &info->eti_ma;
-        struct lu_device        *ld = ed->ed_next;
-        struct lu_object        *parent;
-        int                      rc = 0;
-        int                      i;
-        ENTRY;
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_name          *lname = &info->eti_lname;
+       struct md_attr          *ma = &info->eti_ma;
+       struct lu_device        *ld = ed->ed_next;
+       struct lu_object        *parent;
+       struct lu_object        *new_parent;
+       int                      rc = 0;
+       int                      i;
+       ENTRY;
 
         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
         if (parent == NULL)
                 RETURN(-EINVAL);
 
+       rc = echo_md_dir_stripe_choose(env, ed, parent, name, namelen,
+                                      id, &new_parent);
+       if (rc != 0)
+               RETURN(rc);
+
         memset(ma, 0, sizeof(*ma));
         ma->ma_attr.la_mode = mode;
         ma->ma_attr.la_valid = LA_CTIME;
@@ -1920,26 +2091,30 @@ static int echo_destroy_object(const struct lu_env *env,
         if (name != NULL) {
                 lname->ln_name = name;
                 lname->ln_namelen = namelen;
-                rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
-                                              ma);
-               RETURN(rc);
-        }
+               rc = echo_md_destroy_internal(env, ed, lu2md(new_parent), lname,
+                                             ma);
+               GOTO(out_put, rc);
+       }
 
-        /*prepare the requests*/
-        for (i = 0; i < count; i++) {
-                char *tmp_name = info->eti_name;
+       /*prepare the requests*/
+       for (i = 0; i < count; i++) {
+               char *tmp_name = info->eti_name;
 
-                ma->ma_valid = 0;
-                echo_md_build_name(lname, tmp_name, id);
+               ma->ma_valid = 0;
+               echo_md_build_name(lname, tmp_name, id);
 
-                rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
-                                              ma);
-                if (rc) {
-                        CERROR("Can not unlink child %s: rc = %d\n", name, rc);
-                        break;
-                }
-                id++;
-        }
+               rc = echo_md_destroy_internal(env, ed, lu2md(new_parent), lname,
+                                             ma);
+               if (rc) {
+                       CERROR("Can not unlink child %s: rc = %d\n", name, rc);
+                       break;
+               }
+               id++;
+       }
+
+out_put:
+       if (new_parent != parent)
+               lu_object_put(env, new_parent);
 
        RETURN(rc);
 }