Whamcloud - gitweb
LU-1943 echo: echo_client to get attributes like MDT does
authorMikhail Pershin <tappro@whamcloud.com>
Wed, 3 Oct 2012 07:18:41 +0000 (11:18 +0400)
committerOleg Drokin <green@whamcloud.com>
Wed, 3 Oct 2012 22:51:39 +0000 (18:51 -0400)
with changes in MDT/MDD the echo_client start working not-correctly,
it uses always 0-stiped files because gets mdsize == 0 from MDD.

- All attributes should be taken explicitely from MDD. Introduce
echo_attr_get_complex() and use it.
- don't use md_getsize_max(), it is not supported anymore.
- fix error handling issues
- remove obsoleted code

Signed-off-by: Mikhail Pershin <tappro@whamcloud.com>
Change-Id: I6f1277018d4b8716aa2e799e99aea6402c04b605
Reviewed-on: http://review.whamcloud.com/4164
Tested-by: Hudson
Reviewed-by: wangdi <di.wang@whamcloud.com>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre/lustre_idl.h
lustre/obdecho/echo_client.c

index 67a414e..4705c1d 100644 (file)
@@ -2106,7 +2106,6 @@ enum {
         MDS_CLOSE_CLEANUP = 1 << 6,
         MDS_KEEP_ORPHAN   = 1 << 7,
         MDS_RECOV_OPEN    = 1 << 8,
         MDS_CLOSE_CLEANUP = 1 << 6,
         MDS_KEEP_ORPHAN   = 1 << 7,
         MDS_RECOV_OPEN    = 1 << 8,
-        MDS_UNLINK_DESTROY = 1 << 9,  /* Destory ost object in mdd_unlink */
 };
 
 /* instance of mdt_reint_rec */
 };
 
 /* instance of mdt_reint_rec */
index b51d57e..f1901d8 100644 (file)
@@ -209,6 +209,9 @@ struct echo_thread_info {
         struct lov_user_md_v3   eti_lum;
         struct md_attr          eti_ma;
         struct lu_name          eti_lname;
         struct lov_user_md_v3   eti_lum;
         struct md_attr          eti_ma;
         struct lu_name          eti_lname;
+       /* per-thread values, can be re-used */
+       void                    *eti_big_lmm;
+       int                     eti_big_lmmsize;
         char                    eti_name[20];
         struct lu_buf           eti_buf;
         char                    eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
         char                    eti_name[20];
         struct lu_buf           eti_buf;
         char                    eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
@@ -1473,6 +1476,120 @@ static inline void echo_md_build_name(struct lu_name *lname, char *name,
        lname->ln_namelen = strlen(name);
 }
 
        lname->ln_namelen = strlen(name);
 }
 
+/* similar to mdt_attr_get_complex */
+static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
+                           struct md_attr *ma)
+{
+       struct echo_thread_info *info = echo_env_info(env);
+       int                      rc;
+
+       ENTRY;
+
+       LASSERT(ma->ma_lmm_size > 0);
+
+       rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV);
+       if (rc < 0)
+               RETURN(rc);
+
+       /* big_lmm may need to be grown */
+       if (info->eti_big_lmmsize < rc) {
+               int size = size_roundup_power2(rc);
+
+               if (info->eti_big_lmmsize > 0) {
+                       /* free old buffer */
+                       LASSERT(info->eti_big_lmm);
+                       OBD_FREE_LARGE(info->eti_big_lmm,
+                                      info->eti_big_lmmsize);
+                       info->eti_big_lmm = NULL;
+                       info->eti_big_lmmsize = 0;
+               }
+
+               OBD_ALLOC_LARGE(info->eti_big_lmm, size);
+               if (info->eti_big_lmm == NULL)
+                       RETURN(-ENOMEM);
+               info->eti_big_lmmsize = size;
+       }
+       LASSERT(info->eti_big_lmmsize >= rc);
+
+       info->eti_buf.lb_buf = info->eti_big_lmm;
+       info->eti_buf.lb_len = info->eti_big_lmmsize;
+       rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV);
+       if (rc < 0)
+               RETURN(rc);
+
+       ma->ma_valid |= MA_LOV;
+       ma->ma_lmm = info->eti_big_lmm;
+       ma->ma_lmm_size = rc;
+
+       RETURN(0);
+}
+
+int echo_attr_get_complex(const struct lu_env *env, struct md_object *next,
+                         struct md_attr *ma)
+{
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_buf           *buf = &info->eti_buf;
+       cfs_umode_t              mode = lu_object_attr(&next->mo_lu);
+       int                      need = ma->ma_need;
+       int                      rc = 0, rc2;
+
+       ENTRY;
+
+       ma->ma_valid = 0;
+
+       if (need & MA_INODE) {
+               ma->ma_need = MA_INODE;
+               rc = mo_attr_get(env, next, ma);
+               if (rc)
+                       GOTO(out, rc);
+               ma->ma_valid |= MA_INODE;
+       }
+
+       if (need & MA_LOV) {
+               if (S_ISREG(mode) || S_ISDIR(mode)) {
+                       LASSERT(ma->ma_lmm_size > 0);
+                       buf->lb_buf = ma->ma_lmm;
+                       buf->lb_len = ma->ma_lmm_size;
+                       rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LOV);
+                       if (rc2 > 0) {
+                               ma->ma_lmm_size = rc2;
+                               ma->ma_valid |= MA_LOV;
+                       } else if (rc2 == -ENODATA) {
+                               /* no LOV EA */
+                               ma->ma_lmm_size = 0;
+                       } else if (rc2 == -ERANGE) {
+                               rc2 = echo_big_lmm_get(env, next, ma);
+                               if (rc2 < 0)
+                                       GOTO(out, rc = rc2);
+                       } else {
+                               GOTO(out, rc = rc2);
+                       }
+               }
+       }
+
+#ifdef CONFIG_FS_POSIX_ACL
+       if (need & MA_ACL_DEF && S_ISDIR(mode)) {
+               buf->lb_buf = ma->ma_acl;
+               buf->lb_len = ma->ma_acl_size;
+               rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
+               if (rc2 > 0) {
+                       ma->ma_acl_size = rc2;
+                       ma->ma_valid |= MA_ACL_DEF;
+               } else if (rc2 == -ENODATA) {
+                       /* no ACLs */
+                       ma->ma_acl_size = 0;
+               } else {
+                       GOTO(out, rc = rc2);
+               }
+       }
+#endif
+out:
+       ma->ma_need = need;
+       CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
+              rc, ma->ma_valid, ma->ma_lmm);
+       RETURN(rc);
+}
+
 static int
 echo_md_create_internal(const struct lu_env *env, struct echo_device *ed,
                        struct md_object *parent, struct lu_fid *fid,
 static int
 echo_md_create_internal(const struct lu_env *env, struct echo_device *ed,
                        struct md_object *parent, struct lu_fid *fid,
@@ -1486,6 +1603,8 @@ echo_md_create_internal(const struct lu_env *env, struct echo_device *ed,
        struct lu_object_conf    conf = { .loc_flags = LOC_F_NEW };
        int                      rc;
 
        struct lu_object_conf    conf = { .loc_flags = LOC_F_NEW };
        int                      rc;
 
+       ENTRY;
+
        rc = mdo_lookup(env, parent, lname, fid2, spec);
        if (rc == 0)
                return -EEXIST;
        rc = mdo_lookup(env, parent, lname, fid2, spec);
        if (rc == 0)
                return -EEXIST;
@@ -1497,7 +1616,7 @@ echo_md_create_internal(const struct lu_env *env, struct echo_device *ed,
         if (IS_ERR(ec_child)) {
                 CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
                         PTR_ERR(ec_child));
         if (IS_ERR(ec_child)) {
                 CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
                         PTR_ERR(ec_child));
-                return PTR_ERR(ec_child);
+               RETURN(PTR_ERR(ec_child));
         }
 
         child = lu_object_locate(ec_child->lo_header, ld->ld_type);
         }
 
         child = lu_object_locate(ec_child->lo_header, ld->ld_type);
@@ -1520,45 +1639,27 @@ echo_md_create_internal(const struct lu_env *env, struct echo_device *ed,
         }
         CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc  = %d\n",
                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc);
         }
         CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc  = %d\n",
                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc);
+       EXIT;
 out_put:
         lu_object_put(env, ec_child);
         return rc;
 }
 
 out_put:
         lu_object_put(env, ec_child);
         return rc;
 }
 
-static int echo_set_lmm_size(const struct lu_env *env,
-                            struct lu_device *ld,
-                            struct md_attr *ma, int *lmm_size,
-                            int *cookie_size)
+static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld,
+                            struct md_attr *ma)
 {
 {
-       struct md_device *md = lu2md_dev(ld);
-       int               rc;
-       ENTRY;
-
-       md = lu2md_dev(ld);
-       rc = md->md_ops->mdo_maxsize_get(env, md,
-                                        lmm_size, cookie_size);
-       if (rc)
-               RETURN(rc);
-
-       ma->ma_lmm_size = *lmm_size;
-       if (*lmm_size > 0) {
-               OBD_ALLOC(ma->ma_lmm, *lmm_size);
-               if (ma->ma_lmm == NULL) {
-                       ma->ma_lmm_size = 0;
-                       RETURN(-ENOMEM);
-               }
-       }
+       struct echo_thread_info *info = echo_env_info(env);
 
 
-       ma->ma_cookie_size = *cookie_size;
-       if (*cookie_size > 0) {
-               OBD_ALLOC(ma->ma_cookie, *cookie_size);
-               if (ma->ma_cookie == NULL) {
-                       ma->ma_cookie_size = 0;
-                       RETURN(-ENOMEM);
-               }
+       if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
+               ma->ma_lmm = (void *)&info->eti_lmm;
+               ma->ma_lmm_size = sizeof(info->eti_lmm);
+       } else {
+               LASSERT(info->eti_big_lmmsize);
+               ma->ma_lmm = info->eti_big_lmm;
+               ma->ma_lmm_size = info->eti_big_lmmsize;
        }
 
        }
 
-       RETURN(0);
+       return 0;
 }
 
 static int echo_create_md_object(const struct lu_env *env,
 }
 
 static int echo_create_md_object(const struct lu_env *env,
@@ -1576,25 +1677,22 @@ static int echo_create_md_object(const struct lu_env *env,
         struct md_attr          *ma = &info->eti_ma;
         struct lu_device        *ld = ed->ed_next;
         int                      rc = 0;
         struct md_attr          *ma = &info->eti_ma;
         struct lu_device        *ld = ed->ed_next;
         int                      rc = 0;
-       int                      lmm_size = 0;
-       int                      cookie_size = 0;
         int                      i;
 
         int                      i;
 
+       ENTRY;
+
         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
-        if (ec_parent == NULL) {
-                lu_object_put(env, ec_parent);
-                RETURN(PTR_ERR(parent));
-        }
+       if (parent == NULL)
+               RETURN(-ENXIO);
 
         memset(ma, 0, sizeof(*ma));
         memset(spec, 0, sizeof(*spec));
         if (stripe_count != 0) {
                 spec->sp_cr_flags |= FMODE_WRITE;
 
         memset(ma, 0, sizeof(*ma));
         memset(spec, 0, sizeof(*spec));
         if (stripe_count != 0) {
                 spec->sp_cr_flags |= FMODE_WRITE;
-                rc = echo_set_lmm_size(env, ld, ma, &lmm_size, &cookie_size);
-                if (rc)
-                        GOTO(out_free, rc);
+               echo_set_lmm_size(env, ld, ma);
                 if (stripe_count != -1) {
                         struct lov_user_md_v3 *lum = &info->eti_lum;
                 if (stripe_count != -1) {
                         struct lov_user_md_v3 *lum = &info->eti_lum;
+
                         lum->lmm_magic = LOV_USER_MAGIC_V3;
                         lum->lmm_stripe_count = stripe_count;
                         lum->lmm_stripe_offset = stripe_offset;
                         lum->lmm_magic = LOV_USER_MAGIC_V3;
                         lum->lmm_stripe_count = stripe_count;
                         lum->lmm_stripe_offset = stripe_offset;
@@ -1615,7 +1713,7 @@ static int echo_create_md_object(const struct lu_env *env,
                 /* If name is specified, only create one object by name */
                 rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
                                              spec, ma);
                 /* If name is specified, only create one object by name */
                 rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
                                              spec, ma);
-                GOTO(out_free, rc);
+               RETURN(rc);
         }
 
         /* Create multiple object sequenced by id */
         }
 
         /* Create multiple object sequenced by id */
@@ -1635,13 +1733,7 @@ static int echo_create_md_object(const struct lu_env *env,
                 fid->f_oid++;
         }
 
                 fid->f_oid++;
         }
 
-out_free:
-       if (lmm_size > 0 && ma->ma_lmm != NULL)
-               OBD_FREE(ma->ma_lmm, lmm_size);
-       if (cookie_size > 0 && ma->ma_cookie != NULL)
-               OBD_FREE(ma->ma_cookie, cookie_size);
-
-       return rc;
+       RETURN(rc);
 }
 
 static struct lu_object *echo_md_lookup(const struct lu_env *env,
 }
 
 static struct lu_object *echo_md_lookup(const struct lu_env *env,
@@ -1682,14 +1774,12 @@ static int echo_setattr_object(const struct lu_env *env,
         int                      rc = 0;
         int                      i;
 
         int                      rc = 0;
         int                      i;
 
+       ENTRY;
+
         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
-        if (ec_parent == NULL) {
-                lu_object_put(env, ec_parent);
-                return PTR_ERR(parent);
-        }
+       if (parent == NULL)
+               RETURN(-ENXIO);
 
 
-        buf->lb_buf = info->eti_xattr_buf;
-        buf->lb_len = sizeof(info->eti_xattr_buf);
         for (i = 0; i < count; i++) {
                 struct lu_object *ec_child, *child;
 
         for (i = 0; i < count; i++) {
                 struct lu_object *ec_child, *child;
 
@@ -1713,10 +1803,13 @@ static int echo_setattr_object(const struct lu_env *env,
                 CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n",
                        PFID(lu_object_fid(child)));
 
                 CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n",
                        PFID(lu_object_fid(child)));
 
+               buf->lb_buf = info->eti_xattr_buf;
+               buf->lb_len = sizeof(info->eti_xattr_buf);
+
                 sprintf(name, "%s.test1", XATTR_USER_PREFIX);
                 rc = mo_xattr_set(env, lu2md(child), buf, name,
                                   LU_XATTR_CREATE);
                 sprintf(name, "%s.test1", XATTR_USER_PREFIX);
                 rc = mo_xattr_set(env, lu2md(child), buf, name,
                                   LU_XATTR_CREATE);
-                if (rc) {
+               if (rc < 0) {
                         CERROR("Can not setattr child "DFID": rc = %d\n",
                                 PFID(lu_object_fid(child)), rc);
                         lu_object_put(env, ec_child);
                         CERROR("Can not setattr child "DFID": rc = %d\n",
                                 PFID(lu_object_fid(child)), rc);
                         lu_object_put(env, ec_child);
@@ -1727,7 +1820,7 @@ static int echo_setattr_object(const struct lu_env *env,
                 id++;
                 lu_object_put(env, ec_child);
         }
                 id++;
                 lu_object_put(env, ec_child);
         }
-        return rc;
+       RETURN(rc);
 }
 
 static int echo_getattr_object(const struct lu_env *env,
 }
 
 static int echo_getattr_object(const struct lu_env *env,
@@ -1742,21 +1835,15 @@ static int echo_getattr_object(const struct lu_env *env,
         struct md_attr          *ma = &info->eti_ma;
         struct lu_device        *ld = ed->ed_next;
         int                      rc = 0;
         struct md_attr          *ma = &info->eti_ma;
         struct lu_device        *ld = ed->ed_next;
         int                      rc = 0;
-       int                      lmm_size = 0;
-       int                      cookie_size = 0;
         int                      i;
 
         int                      i;
 
+       ENTRY;
+
         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
-        if (ec_parent == NULL) {
-                lu_object_put(env, ec_parent);
-                return PTR_ERR(parent);
-        }
+       if (parent == NULL)
+               RETURN(-ENXIO);
 
         memset(ma, 0, sizeof(*ma));
 
         memset(ma, 0, sizeof(*ma));
-       rc = echo_set_lmm_size(env, ld, ma, &lmm_size, &cookie_size);
-       if (rc)
-               GOTO(out_free, rc);
-
         ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
         ma->ma_acl = info->eti_xattr_buf;
         ma->ma_acl_size = sizeof(info->eti_xattr_buf);
         ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
         ma->ma_acl = info->eti_xattr_buf;
         ma->ma_acl_size = sizeof(info->eti_xattr_buf);
@@ -1766,6 +1853,7 @@ static int echo_getattr_object(const struct lu_env *env,
 
                 ma->ma_valid = 0;
                 echo_md_build_name(lname, name, id);
 
                 ma->ma_valid = 0;
                 echo_md_build_name(lname, name, id);
+               echo_set_lmm_size(env, ld, ma);
 
                 ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
                 if (IS_ERR(ec_child)) {
 
                 ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
                 if (IS_ERR(ec_child)) {
@@ -1778,12 +1866,12 @@ static int echo_getattr_object(const struct lu_env *env,
                 if (child == NULL) {
                         CERROR("Can not locate the child %s\n", lname->ln_name);
                         lu_object_put(env, ec_child);
                 if (child == NULL) {
                         CERROR("Can not locate the child %s\n", lname->ln_name);
                         lu_object_put(env, ec_child);
-                        GOTO(out_free, rc = -EINVAL);
+                       RETURN(-EINVAL);
                 }
 
                 CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
                        PFID(lu_object_fid(child)));
                 }
 
                 CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
                        PFID(lu_object_fid(child)));
-                rc = mo_attr_get(env, lu2md(child), ma);
+               rc = echo_attr_get_complex(env, lu2md(child), ma);
                 if (rc) {
                         CERROR("Can not getattr child "DFID": rc = %d\n",
                                 PFID(lu_object_fid(child)), rc);
                 if (rc) {
                         CERROR("Can not getattr child "DFID": rc = %d\n",
                                 PFID(lu_object_fid(child)), rc);
@@ -1796,12 +1884,7 @@ static int echo_getattr_object(const struct lu_env *env,
                 lu_object_put(env, ec_child);
         }
 
                 lu_object_put(env, ec_child);
         }
 
-out_free:
-       if (lmm_size > 0 && ma->ma_lmm != NULL)
-               OBD_FREE(ma->ma_lmm, lmm_size);
-       if (cookie_size > 0 && ma->ma_cookie != NULL)
-               OBD_FREE(ma->ma_cookie, cookie_size);
-       return rc;
+       RETURN(rc);
 }
 
 static int echo_lookup_object(const struct lu_env *env,
 }
 
 static int echo_lookup_object(const struct lu_env *env,
@@ -1819,10 +1902,8 @@ static int echo_lookup_object(const struct lu_env *env,
         int                      i;
 
         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
         int                      i;
 
         parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
-        if (ec_parent == NULL) {
-                lu_object_put(env, ec_parent);
-                return PTR_ERR(parent);
-        }
+       if (parent == NULL)
+               return -ENXIO;
 
         /*prepare the requests*/
         for (i = 0; i < count; i++) {
 
         /*prepare the requests*/
         for (i = 0; i < count; i++) {
@@ -1855,6 +1936,8 @@ static int echo_md_destroy_internal(const struct lu_env *env,
         struct lu_object   *child;
         int                 rc;
 
         struct lu_object   *child;
         int                 rc;
 
+       ENTRY;
+
         ec_child = echo_md_lookup(env, ed, parent, lname);
         if (IS_ERR(ec_child)) {
                 CERROR("Can't find child %s: rc = %ld\n", lname->ln_name,
         ec_child = echo_md_lookup(env, ed, parent, lname);
         if (IS_ERR(ec_child)) {
                 CERROR("Can't find child %s: rc = %ld\n", lname->ln_name,
@@ -1897,8 +1980,6 @@ static int echo_destroy_object(const struct lu_env *env,
         struct lu_device        *ld = ed->ed_next;
         struct lu_object        *parent;
         int                      rc = 0;
         struct lu_device        *ld = ed->ed_next;
         struct lu_object        *parent;
         int                      rc = 0;
-       int                      lmm_size = 0;
-       int                      cookie_size = 0;
         int                      i;
         ENTRY;
 
         int                      i;
         ENTRY;
 
@@ -1913,22 +1994,18 @@ static int echo_destroy_object(const struct lu_env *env,
         ma->ma_need = MA_INODE;
         ma->ma_valid = 0;
 
         ma->ma_need = MA_INODE;
         ma->ma_valid = 0;
 
-        rc = echo_set_lmm_size(env, ld, ma, &lmm_size, &cookie_size);
-        if (rc)
-                GOTO(out_free, rc);
         if (name != NULL) {
                 lname->ln_name = name;
                 lname->ln_namelen = namelen;
                 rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
                                               ma);
         if (name != NULL) {
                 lname->ln_name = name;
                 lname->ln_namelen = namelen;
                 rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
                                               ma);
-                GOTO(out_free, rc);
+               RETURN(rc);
         }
 
         /*prepare the requests*/
         for (i = 0; i < count; i++) {
                 char *tmp_name = info->eti_name;
 
         }
 
         /*prepare the requests*/
         for (i = 0; i < count; i++) {
                 char *tmp_name = info->eti_name;
 
-                ma->ma_need |= MA_LOV;
                 ma->ma_valid = 0;
                 echo_md_build_name(lname, tmp_name, id);
 
                 ma->ma_valid = 0;
                 echo_md_build_name(lname, tmp_name, id);
 
@@ -1941,11 +2018,6 @@ static int echo_destroy_object(const struct lu_env *env,
                 id++;
         }
 
                 id++;
         }
 
-out_free:
-       if (lmm_size > 0 && ma->ma_lmm != NULL)
-               OBD_FREE(ma->ma_lmm, lmm_size);
-       if (cookie_size > 0 && ma->ma_cookie != NULL)
-               OBD_FREE(ma->ma_cookie, cookie_size);
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
@@ -2024,6 +2096,7 @@ static int echo_md_handler(struct echo_device *ed, int command,
                            char *path, int path_len, int id, int count,
                            struct obd_ioctl_data *data)
 {
                            char *path, int path_len, int id, int count,
                            struct obd_ioctl_data *data)
 {
+       struct echo_thread_info *info;
         struct lu_device      *ld = ed->ed_next;
         struct lu_env         *env;
         int                    refcheck;
         struct lu_device      *ld = ed->ed_next;
         struct lu_env         *env;
         int                    refcheck;
@@ -2048,27 +2121,30 @@ static int echo_md_handler(struct echo_device *ed, int command,
                 RETURN(PTR_ERR(env));
 
         rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_MD_SES_TAG);
                 RETURN(PTR_ERR(env));
 
         rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_MD_SES_TAG);
-        if (rc != 0) {
-                cl_env_put(env, &refcheck);
-                RETURN(rc);
-        }
+       if (rc != 0)
+               GOTO(out_env, rc);
+
+       /* init big_lmm buffer */
+       info = echo_env_info(env);
+       LASSERT(info->eti_big_lmm == NULL);
+       OBD_ALLOC_LARGE(info->eti_big_lmm, MIN_MD_SIZE);
+       if (info->eti_big_lmm == NULL)
+               GOTO(out_env, rc = -ENOMEM);
+       info->eti_big_lmmsize = MIN_MD_SIZE;
 
         parent = echo_resolve_path(env, ed, path, path_len);
         if (IS_ERR(parent)) {
                 CERROR("Can not resolve the path %s: rc = %ld\n", path,
                         PTR_ERR(parent));
 
         parent = echo_resolve_path(env, ed, path, path_len);
         if (IS_ERR(parent)) {
                 CERROR("Can not resolve the path %s: rc = %ld\n", path,
                         PTR_ERR(parent));
-                cl_env_put(env, &refcheck);
-                RETURN(PTR_ERR(parent));
+               GOTO(out_free, PTR_ERR(parent));
         }
 
         if (namelen > 0) {
                 OBD_ALLOC(name, namelen + 1);
                 if (name == NULL)
         }
 
         if (namelen > 0) {
                 OBD_ALLOC(name, namelen + 1);
                 if (name == NULL)
-                        RETURN(-ENOMEM);
-                if (cfs_copy_from_user(name, data->ioc_pbuf2, namelen)) {
-                        OBD_FREE(name, namelen + 1);
-                        RETURN(-EFAULT);
-                }
+                       GOTO(out_put, rc = -ENOMEM);
+               if (cfs_copy_from_user(name, data->ioc_pbuf2, namelen))
+                       GOTO(out_name, rc = -EFAULT);
         }
 
         switch (command) {
         }
 
         switch (command) {
@@ -2113,9 +2189,17 @@ static int echo_md_handler(struct echo_device *ed, int command,
                 rc = -EINVAL;
                 break;
         }
                 rc = -EINVAL;
                 break;
         }
+out_name:
         if (name != NULL)
                 OBD_FREE(name, namelen + 1);
         if (name != NULL)
                 OBD_FREE(name, namelen + 1);
+out_put:
         lu_object_put(env, parent);
         lu_object_put(env, parent);
+out_free:
+       LASSERT(info->eti_big_lmm);
+       OBD_FREE_LARGE(info->eti_big_lmm, info->eti_big_lmmsize);
+       info->eti_big_lmm = NULL;
+       info->eti_big_lmmsize = 0;
+out_env:
         cl_env_put(env, &refcheck);
         return rc;
 }
         cl_env_put(env, &refcheck);
         return rc;
 }