Whamcloud - gitweb
LU-11997 ptlrpc: Properly swab ll_fiemap_info_key
[fs/lustre-release.git] / lustre / obdecho / echo_client.c
index 1dc8e63..07ed6b7 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2013, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
  */
 
 #define DEBUG_SUBSYSTEM S_ECHO
-#ifdef __KERNEL__
-#include <libcfs/libcfs.h>
-#else
-#include <liblustre.h>
+
+#include <linux/user_namespace.h>
+#ifdef HAVE_UIDGID_HEADER
+# include <linux/uidgid.h>
 #endif
+#include <libcfs/libcfs.h>
 
 #include <obd.h>
 #include <obd_support.h>
 #include <lustre_debug.h>
 #include <lprocfs_status.h>
 #include <cl_object.h>
-#include <md_object.h>
 #include <lustre_fid.h>
+#include <lustre_lmv.h>
 #include <lustre_acl.h>
+#include <uapi/linux/lustre/lustre_ioctl.h>
 #include <lustre_net.h>
+#ifdef HAVE_SERVER_SUPPORT
+# include <md_object.h>
+
+#define ETI_NAME_LEN   20
+
+#endif /* HAVE_SERVER_SUPPORT */
 
 #include "echo_internal.h"
 
  * @{
  */
 
+/* echo thread key have a CL_THREAD flag, which set cl_env function directly */
+#define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD)
+#define ECHO_DT_CTX_TAG (LCT_REMEMBER | LCT_DT_THREAD)
+#define ECHO_SES_TAG    (LCT_REMEMBER | LCT_SESSION | LCT_SERVER_SESSION)
+
 struct echo_device {
-        struct cl_device        ed_cl;
-        struct echo_client_obd *ed_ec;
-
-        struct cl_site          ed_site_myself;
-        struct cl_site         *ed_site;
-        struct lu_device       *ed_next;
-        int                     ed_next_islov;
-        int                     ed_next_ismd;
-        struct lu_client_seq   *ed_cl_seq;
+       struct cl_device          ed_cl;
+       struct echo_client_obd   *ed_ec;
+
+       struct cl_site            ed_site_myself;
+       struct lu_site           *ed_site;
+       struct lu_device         *ed_next;
+       int                       ed_next_ismd;
+       struct lu_client_seq     *ed_cl_seq;
+#ifdef HAVE_SERVER_SUPPORT
+       struct local_oid_storage *ed_los;
+       struct lu_fid             ed_root_fid;
+#endif /* HAVE_SERVER_SUPPORT */
 };
 
 struct echo_object {
-        struct cl_object        eo_cl;
-        struct cl_object_header eo_hdr;
-
-        struct echo_device     *eo_dev;
-        cfs_list_t              eo_obj_chain;
-        struct lov_stripe_md   *eo_lsm;
-       atomic_t            eo_npages;
-        int                     eo_deleted;
+       struct cl_object        eo_cl;
+       struct cl_object_header eo_hdr;
+       struct echo_device     *eo_dev;
+       struct list_head        eo_obj_chain;
+       struct lov_oinfo       *eo_oinfo;
+       atomic_t                eo_npages;
+       int                     eo_deleted;
 };
 
 struct echo_object_conf {
-        struct cl_object_conf  eoc_cl;
-        struct lov_stripe_md **eoc_md;
+       struct cl_object_conf   eoc_cl;
+       struct lov_oinfo      **eoc_oinfo;
 };
 
 struct echo_page {
        struct cl_page_slice    ep_cl;
-       struct mutex            ep_lock;
+       unsigned long           ep_lock;
 };
 
 struct echo_lock {
-        struct cl_lock_slice   el_cl;
-        cfs_list_t             el_chain;
-        struct echo_object    *el_object;
-        __u64                  el_cookie;
-       atomic_t           el_refcount;
+       struct cl_lock_slice    el_cl;
+       struct list_head        el_chain;
+       struct echo_object     *el_object;
+       __u64                   el_cookie;
+       atomic_t                el_refcount;
+};
+
+#ifdef HAVE_SERVER_SUPPORT
+static const char echo_md_root_dir_name[] = "ROOT_ECHO";
+
+/**
+ * In order to use the values of members in struct mdd_device,
+ * we define an alias structure here.
+ */
+struct echo_md_device {
+       struct md_device                 emd_md_dev;
+       struct obd_export               *emd_child_exp;
+       struct dt_device                *emd_child;
+       struct dt_device                *emd_bottom;
+       struct lu_fid                    emd_root_fid;
+       struct lu_fid                    emd_local_root_fid;
 };
+#endif /* HAVE_SERVER_SUPPORT */
 
 static int echo_client_setup(const struct lu_env *env,
-                             struct obd_device *obddev,
-                             struct lustre_cfg *lcfg);
+                            struct obd_device *obddev,
+                            struct lustre_cfg *lcfg);
 static int echo_client_cleanup(struct obd_device *obddev);
 
-
 /** \defgroup echo_helpers Helper functions
  * @{
  */
 static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
 {
-        return container_of0(dev, struct echo_device, ed_cl);
+       return container_of0(dev, struct echo_device, ed_cl);
 }
 
 static inline struct cl_device *echo_dev2cl(struct echo_device *d)
 {
-        return &d->ed_cl;
+       return &d->ed_cl;
 }
 
 static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
 {
-        return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
+       return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
 }
 
 static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
 {
-        return &eco->eo_cl;
+       return &eco->eo_cl;
 }
 
 static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
 {
-        return container_of(o, struct echo_object, eo_cl);
+       return container_of(o, struct echo_object, eo_cl);
 }
 
 static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
 {
-        return container_of(s, struct echo_page, ep_cl);
+       return container_of(s, struct echo_page, ep_cl);
 }
 
 static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
 {
-        return container_of(s, struct echo_lock, el_cl);
+       return container_of(s, struct echo_lock, el_cl);
 }
 
 static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
 {
-        return ecl->el_cl.cls_lock;
+       return ecl->el_cl.cls_lock;
 }
 
 static struct lu_context_key echo_thread_key;
+
 static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
 {
-        struct echo_thread_info *info;
-        info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
-        LASSERT(info != NULL);
-        return info;
+       struct echo_thread_info *info;
+
+       info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
+       LASSERT(info != NULL);
+       return info;
 }
 
 static inline
 struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
 {
-        return container_of(c, struct echo_object_conf, eoc_cl);
+       return container_of(c, struct echo_object_conf, eoc_cl);
+}
+
+#ifdef HAVE_SERVER_SUPPORT
+static inline struct echo_md_device *lu2emd_dev(struct lu_device *d)
+{
+       return container_of0(d, struct echo_md_device, emd_md_dev.md_lu_dev);
+}
+
+static inline struct lu_device *emd2lu_dev(struct echo_md_device *d)
+{
+       return &d->emd_md_dev.md_lu_dev;
+}
+
+static inline struct seq_server_site *echo_md_seq_site(struct echo_md_device *d)
+{
+       return emd2lu_dev(d)->ld_site->ld_seq_site;
+}
+
+static inline struct obd_device *emd2obd_dev(struct echo_md_device *d)
+{
+       return d->emd_md_dev.md_lu_dev.ld_obd;
 }
+#endif /* HAVE_SERVER_SUPPORT */
 
 /** @} echo_helpers */
 
-static struct echo_object *cl_echo_object_find(struct echo_device *d,
-                                               struct lov_stripe_md **lsm);
 static int cl_echo_object_put(struct echo_object *eco);
-static int cl_echo_enqueue   (struct echo_object *eco, obd_off start,
-                              obd_off end, int mode, __u64 *cookie);
-static int cl_echo_cancel    (struct echo_device *d, __u64 cookie);
-static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset,
+static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
                              struct page **pages, int npages, int async);
 
 struct echo_thread_info {
        struct echo_object_conf eti_conf;
        struct lustre_md        eti_md;
-
        struct cl_2queue        eti_queue;
        struct cl_io            eti_io;
-       struct cl_lock_descr    eti_descr;
+       struct cl_lock          eti_lock;
        struct lu_fid           eti_fid;
        struct lu_fid           eti_fid2;
 #ifdef HAVE_SERVER_SUPPORT
@@ -190,17 +235,18 @@ struct echo_thread_info {
        struct md_attr          eti_ma;
        struct lu_name          eti_lname;
        /* per-thread values, can be re-used */
-       void                    *eti_big_lmm;
+       void                    *eti_big_lmm; /* may be vmalloc'd */
        int                     eti_big_lmmsize;
-       char                    eti_name[20];
+       char                    eti_name[ETI_NAME_LEN];
        struct lu_buf           eti_buf;
-       char                    eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
+       /* If we want to test large ACL, then need to enlarge the buffer. */
+       char                    eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE_OLD];
 #endif
 };
 
 /* No session used right now */
 struct echo_session_info {
-        unsigned long dummy;
+       unsigned long dummy;
 };
 
 static struct kmem_cache *echo_lock_kmem;
@@ -210,29 +256,29 @@ static struct kmem_cache *echo_session_kmem;
 /* static struct kmem_cache *echo_req_kmem; */
 
 static struct lu_kmem_descr echo_caches[] = {
-        {
-                .ckd_cache = &echo_lock_kmem,
-                .ckd_name  = "echo_lock_kmem",
-                .ckd_size  = sizeof (struct echo_lock)
-        },
-        {
-                .ckd_cache = &echo_object_kmem,
-                .ckd_name  = "echo_object_kmem",
-                .ckd_size  = sizeof (struct echo_object)
-        },
-        {
-                .ckd_cache = &echo_thread_kmem,
-                .ckd_name  = "echo_thread_kmem",
-                .ckd_size  = sizeof (struct echo_thread_info)
-        },
-        {
-                .ckd_cache = &echo_session_kmem,
-                .ckd_name  = "echo_session_kmem",
-                .ckd_size  = sizeof (struct echo_session_info)
-        },
-        {
-                .ckd_cache = NULL
-        }
+       {
+               .ckd_cache = &echo_lock_kmem,
+               .ckd_name  = "echo_lock_kmem",
+               .ckd_size  = sizeof(struct echo_lock)
+       },
+       {
+               .ckd_cache = &echo_object_kmem,
+               .ckd_name  = "echo_object_kmem",
+               .ckd_size  = sizeof(struct echo_object)
+       },
+       {
+               .ckd_cache = &echo_thread_kmem,
+               .ckd_name  = "echo_thread_kmem",
+               .ckd_size  = sizeof(struct echo_thread_info)
+       },
+       {
+               .ckd_cache = &echo_session_kmem,
+               .ckd_name  = "echo_session_kmem",
+               .ckd_size  = sizeof(struct echo_session_info)
+       },
+       {
+               .ckd_cache = NULL
+       }
 };
 
 /** \defgroup echo_page Page operations
@@ -242,98 +288,103 @@ static struct lu_kmem_descr echo_caches[] = {
  * @{
  */
 static int echo_page_own(const struct lu_env *env,
-                         const struct cl_page_slice *slice,
-                         struct cl_io *io, int nonblock)
+                        const struct cl_page_slice *slice,
+                        struct cl_io *io, int nonblock)
 {
-        struct echo_page *ep = cl2echo_page(slice);
+       struct echo_page *ep = cl2echo_page(slice);
 
-        if (!nonblock)
-               mutex_lock(&ep->ep_lock);
-       else if (!mutex_trylock(&ep->ep_lock))
-                return -EAGAIN;
-        return 0;
+       if (!nonblock) {
+               if (test_and_set_bit(0, &ep->ep_lock))
+                       return -EAGAIN;
+       } else {
+               while (test_and_set_bit(0, &ep->ep_lock))
+                       wait_on_bit(&ep->ep_lock, 0, TASK_UNINTERRUPTIBLE);
+       }
+       return 0;
 }
 
 static void echo_page_disown(const struct lu_env *env,
-                             const struct cl_page_slice *slice,
-                             struct cl_io *io)
+                            const struct cl_page_slice *slice,
+                            struct cl_io *io)
 {
-        struct echo_page *ep = cl2echo_page(slice);
+       struct echo_page *ep = cl2echo_page(slice);
 
-       LASSERT(mutex_is_locked(&ep->ep_lock));
-       mutex_unlock(&ep->ep_lock);
+       LASSERT(test_bit(0, &ep->ep_lock));
+       clear_and_wake_up_bit(0, &ep->ep_lock);
 }
 
 static void echo_page_discard(const struct lu_env *env,
-                              const struct cl_page_slice *slice,
-                              struct cl_io *unused)
+                             const struct cl_page_slice *slice,
+                             struct cl_io *unused)
 {
-        cl_page_delete(env, slice->cpl_page);
+       cl_page_delete(env, slice->cpl_page);
 }
 
 static int echo_page_is_vmlocked(const struct lu_env *env,
-                                 const struct cl_page_slice *slice)
+                                const struct cl_page_slice *slice)
 {
-       if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
-                return -EBUSY;
-        return -ENODATA;
+       if (test_bit(0, &cl2echo_page(slice)->ep_lock))
+               return -EBUSY;
+       return -ENODATA;
 }
 
 static void echo_page_completion(const struct lu_env *env,
-                                 const struct cl_page_slice *slice,
-                                 int ioret)
+                                const struct cl_page_slice *slice,
+                                int ioret)
 {
-        LASSERT(slice->cpl_page->cp_sync_io != NULL);
+       LASSERT(slice->cpl_page->cp_sync_io != NULL);
 }
 
 static void echo_page_fini(const struct lu_env *env,
-                          struct cl_page_slice *slice)
+                          struct cl_page_slice *slice,
+                          struct pagevec *pvec)
 {
        struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
-       ENTRY;
 
+       ENTRY;
        atomic_dec(&eco->eo_npages);
-       page_cache_release(slice->cpl_page->cp_vmpage);
+       put_page(slice->cpl_page->cp_vmpage);
        EXIT;
 }
 
 static int echo_page_prep(const struct lu_env *env,
-                          const struct cl_page_slice *slice,
-                          struct cl_io *unused)
+                         const struct cl_page_slice *slice,
+                         struct cl_io *unused)
 {
-        return 0;
+       return 0;
 }
 
 static int echo_page_print(const struct lu_env *env,
-                           const struct cl_page_slice *slice,
-                           void *cookie, lu_printer_t printer)
+                          const struct cl_page_slice *slice,
+                          void *cookie, lu_printer_t printer)
 {
        struct echo_page *ep = cl2echo_page(slice);
 
        (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
-                  ep, mutex_is_locked(&ep->ep_lock),
+                  ep, test_bit(0, &ep->ep_lock),
                   slice->cpl_page->cp_vmpage);
        return 0;
 }
 
 static const struct cl_page_operations echo_page_ops = {
-        .cpo_own           = echo_page_own,
-        .cpo_disown        = echo_page_disown,
-        .cpo_discard       = echo_page_discard,
-        .cpo_fini          = echo_page_fini,
-        .cpo_print         = echo_page_print,
-        .cpo_is_vmlocked   = echo_page_is_vmlocked,
-        .io = {
-                [CRT_READ] = {
-                        .cpo_prep        = echo_page_prep,
-                        .cpo_completion  = echo_page_completion,
-                },
-                [CRT_WRITE] = {
-                        .cpo_prep        = echo_page_prep,
-                        .cpo_completion  = echo_page_completion,
-                }
-        }
+       .cpo_own           = echo_page_own,
+       .cpo_disown        = echo_page_disown,
+       .cpo_discard       = echo_page_discard,
+       .cpo_fini          = echo_page_fini,
+       .cpo_print         = echo_page_print,
+       .cpo_is_vmlocked   = echo_page_is_vmlocked,
+       .io = {
+               [CRT_READ] = {
+                       .cpo_prep        = echo_page_prep,
+                       .cpo_completion  = echo_page_completion,
+               },
+               [CRT_WRITE] = {
+                       .cpo_prep        = echo_page_prep,
+                       .cpo_completion  = echo_page_completion,
+               }
+       }
 };
+
 /** @} echo_page */
 
 /** \defgroup echo_lock Locking
@@ -343,34 +394,16 @@ static const struct cl_page_operations echo_page_ops = {
  * @{
  */
 static void echo_lock_fini(const struct lu_env *env,
-                           struct cl_lock_slice *slice)
-{
-        struct echo_lock *ecl = cl2echo_lock(slice);
-
-        LASSERT(cfs_list_empty(&ecl->el_chain));
-        OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem);
-}
-
-static void echo_lock_delete(const struct lu_env *env,
-                             const struct cl_lock_slice *slice)
+                          struct cl_lock_slice *slice)
 {
-        struct echo_lock *ecl      = cl2echo_lock(slice);
-
-        LASSERT(cfs_list_empty(&ecl->el_chain));
-}
+       struct echo_lock *ecl = cl2echo_lock(slice);
 
-static int echo_lock_fits_into(const struct lu_env *env,
-                               const struct cl_lock_slice *slice,
-                               const struct cl_lock_descr *need,
-                               const struct cl_io *unused)
-{
-        return 1;
+       LASSERT(list_empty(&ecl->el_chain));
+       OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem);
 }
 
 static struct cl_lock_operations echo_lock_ops = {
-        .clo_fini      = echo_lock_fini,
-        .clo_delete    = echo_lock_delete,
-        .clo_fits_into = echo_lock_fits_into
+       .clo_fini      = echo_lock_fini,
 };
 
 /** @} echo_lock */
@@ -386,19 +419,25 @@ static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
 {
        struct echo_page *ep = cl_object_page_slice(obj, page);
        struct echo_object *eco = cl2echo_obj(obj);
-       ENTRY;
 
-       page_cache_get(page->cp_vmpage);
-       mutex_init(&ep->ep_lock);
+       ENTRY;
+       get_page(page->cp_vmpage);
+       /*
+        * ep_lock is similar to the lock_page() lock, and
+        * cannot usefully be monitored by lockdep.
+        * So just use a bit in an "unsigned long" and use the
+        * wait_on_bit() interface to wait for the bit to be clear.
+        */
+       ep->ep_lock = 0;
        cl_page_slice_add(page, &ep->ep_cl, obj, index, &echo_page_ops);
        atomic_inc(&eco->eo_npages);
        RETURN(0);
 }
 
 static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
-                        struct cl_io *io)
+                       struct cl_io *io)
 {
-        return 0;
+       return 0;
 }
 
 static int echo_lock_init(const struct lu_env *env,
@@ -406,29 +445,29 @@ static int echo_lock_init(const struct lu_env *env,
                          const struct cl_io *unused)
 {
        struct echo_lock *el;
-       ENTRY;
 
+       ENTRY;
        OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, GFP_NOFS);
-       if (el != NULL) {
+       if (el) {
                cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
                el->el_object = cl2echo_obj(obj);
-               CFS_INIT_LIST_HEAD(&el->el_chain);
+               INIT_LIST_HEAD(&el->el_chain);
                atomic_set(&el->el_refcount, 0);
        }
-       RETURN(el == NULL ? -ENOMEM : 0);
+       RETURN(el ? 0 : -ENOMEM);
 }
 
 static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
-                         const struct cl_object_conf *conf)
+                        const struct cl_object_conf *conf)
 {
-        return 0;
+       return 0;
 }
 
 static const struct cl_object_operations echo_cl_obj_ops = {
-        .coo_page_init = echo_page_init,
-        .coo_lock_init = echo_lock_init,
-        .coo_io_init   = echo_io_init,
-        .coo_conf_set  = echo_conf_set
+       .coo_page_init = echo_page_init,
+       .coo_lock_init = echo_lock_init,
+       .coo_io_init   = echo_io_init,
+       .coo_conf_set  = echo_conf_set
 };
 /** @} echo_cl_ops */
 
@@ -439,135 +478,103 @@ static const struct cl_object_operations echo_cl_obj_ops = {
  * @{
  */
 static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
-                            const struct lu_object_conf *conf)
-{
-        struct echo_device *ed         = cl2echo_dev(lu2cl_dev(obj->lo_dev));
-        struct echo_client_obd *ec     = ed->ed_ec;
-        struct echo_object *eco        = cl2echo_obj(lu2cl(obj));
-        ENTRY;
-
-        if (ed->ed_next) {
-                struct lu_object  *below;
-                struct lu_device  *under;
-
-                under = ed->ed_next;
-                below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
-                                                        under);
-                if (below == NULL)
-                        RETURN(-ENOMEM);
-                lu_object_add(obj, below);
-        }
-
-        if (!ed->ed_next_ismd) {
-                const struct cl_object_conf *cconf = lu2cl_conf(conf);
-                struct echo_object_conf *econf = cl2echo_conf(cconf);
-
-                LASSERT(econf->eoc_md);
-                eco->eo_lsm = *econf->eoc_md;
-                /* clear the lsm pointer so that it won't get freed. */
-                *econf->eoc_md = NULL;
-        } else {
-                eco->eo_lsm = NULL;
-        }
-
-        eco->eo_dev = ed;
+                           const struct lu_object_conf *conf)
+{
+       struct echo_device *ed         = cl2echo_dev(lu2cl_dev(obj->lo_dev));
+       struct echo_client_obd *ec     = ed->ed_ec;
+       struct echo_object *eco        = cl2echo_obj(lu2cl(obj));
+
+       ENTRY;
+       if (ed->ed_next) {
+               struct lu_object  *below;
+               struct lu_device  *under;
+
+               under = ed->ed_next;
+               below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
+                                                       under);
+               if (!below)
+                       RETURN(-ENOMEM);
+               lu_object_add(obj, below);
+       }
+
+       if (!ed->ed_next_ismd) {
+               const struct cl_object_conf *cconf = lu2cl_conf(conf);
+               struct echo_object_conf *econf = cl2echo_conf(cconf);
+
+               LASSERT(econf->eoc_oinfo != NULL);
+
+               /*
+                * Transfer the oinfo pointer to eco that it won't be
+                * freed.
+                */
+               eco->eo_oinfo = *econf->eoc_oinfo;
+               *econf->eoc_oinfo = NULL;
+       } else {
+               eco->eo_oinfo = NULL;
+       }
+
+       eco->eo_dev = ed;
        atomic_set(&eco->eo_npages, 0);
        cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
 
        spin_lock(&ec->ec_lock);
-       cfs_list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
+       list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
        spin_unlock(&ec->ec_lock);
 
        RETURN(0);
 }
 
-/* taken from osc_unpackmd() */
-static int echo_alloc_memmd(struct echo_device *ed,
-                           struct lov_stripe_md **lsmp)
+static void echo_object_delete(const struct lu_env *env, struct lu_object *obj)
 {
-       int lsm_size;
+       struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
+       struct echo_client_obd *ec;
 
        ENTRY;
 
-       /* If export is lov/osc then use their obd method */
-       if (ed->ed_next != NULL)
-               return obd_alloc_memmd(ed->ed_ec->ec_exp, lsmp);
-       /* OFD has no unpackmd method, do everything here */
-       lsm_size = lov_stripe_md_size(1);
-
-       LASSERT(*lsmp == NULL);
-       OBD_ALLOC(*lsmp, lsm_size);
-       if (*lsmp == NULL)
-               RETURN(-ENOMEM);
-
-       OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
-       if ((*lsmp)->lsm_oinfo[0] == NULL) {
-               OBD_FREE(*lsmp, lsm_size);
-               RETURN(-ENOMEM);
-       }
-
-       loi_init((*lsmp)->lsm_oinfo[0]);
-       (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
-       ostid_set_seq_echo(&(*lsmp)->lsm_oi);
-
-       RETURN(lsm_size);
-}
+       /* object delete called unconditolally - layer init or not */
+       if (eco->eo_dev == NULL)
+               return;
 
-static int echo_free_memmd(struct echo_device *ed, struct lov_stripe_md **lsmp)
-{
-       int lsm_size;
+       ec = eco->eo_dev->ed_ec;
 
-       ENTRY;
+       LASSERT(atomic_read(&eco->eo_npages) == 0);
 
-       /* If export is lov/osc then use their obd method */
-       if (ed->ed_next != NULL)
-               return obd_free_memmd(ed->ed_ec->ec_exp, lsmp);
-       /* OFD has no unpackmd method, do everything here */
-       lsm_size = lov_stripe_md_size(1);
+       spin_lock(&ec->ec_lock);
+       list_del_init(&eco->eo_obj_chain);
+       spin_unlock(&ec->ec_lock);
 
-       LASSERT(*lsmp != NULL);
-       OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
-       OBD_FREE(*lsmp, lsm_size);
-       *lsmp = NULL;
-       RETURN(0);
+       if (eco->eo_oinfo)
+               OBD_FREE_PTR(eco->eo_oinfo);
 }
 
 static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
 {
-        struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
-        struct echo_client_obd *ec = eco->eo_dev->ed_ec;
-        ENTRY;
-
-       LASSERT(atomic_read(&eco->eo_npages) == 0);
+       struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
 
-       spin_lock(&ec->ec_lock);
-        cfs_list_del_init(&eco->eo_obj_chain);
-       spin_unlock(&ec->ec_lock);
+       ENTRY;
 
-        lu_object_fini(obj);
-        lu_object_header_fini(obj->lo_header);
+       lu_object_fini(obj);
+       lu_object_header_fini(obj->lo_header);
 
-       if (eco->eo_lsm)
-               echo_free_memmd(eco->eo_dev, &eco->eo_lsm);
        OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
        EXIT;
 }
 
 static int echo_object_print(const struct lu_env *env, void *cookie,
-                            lu_printer_t p, const struct lu_object *o)
+                            lu_printer_t p, const struct lu_object *o)
 {
-        struct echo_object *obj = cl2echo_obj(lu2cl(o));
+       struct echo_object *obj = cl2echo_obj(lu2cl(o));
 
-        return (*p)(env, cookie, "echoclient-object@%p", obj);
+       return (*p)(env, cookie, "echoclient-object@%p", obj);
 }
 
 static const struct lu_object_operations echo_lu_obj_ops = {
-        .loo_object_init      = echo_object_init,
-        .loo_object_delete    = NULL,
-        .loo_object_release   = NULL,
-        .loo_object_free      = echo_object_free,
-        .loo_object_print     = echo_object_print,
-        .loo_object_invariant = NULL
+       .loo_object_init      = echo_object_init,
+       .loo_object_delete    = echo_object_delete,
+       .loo_object_release   = NULL,
+       .loo_object_free      = echo_object_free,
+       .loo_object_print     = echo_object_print,
+       .loo_object_invariant = NULL
 };
 /** @} echo_lu_ops */
 
@@ -583,12 +590,12 @@ static struct lu_object *echo_object_alloc(const struct lu_env *env,
 {
        struct echo_object *eco;
        struct lu_object *obj = NULL;
-       ENTRY;
 
+       ENTRY;
        /* we're the top dev. */
        LASSERT(hdr == NULL);
        OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, GFP_NOFS);
-       if (eco != NULL) {
+       if (eco) {
                struct cl_object_header *hdr = &eco->eo_hdr;
 
                obj = &echo_obj2cl(eco)->co_lu;
@@ -605,14 +612,11 @@ static struct lu_object *echo_object_alloc(const struct lu_env *env,
 }
 
 static struct lu_device_operations echo_device_lu_ops = {
-        .ldo_object_alloc   = echo_object_alloc,
+       .ldo_object_alloc   = echo_object_alloc,
 };
 
 /** @} echo_lu_dev_ops */
 
-static struct cl_device_operations echo_device_cl_ops = {
-};
-
 /** \defgroup echo_init Setup and teardown
  *
  * Init and fini functions for echo client.
@@ -621,31 +625,33 @@ static struct cl_device_operations echo_device_cl_ops = {
  */
 static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
 {
-        struct cl_site *site = &ed->ed_site_myself;
-        int rc;
+       struct cl_site *site = &ed->ed_site_myself;
+       int rc;
 
-        /* initialize site */
-        rc = cl_site_init(site, &ed->ed_cl);
-        if (rc) {
-                CERROR("Cannot initilize site for echo client(%d)\n", rc);
-                return rc;
-        }
+       /* initialize site */
+       rc = cl_site_init(site, &ed->ed_cl);
+       if (rc) {
+               CERROR("Cannot initialize site for echo client(%d)\n", rc);
+               return rc;
+       }
 
-        rc = lu_site_init_finish(&site->cs_lu);
-        if (rc)
-                return rc;
+       rc = lu_site_init_finish(&site->cs_lu);
+       if (rc) {
+               cl_site_fini(site);
+               return rc;
+       }
 
-        ed->ed_site = site;
-        return 0;
+       ed->ed_site = &site->cs_lu;
+       return 0;
 }
 
 static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
 {
-        if (ed->ed_site) {
-                if (!ed->ed_next_ismd)
-                        cl_site_fini(ed->ed_site);
-                ed->ed_site = NULL;
-        }
+       if (ed->ed_site) {
+               if (!ed->ed_next_ismd)
+                       lu_site_fini(ed->ed_site);
+               ed->ed_site = NULL;
+       }
 }
 
 static void *echo_thread_key_init(const struct lu_context *ctx,
@@ -654,28 +660,23 @@ static void *echo_thread_key_init(const struct lu_context *ctx,
        struct echo_thread_info *info;
 
        OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, GFP_NOFS);
-       if (info == NULL)
+       if (!info)
                info = ERR_PTR(-ENOMEM);
        return info;
 }
 
 static void echo_thread_key_fini(const struct lu_context *ctx,
-                         struct lu_context_key *key, void *data)
+                                struct lu_context_key *key, void *data)
 {
-        struct echo_thread_info *info = data;
-        OBD_SLAB_FREE_PTR(info, echo_thread_kmem);
-}
+       struct echo_thread_info *info = data;
 
-static void echo_thread_key_exit(const struct lu_context *ctx,
-                         struct lu_context_key *key, void *data)
-{
+       OBD_SLAB_FREE_PTR(info, echo_thread_kmem);
 }
 
 static struct lu_context_key echo_thread_key = {
-        .lct_tags = LCT_CL_THREAD,
-        .lct_init = echo_thread_key_init,
-        .lct_fini = echo_thread_key_fini,
-        .lct_exit = echo_thread_key_exit
+       .lct_tags = LCT_CL_THREAD,
+       .lct_init = echo_thread_key_init,
+       .lct_fini = echo_thread_key_fini,
 };
 
 static void *echo_session_key_init(const struct lu_context *ctx,
@@ -684,28 +685,23 @@ static void *echo_session_key_init(const struct lu_context *ctx,
        struct echo_session_info *session;
 
        OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, GFP_NOFS);
-       if (session == NULL)
+       if (!session)
                session = ERR_PTR(-ENOMEM);
        return session;
 }
 
 static void echo_session_key_fini(const struct lu_context *ctx,
-                                 struct lu_context_key *key, void *data)
+                                 struct lu_context_key *key, void *data)
 {
-        struct echo_session_info *session = data;
-        OBD_SLAB_FREE_PTR(session, echo_session_kmem);
-}
+       struct echo_session_info *session = data;
 
-static void echo_session_key_exit(const struct lu_context *ctx,
-                                 struct lu_context_key *key, void *data)
-{
+       OBD_SLAB_FREE_PTR(session, echo_session_kmem);
 }
 
 static struct lu_context_key echo_session_key = {
-        .lct_tags = LCT_SESSION,
-        .lct_init = echo_session_key_init,
-        .lct_fini = echo_session_key_fini,
-        .lct_exit = echo_session_key_exit
+       .lct_tags = LCT_SESSION,
+       .lct_init = echo_session_key_init,
+       .lct_fini = echo_session_key_fini,
 };
 
 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
@@ -715,134 +711,222 @@ LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
 static int echo_fid_init(struct echo_device *ed, char *obd_name,
                         struct seq_server_site *ss)
 {
-        char *prefix;
-        int rc;
-        ENTRY;
+       char *prefix;
+       int rc;
 
-        OBD_ALLOC_PTR(ed->ed_cl_seq);
-        if (ed->ed_cl_seq == NULL)
-                RETURN(-ENOMEM);
+       ENTRY;
+       OBD_ALLOC_PTR(ed->ed_cl_seq);
+       if (!ed->ed_cl_seq)
+               RETURN(-ENOMEM);
 
-        OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
-        if (prefix == NULL)
-                GOTO(out_free_seq, rc = -ENOMEM);
+       OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
+       if (!prefix)
+               GOTO(out_free_seq, rc = -ENOMEM);
 
-        snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", obd_name);
+       snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", obd_name);
 
        /* Init client side sequence-manager */
        rc = seq_client_init(ed->ed_cl_seq, NULL,
                             LUSTRE_SEQ_METADATA,
                             prefix, ss->ss_server_seq);
-        ed->ed_cl_seq->lcs_width = ECHO_SEQ_WIDTH;
-        OBD_FREE(prefix, MAX_OBD_NAME + 5);
-        if (rc)
-                GOTO(out_free_seq, rc);
+       ed->ed_cl_seq->lcs_width = ECHO_SEQ_WIDTH;
+       OBD_FREE(prefix, MAX_OBD_NAME + 5);
+       if (rc)
+               GOTO(out_free_seq, rc);
 
-        RETURN(0);
+       RETURN(0);
 
 out_free_seq:
-        OBD_FREE_PTR(ed->ed_cl_seq);
-        ed->ed_cl_seq = NULL;
-        RETURN(rc);
+       OBD_FREE_PTR(ed->ed_cl_seq);
+       ed->ed_cl_seq = NULL;
+       RETURN(rc);
 }
 
 static int echo_fid_fini(struct obd_device *obddev)
 {
-        struct echo_device *ed = obd2echo_dev(obddev);
-        ENTRY;
+       struct echo_device *ed = obd2echo_dev(obddev);
+
+       ENTRY;
+       if (ed->ed_cl_seq) {
+               seq_client_fini(ed->ed_cl_seq);
+               OBD_FREE_PTR(ed->ed_cl_seq);
+               ed->ed_cl_seq = NULL;
+       }
+
+       RETURN(0);
+}
+
+static void echo_ed_los_fini(const struct lu_env *env, struct echo_device *ed)
+{
+       ENTRY;
+       if (ed != NULL && ed->ed_next_ismd && ed->ed_los != NULL) {
+               local_oid_storage_fini(env, ed->ed_los);
+               ed->ed_los = NULL;
+       }
+}
+
+static int
+echo_md_local_file_create(const struct lu_env *env, struct echo_md_device *emd,
+                         struct local_oid_storage *los,
+                         const struct lu_fid *pfid, const char *name,
+                         __u32 mode, struct lu_fid *fid)
+{
+       struct dt_object        *parent = NULL;
+       struct dt_object        *dto = NULL;
+       int                      rc = 0;
+
+       ENTRY;
+       LASSERT(!fid_is_zero(pfid));
+       parent = dt_locate(env, emd->emd_bottom, pfid);
+       if (unlikely(IS_ERR(parent)))
+               RETURN(PTR_ERR(parent));
+
+       /* create local file with @fid */
+       dto = local_file_find_or_create_with_fid(env, emd->emd_bottom, fid,
+                                                parent, name, mode);
+       if (IS_ERR(dto))
+               GOTO(out_put, rc = PTR_ERR(dto));
+
+       *fid = *lu_object_fid(&dto->do_lu);
+       /*
+        * since stack is not fully set up the local_storage uses own stack
+        * and we should drop its object from cache
+        */
+       dt_object_put_nocache(env, dto);
+
+       EXIT;
+out_put:
+       dt_object_put(env, parent);
+       RETURN(rc);
+}
+
+static int
+echo_md_root_get(const struct lu_env *env, struct echo_md_device *emd,
+                struct echo_device *ed)
+{
+       struct lu_fid fid;
+       int rc = 0;
+
+       ENTRY;
+       /* Setup local dirs */
+       fid.f_seq = FID_SEQ_LOCAL_NAME;
+       fid.f_oid = 1;
+       fid.f_ver = 0;
+       rc = local_oid_storage_init(env, emd->emd_bottom, &fid, &ed->ed_los);
+       if (rc != 0)
+               RETURN(rc);
+
+       lu_echo_root_fid(&fid);
+       if (echo_md_seq_site(emd)->ss_node_id == 0) {
+               rc = echo_md_local_file_create(env, emd, ed->ed_los,
+                                              &emd->emd_local_root_fid,
+                                              echo_md_root_dir_name, S_IFDIR |
+                                              S_IRUGO | S_IWUSR | S_IXUGO,
+                                              &fid);
+               if (rc != 0) {
+                       CERROR("%s: create md echo root fid failed: rc = %d\n",
+                              emd2obd_dev(emd)->obd_name, rc);
+                       GOTO(out_los, rc);
+               }
+       }
+       ed->ed_root_fid = fid;
 
-        if (ed->ed_cl_seq != NULL) {
-                seq_client_fini(ed->ed_cl_seq);
-                OBD_FREE_PTR(ed->ed_cl_seq);
-                ed->ed_cl_seq = NULL;
-        }
+       RETURN(0);
+out_los:
+       echo_ed_los_fini(env, ed);
 
-        RETURN(0);
+       RETURN(rc);
 }
 #endif /* HAVE_SERVER_SUPPORT */
 
 static struct lu_device *echo_device_alloc(const struct lu_env *env,
-                                           struct lu_device_type *t,
-                                           struct lustre_cfg *cfg)
-{
-        struct lu_device   *next;
-        struct echo_device *ed;
-        struct cl_device   *cd;
-        struct obd_device  *obd = NULL; /* to keep compiler happy */
-        struct obd_device  *tgt;
-        const char *tgt_type_name;
-        int rc;
-        int cleanup = 0;
-        ENTRY;
-
-        OBD_ALLOC_PTR(ed);
-        if (ed == NULL)
-                GOTO(out, rc = -ENOMEM);
-
-        cleanup = 1;
-        cd = &ed->ed_cl;
-        rc = cl_device_init(cd, t);
-        if (rc)
-                GOTO(out, rc);
-
-        cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
-        cd->cd_ops = &echo_device_cl_ops;
-
-        cleanup = 2;
-        obd = class_name2obd(lustre_cfg_string(cfg, 0));
-        LASSERT(obd != NULL);
-        LASSERT(env != NULL);
-
-        tgt = class_name2obd(lustre_cfg_string(cfg, 1));
-        if (tgt == NULL) {
-                CERROR("Can not find tgt device %s\n",
-                        lustre_cfg_string(cfg, 1));
-                GOTO(out, rc = -ENODEV);
-        }
-
-        next = tgt->obd_lu_dev;
-        if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
-                ed->ed_next_ismd = 1;
-        } else {
-                ed->ed_next_ismd = 0;
-                rc = echo_site_init(env, ed);
-                if (rc)
-                        GOTO(out, rc);
-        }
-        cleanup = 3;
-
-        rc = echo_client_setup(env, obd, cfg);
-        if (rc)
-                GOTO(out, rc);
-
-        ed->ed_ec = &obd->u.echo_client;
-        cleanup = 4;
-
-        if (ed->ed_next_ismd) {
+                                          struct lu_device_type *t,
+                                          struct lustre_cfg *cfg)
+{
+       struct lu_device   *next;
+       struct echo_device *ed;
+       struct cl_device   *cd;
+       struct obd_device  *obd = NULL; /* to keep compiler happy */
+       struct obd_device  *tgt;
+       const char *tgt_type_name;
+       int rc;
+       int cleanup = 0;
+
+       ENTRY;
+       OBD_ALLOC_PTR(ed);
+       if (!ed)
+               GOTO(out, rc = -ENOMEM);
+
+       cleanup = 1;
+       cd = &ed->ed_cl;
+       rc = cl_device_init(cd, t);
+       if (rc)
+               GOTO(out, rc);
+
+       cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
+
+       cleanup = 2;
+       obd = class_name2obd(lustre_cfg_string(cfg, 0));
+       LASSERT(obd != NULL);
+       LASSERT(env != NULL);
+
+       tgt = class_name2obd(lustre_cfg_string(cfg, 1));
+       if (!tgt) {
+               CERROR("Can not find tgt device %s\n",
+                       lustre_cfg_string(cfg, 1));
+               GOTO(out, rc = -ENODEV);
+       }
+
+       next = tgt->obd_lu_dev;
+
+       if (strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME) == 0) {
+               ed->ed_next_ismd = 1;
+       } else if (strcmp(tgt->obd_type->typ_name, LUSTRE_OST_NAME) == 0 ||
+                  strcmp(tgt->obd_type->typ_name, LUSTRE_OSC_NAME) == 0) {
+               ed->ed_next_ismd = 0;
+               rc = echo_site_init(env, ed);
+               if (rc)
+                       GOTO(out, rc);
+       } else {
+               GOTO(out, rc = -EINVAL);
+       }
+
+       cleanup = 3;
+
+       rc = echo_client_setup(env, obd, cfg);
+       if (rc)
+               GOTO(out, rc);
+
+       ed->ed_ec = &obd->u.echo_client;
+       cleanup = 4;
+
+       if (ed->ed_next_ismd) {
 #ifdef HAVE_SERVER_SUPPORT
-                /* Suppose to connect to some Metadata layer */
-                struct lu_site *ls;
-                struct lu_device *ld;
-                int    found = 0;
-
-                if (next == NULL) {
-                        CERROR("%s is not lu device type!\n",
-                               lustre_cfg_string(cfg, 1));
-                        GOTO(out, rc = -EINVAL);
-                }
-
-                tgt_type_name = lustre_cfg_string(cfg, 2);
-                if (!tgt_type_name) {
-                        CERROR("%s no type name for echo %s setup\n",
-                                lustre_cfg_string(cfg, 1),
-                                tgt->obd_type->typ_name);
-                        GOTO(out, rc = -EINVAL);
-                }
-
-                ls = next->ld_site;
+               /* Suppose to connect to some Metadata layer */
+               struct lu_site          *ls = NULL;
+               struct lu_device        *ld = NULL;
+               struct md_device        *md = NULL;
+               struct echo_md_device   *emd = NULL;
+               int                      found = 0;
+
+               if (!next) {
+                       CERROR("%s is not lu device type!\n",
+                              lustre_cfg_string(cfg, 1));
+                       GOTO(out, rc = -EINVAL);
+               }
+
+               tgt_type_name = lustre_cfg_string(cfg, 2);
+               if (!tgt_type_name) {
+                       CERROR("%s no type name for echo %s setup\n",
+                               lustre_cfg_string(cfg, 1),
+                               tgt->obd_type->typ_name);
+                       GOTO(out, rc = -EINVAL);
+               }
+
+               ls = next->ld_site;
 
                spin_lock(&ls->ls_ld_lock);
-               cfs_list_for_each_entry(ld, &ls->ls_ld_linkage, ld_linkage) {
+               list_for_each_entry(ld, &ls->ls_ld_linkage, ld_linkage) {
                        if (strcmp(ld->ld_type->ldt_name, tgt_type_name) == 0) {
                                found = 1;
                                break;
@@ -850,199 +934,200 @@ static struct lu_device *echo_device_alloc(const struct lu_env *env,
                }
                spin_unlock(&ls->ls_ld_lock);
 
-                if (found == 0) {
-                        CERROR("%s is not lu device type!\n",
-                               lustre_cfg_string(cfg, 1));
-                        GOTO(out, rc = -EINVAL);
-                }
-
-                next = ld;
-                /* For MD echo client, it will use the site in MDS stack */
-                ed->ed_site_myself.cs_lu = *ls;
-                ed->ed_site = &ed->ed_site_myself;
-                ed->ed_cl.cd_lu_dev.ld_site = &ed->ed_site_myself.cs_lu;
+               if (found == 0) {
+                       CERROR("%s is not lu device type!\n",
+                              lustre_cfg_string(cfg, 1));
+                       GOTO(out, rc = -EINVAL);
+               }
+
+               next = ld;
+               /* For MD echo client, it will use the site in MDS stack */
+               ed->ed_site = ls;
+               ed->ed_cl.cd_lu_dev.ld_site = ls;
                rc = echo_fid_init(ed, obd->obd_name, lu_site2seq(ls));
                if (rc) {
                        CERROR("echo fid init error %d\n", rc);
                        GOTO(out, rc);
                }
+
+               md = lu2md_dev(next);
+               emd = lu2emd_dev(&md->md_lu_dev);
+               rc = echo_md_root_get(env, emd, ed);
+               if (rc != 0) {
+                       CERROR("%s: get root error: rc = %d\n",
+                               emd2obd_dev(emd)->obd_name, rc);
+                       GOTO(out, rc);
+               }
 #else /* !HAVE_SERVER_SUPPORT */
-               CERROR("Local operations are NOT supported on client side. "
-                      "Only remote operations are supported. Metadata client "
-                      "must be run on server side.\n");
+               CERROR(
+                      "Local operations are NOT supported on client side. Only remote operations are supported. Metadata client must be run on server side.\n");
                GOTO(out, rc = -EOPNOTSUPP);
-#endif
-        } else {
-                 /* if echo client is to be stacked upon ost device, the next is
-                  * NULL since ost is not a clio device so far */
-                if (next != NULL && !lu_device_is_cl(next))
-                        next = NULL;
-
-                tgt_type_name = tgt->obd_type->typ_name;
-                if (next != NULL) {
-                        LASSERT(next != NULL);
-                        if (next->ld_site != NULL)
-                                GOTO(out, rc = -EBUSY);
-
-                        next->ld_site = &ed->ed_site->cs_lu;
-                        rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
-                                                     next->ld_type->ldt_name,
-                                                     NULL);
-                        if (rc)
-                                GOTO(out, rc);
-
-                        /* Tricky case, I have to determine the obd type since
-                         * CLIO uses the different parameters to initialize
-                         * objects for lov & osc. */
-                        if (strcmp(tgt_type_name, LUSTRE_LOV_NAME) == 0)
-                                ed->ed_next_islov = 1;
-                        else
-                                LASSERT(strcmp(tgt_type_name,
-                                               LUSTRE_OSC_NAME) == 0);
-                } else
-                        LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
-        }
-
-        ed->ed_next = next;
-        RETURN(&cd->cd_lu_dev);
+#endif /* HAVE_SERVER_SUPPORT */
+       } else {
+               /*
+                * if echo client is to be stacked upon ost device, the next is
+                * NULL since ost is not a clio device so far
+                */
+               if (next != NULL && !lu_device_is_cl(next))
+                       next = NULL;
+
+               tgt_type_name = tgt->obd_type->typ_name;
+               if (next) {
+                       LASSERT(next != NULL);
+                       if (next->ld_site)
+                               GOTO(out, rc = -EBUSY);
+
+                       next->ld_site = ed->ed_site;
+                       rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
+                                                       next->ld_type->ldt_name,
+                                                       NULL);
+                       if (rc)
+                               GOTO(out, rc);
+               } else {
+                       LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
+               }
+       }
+
+       ed->ed_next = next;
+       RETURN(&cd->cd_lu_dev);
 out:
-        switch(cleanup) {
-        case 4: {
-                int rc2;
-                rc2 = echo_client_cleanup(obd);
-                if (rc2)
-                        CERROR("Cleanup obd device %s error(%d)\n",
-                               obd->obd_name, rc2);
-        }
-
-        case 3:
-                echo_site_fini(env, ed);
-        case 2:
-                cl_device_fini(&ed->ed_cl);
-        case 1:
-                OBD_FREE_PTR(ed);
-        case 0:
-        default:
-                break;
-        }
-        return(ERR_PTR(rc));
+       switch (cleanup) {
+       case 4: {
+               int rc2;
+
+               rc2 = echo_client_cleanup(obd);
+               if (rc2)
+                       CERROR("Cleanup obd device %s error(%d)\n",
+                              obd->obd_name, rc2);
+       }
+       /* fallthrough */
+
+       case 3:
+               echo_site_fini(env, ed);
+               /* fallthrough */
+       case 2:
+               cl_device_fini(&ed->ed_cl);
+               /* fallthrough */
+       case 1:
+               OBD_FREE_PTR(ed);
+               /* fallthrough */
+       case 0:
+       default:
+               break;
+       }
+       return ERR_PTR(rc);
 }
 
 static int echo_device_init(const struct lu_env *env, struct lu_device *d,
-                          const char *name, struct lu_device *next)
+                           const char *name, struct lu_device *next)
 {
-        LBUG();
-        return 0;
+       LBUG();
+       return 0;
 }
 
 static struct lu_device *echo_device_fini(const struct lu_env *env,
-                                          struct lu_device *d)
+                                         struct lu_device *d)
 {
-        struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
-        struct lu_device *next = ed->ed_next;
+       struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
+       struct lu_device *next = ed->ed_next;
 
-        while (next && !ed->ed_next_ismd)
-                next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
-        return NULL;
+       while (next && !ed->ed_next_ismd)
+               next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
+       return NULL;
 }
 
 static void echo_lock_release(const struct lu_env *env,
-                              struct echo_lock *ecl,
-                              int still_used)
+                             struct echo_lock *ecl,
+                             int still_used)
 {
-        struct cl_lock *clk = echo_lock2cl(ecl);
+       struct cl_lock *clk = echo_lock2cl(ecl);
 
-        cl_lock_get(clk);
-        cl_unuse(env, clk);
-        cl_lock_release(env, clk, "ec enqueue", ecl->el_object);
-        if (!still_used) {
-                cl_lock_mutex_get(env, clk);
-                cl_lock_cancel(env, clk);
-                cl_lock_delete(env, clk);
-                cl_lock_mutex_put(env, clk);
-        }
-        cl_lock_put(env, clk);
+       cl_lock_release(env, clk);
 }
 
 static struct lu_device *echo_device_free(const struct lu_env *env,
-                                          struct lu_device *d)
+                                         struct lu_device *d)
 {
-        struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
-        struct echo_client_obd *ec   = ed->ed_ec;
-        struct echo_object     *eco;
-        struct lu_device       *next = ed->ed_next;
+       struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
+       struct echo_client_obd *ec   = ed->ed_ec;
+       struct echo_object     *eco;
+       struct lu_device       *next = ed->ed_next;
 
-        CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
-               ed, next);
+       CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
+              ed, next);
 
-        lu_site_purge(env, &ed->ed_site->cs_lu, -1);
+       lu_site_purge(env, ed->ed_site, -1);
 
-        /* check if there are objects still alive.
-         * It shouldn't have any object because lu_site_purge would cleanup
-         * all of cached objects. Anyway, probably the echo device is being
-         * parallelly accessed.
-         */
+       /*
+        * check if there are objects still alive.
+        * It shouldn't have any object because lu_site_purge would cleanup
+        * all of cached objects. Anyway, probably the echo device is being
+        * parallelly accessed.
+        */
        spin_lock(&ec->ec_lock);
-       cfs_list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
+       list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
                eco->eo_deleted = 1;
        spin_unlock(&ec->ec_lock);
 
        /* purge again */
-       lu_site_purge(env, &ed->ed_site->cs_lu, -1);
+       lu_site_purge(env, ed->ed_site, -1);
 
        CDEBUG(D_INFO,
               "Waiting for the reference of echo object to be dropped\n");
 
        /* Wait for the last reference to be dropped. */
        spin_lock(&ec->ec_lock);
-       while (!cfs_list_empty(&ec->ec_objects)) {
+       while (!list_empty(&ec->ec_objects)) {
                spin_unlock(&ec->ec_lock);
-               CERROR("echo_client still has objects at cleanup time, "
-                      "wait for 1 second\n");
-               schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
-                                                  cfs_time_seconds(1));
-               lu_site_purge(env, &ed->ed_site->cs_lu, -1);
+               CERROR(
+                      "echo_client still has objects at cleanup time, wait for 1 second\n");
+               set_current_state(TASK_UNINTERRUPTIBLE);
+               schedule_timeout(cfs_time_seconds(1));
+               lu_site_purge(env, ed->ed_site, -1);
                spin_lock(&ec->ec_lock);
        }
        spin_unlock(&ec->ec_lock);
 
-        LASSERT(cfs_list_empty(&ec->ec_locks));
+       LASSERT(list_empty(&ec->ec_locks));
 
        CDEBUG(D_INFO, "No object exists, exiting...\n");
 
        echo_client_cleanup(d->ld_obd);
 #ifdef HAVE_SERVER_SUPPORT
        echo_fid_fini(d->ld_obd);
+       echo_ed_los_fini(env, ed);
 #endif
        while (next && !ed->ed_next_ismd)
                next = next->ld_type->ldt_ops->ldto_device_free(env, next);
 
-        LASSERT(ed->ed_site == lu2cl_site(d->ld_site));
-        echo_site_fini(env, ed);
-        cl_device_fini(&ed->ed_cl);
-        OBD_FREE_PTR(ed);
+       LASSERT(ed->ed_site == d->ld_site);
+       echo_site_fini(env, ed);
+       cl_device_fini(&ed->ed_cl);
+       OBD_FREE_PTR(ed);
 
-        return NULL;
+       cl_env_cache_purge(~0);
+
+       return NULL;
 }
 
 static const struct lu_device_type_operations echo_device_type_ops = {
-        .ldto_init = echo_type_init,
-        .ldto_fini = echo_type_fini,
+       .ldto_init = echo_type_init,
+       .ldto_fini = echo_type_fini,
 
-        .ldto_start = echo_type_start,
-        .ldto_stop  = echo_type_stop,
+       .ldto_start = echo_type_start,
+       .ldto_stop  = echo_type_stop,
 
-        .ldto_device_alloc = echo_device_alloc,
-        .ldto_device_free  = echo_device_free,
-        .ldto_device_init  = echo_device_init,
-        .ldto_device_fini  = echo_device_fini
+       .ldto_device_alloc = echo_device_alloc,
+       .ldto_device_free  = echo_device_free,
+       .ldto_device_init  = echo_device_init,
+       .ldto_device_fini  = echo_device_fini
 };
 
 static struct lu_device_type echo_device_type = {
-        .ldt_tags     = LU_DEVICE_CL,
-        .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
-        .ldt_ops      = &echo_device_type_ops,
-        .ldt_ctx_tags = LCT_CL_THREAD | LCT_MD_THREAD | LCT_DT_THREAD,
+       .ldt_tags     = LU_DEVICE_CL,
+       .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
+       .ldt_ops      = &echo_device_type_ops,
+       .ldt_ctx_tags = LCT_CL_THREAD | LCT_MD_THREAD | LCT_DT_THREAD,
 };
 /** @} echo_init */
 
@@ -1054,396 +1139,301 @@ static struct lu_device_type echo_device_type = {
  */
 
 /* Interfaces to echo client obd device */
-static struct echo_object *cl_echo_object_find(struct echo_device *d,
-                                               struct lov_stripe_md **lsmp)
-{
-        struct lu_env *env;
-        struct echo_thread_info *info;
-        struct echo_object_conf *conf;
-        struct lov_stripe_md    *lsm;
-        struct echo_object *eco;
-        struct cl_object   *obj;
-        struct lu_fid *fid;
-        int refcheck;
+static struct echo_object *
+cl_echo_object_find(struct echo_device *d, const struct ost_id *oi)
+{
+       struct lu_env *env;
+       struct echo_thread_info *info;
+       struct echo_object_conf *conf;
+       struct echo_object *eco;
+       struct cl_object *obj;
+       struct lov_oinfo *oinfo = NULL;
+       struct lu_fid *fid;
+       __u16  refcheck;
        int rc;
+
        ENTRY;
+       LASSERTF(ostid_id(oi) != 0, DOSTID"\n", POSTID(oi));
+       LASSERTF(ostid_seq(oi) == FID_SEQ_ECHO, DOSTID"\n", POSTID(oi));
+
+       /* Never return an object if the obd is to be freed. */
+       if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
+               RETURN(ERR_PTR(-ENODEV));
+
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env))
+               RETURN((void *)env);
+
+       info = echo_env_info(env);
+       conf = &info->eti_conf;
+       if (d->ed_next) {
+               OBD_ALLOC_PTR(oinfo);
+               if (!oinfo)
+                       GOTO(out, eco = ERR_PTR(-ENOMEM));
+
+               oinfo->loi_oi = *oi;
+               conf->eoc_cl.u.coc_oinfo = oinfo;
+       }
+
+       /*
+        * If echo_object_init() is successful then ownership of oinfo
+        * is transferred to the object.
+        */
+       conf->eoc_oinfo = &oinfo;
 
-       LASSERT(lsmp);
-       lsm = *lsmp;
-       LASSERT(lsm);
-       LASSERTF(ostid_id(&lsm->lsm_oi) != 0, DOSTID"\n", POSTID(&lsm->lsm_oi));
-       LASSERTF(ostid_seq(&lsm->lsm_oi) == FID_SEQ_ECHO, DOSTID"\n",
-                POSTID(&lsm->lsm_oi));
-
-        /* Never return an object if the obd is to be freed. */
-        if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
-                RETURN(ERR_PTR(-ENODEV));
-
-        env = cl_env_get(&refcheck);
-        if (IS_ERR(env))
-                RETURN((void *)env);
-
-        info = echo_env_info(env);
-        conf = &info->eti_conf;
-        if (d->ed_next) {
-                if (!d->ed_next_islov) {
-                        struct lov_oinfo *oinfo = lsm->lsm_oinfo[0];
-                        LASSERT(oinfo != NULL);
-                       oinfo->loi_oi = lsm->lsm_oi;
-                        conf->eoc_cl.u.coc_oinfo = oinfo;
-                } else {
-                        struct lustre_md *md;
-                        md = &info->eti_md;
-                        memset(md, 0, sizeof *md);
-                        md->lsm = lsm;
-                        conf->eoc_cl.u.coc_md = md;
-                }
-        }
-        conf->eoc_md = lsmp;
-
-       fid  = &info->eti_fid;
-       rc = ostid_to_fid(fid, &lsm->lsm_oi, 0);
+       fid = &info->eti_fid;
+       rc = ostid_to_fid(fid, oi, 0);
        if (rc != 0)
                GOTO(out, eco = ERR_PTR(rc));
 
-       /* In the function below, .hs_keycmp resolves to
-        * lu_obj_hop_keycmp() */
+       /*
+        * In the function below, .hs_keycmp resolves to
+        * lu_obj_hop_keycmp()
+        */
        /* coverity[overrun-buffer-val] */
-        obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
-        if (IS_ERR(obj))
-                GOTO(out, eco = (void*)obj);
-
-        eco = cl2echo_obj(obj);
-        if (eco->eo_deleted) {
-                cl_object_put(env, obj);
-                eco = ERR_PTR(-EAGAIN);
-        }
+       obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
+       if (IS_ERR(obj))
+               GOTO(out, eco = (void *)obj);
+
+       eco = cl2echo_obj(obj);
+       if (eco->eo_deleted) {
+               cl_object_put(env, obj);
+               eco = ERR_PTR(-EAGAIN);
+       }
 
 out:
-        cl_env_put(env, &refcheck);
-        RETURN(eco);
+       if (oinfo)
+               OBD_FREE_PTR(oinfo);
+
+       cl_env_put(env, &refcheck);
+       RETURN(eco);
 }
 
 static int cl_echo_object_put(struct echo_object *eco)
 {
-        struct lu_env *env;
-        struct cl_object *obj = echo_obj2cl(eco);
-        int refcheck;
-        ENTRY;
+       struct lu_env *env;
+       struct cl_object *obj = echo_obj2cl(eco);
+       __u16  refcheck;
+
+       ENTRY;
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env))
+               RETURN(PTR_ERR(env));
 
-        env = cl_env_get(&refcheck);
-        if (IS_ERR(env))
-                RETURN(PTR_ERR(env));
+       /* an external function to kill an object? */
+       if (eco->eo_deleted) {
+               struct lu_object_header *loh = obj->co_lu.lo_header;
 
-        /* an external function to kill an object? */
-        if (eco->eo_deleted) {
-                struct lu_object_header *loh = obj->co_lu.lo_header;
-                LASSERT(&eco->eo_hdr == luh2coh(loh));
+               LASSERT(&eco->eo_hdr == luh2coh(loh));
                set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
-        }
+       }
 
-        cl_object_put(env, obj);
-        cl_env_put(env, &refcheck);
-        RETURN(0);
+       cl_object_put(env, obj);
+       cl_env_put(env, &refcheck);
+       RETURN(0);
 }
 
 static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
-                            obd_off start, obd_off end, int mode,
-                            __u64 *cookie , __u32 enqflags)
-{
-        struct cl_io *io;
-        struct cl_lock *lck;
-        struct cl_object *obj;
-        struct cl_lock_descr *descr;
-        struct echo_thread_info *info;
-        int rc = -ENOMEM;
-        ENTRY;
-
-        info = echo_env_info(env);
-        io = &info->eti_io;
-        descr = &info->eti_descr;
-        obj = echo_obj2cl(eco);
-
-        descr->cld_obj   = obj;
-        descr->cld_start = cl_index(obj, start);
-        descr->cld_end   = cl_index(obj, end);
-        descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
-        descr->cld_enq_flags = enqflags;
-        io->ci_obj = obj;
-
-        lck = cl_lock_request(env, io, descr, "ec enqueue", eco);
-        if (lck) {
-                struct echo_client_obd *ec = eco->eo_dev->ed_ec;
-                struct echo_lock *el;
-
-                rc = cl_wait(env, lck);
-                if (rc == 0) {
-                        el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
-                       spin_lock(&ec->ec_lock);
-                       if (cfs_list_empty(&el->el_chain)) {
-                               cfs_list_add(&el->el_chain, &ec->ec_locks);
-                               el->el_cookie = ++ec->ec_unique;
-                       }
-                       atomic_inc(&el->el_refcount);
-                       *cookie = el->el_cookie;
-                       spin_unlock(&ec->ec_lock);
-               } else {
-                       cl_lock_release(env, lck, "ec enqueue", current);
-               }
-       }
-       RETURN(rc);
-}
-
-static int cl_echo_enqueue(struct echo_object *eco, obd_off start, obd_off end,
-                           int mode, __u64 *cookie)
+                           u64 start, u64 end, int mode,
+                           __u64 *cookie, __u32 enqflags)
 {
-        struct echo_thread_info *info;
-        struct lu_env *env;
-        struct cl_io *io;
-        int refcheck;
-        int result;
-        ENTRY;
-
-        env = cl_env_get(&refcheck);
-        if (IS_ERR(env))
-                RETURN(PTR_ERR(env));
-
-        info = echo_env_info(env);
-        io = &info->eti_io;
-
-       io->ci_ignore_layout = 1;
-        result = cl_io_init(env, io, CIT_MISC, echo_obj2cl(eco));
-        if (result < 0)
-                GOTO(out, result);
-        LASSERT(result == 0);
+       struct cl_io *io;
+       struct cl_lock *lck;
+       struct cl_object *obj;
+       struct cl_lock_descr *descr;
+       struct echo_thread_info *info;
+       int rc = -ENOMEM;
 
-        result = cl_echo_enqueue0(env, eco, start, end, mode, cookie, 0);
-        cl_io_fini(env, io);
+       ENTRY;
+       info = echo_env_info(env);
+       io = &info->eti_io;
+       lck = &info->eti_lock;
+       obj = echo_obj2cl(eco);
+
+       memset(lck, 0, sizeof(*lck));
+       descr = &lck->cll_descr;
+       descr->cld_obj   = obj;
+       descr->cld_start = cl_index(obj, start);
+       descr->cld_end   = cl_index(obj, end);
+       descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
+       descr->cld_enq_flags = enqflags;
+       io->ci_obj = obj;
+
+       rc = cl_lock_request(env, io, lck);
+       if (rc == 0) {
+               struct echo_client_obd *ec = eco->eo_dev->ed_ec;
+               struct echo_lock *el;
 
-        EXIT;
-out:
-        cl_env_put(env, &refcheck);
-        return result;
+               el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
+               spin_lock(&ec->ec_lock);
+               if (list_empty(&el->el_chain)) {
+                       list_add(&el->el_chain, &ec->ec_locks);
+                       el->el_cookie = ++ec->ec_unique;
+               }
+               atomic_inc(&el->el_refcount);
+               *cookie = el->el_cookie;
+               spin_unlock(&ec->ec_lock);
+       }
+       RETURN(rc);
 }
 
 static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
-                           __u64 cookie)
+                          __u64 cookie)
 {
-        struct echo_client_obd *ec = ed->ed_ec;
-        struct echo_lock       *ecl = NULL;
-        cfs_list_t             *el;
-        int found = 0, still_used = 0;
-        ENTRY;
+       struct echo_client_obd *ec = ed->ed_ec;
+       struct echo_lock *ecl = NULL;
+       struct list_head *el;
+       int found = 0, still_used = 0;
 
-        LASSERT(ec != NULL);
+       ENTRY;
+       LASSERT(ec != NULL);
        spin_lock(&ec->ec_lock);
-        cfs_list_for_each (el, &ec->ec_locks) {
-                ecl = cfs_list_entry (el, struct echo_lock, el_chain);
-                CDEBUG(D_INFO, "ecl: %p, cookie: "LPX64"\n", ecl, ecl->el_cookie);
-                found = (ecl->el_cookie == cookie);
-                if (found) {
+       list_for_each(el, &ec->ec_locks) {
+               ecl = list_entry(el, struct echo_lock, el_chain);
+               CDEBUG(D_INFO, "ecl: %p, cookie: %#llx\n", ecl, ecl->el_cookie);
+               found = (ecl->el_cookie == cookie);
+               if (found) {
                        if (atomic_dec_and_test(&ecl->el_refcount))
-                                cfs_list_del_init(&ecl->el_chain);
-                        else
-                                still_used = 1;
-                        break;
-                }
-        }
+                               list_del_init(&ecl->el_chain);
+                       else
+                               still_used = 1;
+                       break;
+               }
+       }
        spin_unlock(&ec->ec_lock);
 
-        if (!found)
-                RETURN(-ENOENT);
+       if (!found)
+               RETURN(-ENOENT);
 
-        echo_lock_release(env, ecl, still_used);
-        RETURN(0);
-}
-
-static int cl_echo_cancel(struct echo_device *ed, __u64 cookie)
-{
-        struct lu_env *env;
-        int refcheck;
-        int rc;
-        ENTRY;
-
-        env = cl_env_get(&refcheck);
-        if (IS_ERR(env))
-                RETURN(PTR_ERR(env));
-
-        rc = cl_echo_cancel0(env, ed, cookie);
-
-        cl_env_put(env, &refcheck);
-        RETURN(rc);
+       echo_lock_release(env, ecl, still_used);
+       RETURN(0);
 }
 
 static void echo_commit_callback(const struct lu_env *env, struct cl_io *io,
-                               struct cl_page *page)
+                                struct pagevec *pvec)
 {
        struct echo_thread_info *info;
        struct cl_2queue        *queue;
+       int i = 0;
 
        info = echo_env_info(env);
        LASSERT(io == &info->eti_io);
 
        queue = &info->eti_queue;
-       cl_page_list_add(&queue->c2_qout, page);
-}
 
-static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset,
-                             struct page **pages, int npages, int async)
-{
-        struct lu_env           *env;
-        struct echo_thread_info *info;
-        struct cl_object        *obj = echo_obj2cl(eco);
-        struct echo_device      *ed  = eco->eo_dev;
-        struct cl_2queue        *queue;
-        struct cl_io            *io;
-        struct cl_page          *clp;
-        struct lustre_handle    lh = { 0 };
-        int page_size = cl_page_size(obj);
-        int refcheck;
-        int rc;
-        int i;
-        ENTRY;
-
-        LASSERT((offset & ~CFS_PAGE_MASK) == 0);
-        LASSERT(ed->ed_next != NULL);
-        env = cl_env_get(&refcheck);
-        if (IS_ERR(env))
-                RETURN(PTR_ERR(env));
-
-        info    = echo_env_info(env);
-        io      = &info->eti_io;
-        queue   = &info->eti_queue;
-
-        cl_2queue_init(queue);
-
-       io->ci_ignore_layout = 1;
-        rc = cl_io_init(env, io, CIT_MISC, obj);
-        if (rc < 0)
-                GOTO(out, rc);
-        LASSERT(rc == 0);
-
-
-        rc = cl_echo_enqueue0(env, eco, offset,
-                             offset + npages * PAGE_CACHE_SIZE - 1,
-                              rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
-                              CEF_NEVER);
-        if (rc < 0)
-                GOTO(error_lock, rc);
-
-        for (i = 0; i < npages; i++) {
-                LASSERT(pages[i]);
-                clp = cl_page_find(env, obj, cl_index(obj, offset),
-                                   pages[i], CPT_TRANSIENT);
-                if (IS_ERR(clp)) {
-                        rc = PTR_ERR(clp);
-                        break;
-                }
-                LASSERT(clp->cp_type == CPT_TRANSIENT);
-
-                rc = cl_page_own(env, io, clp);
-                if (rc) {
-                        LASSERT(clp->cp_state == CPS_FREEING);
-                        cl_page_put(env, clp);
-                        break;
-                }
-
-                cl_2queue_add(queue, clp);
-
-                /* drop the reference count for cl_page_find, so that the page
-                 * will be freed in cl_2queue_fini. */
-                cl_page_put(env, clp);
-                cl_page_clip(env, clp, 0, page_size);
-
-                offset += page_size;
-        }
-
-        if (rc == 0) {
-                enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
-
-                async = async && (typ == CRT_WRITE);
-                if (async)
-                       rc = cl_io_commit_async(env, io, &queue->c2_qin,
-                                               0, PAGE_SIZE,
-                                               echo_commit_callback);
-               else
-                       rc = cl_io_submit_sync(env, io, typ, queue, 0);
-                CDEBUG(D_INFO, "echo_client %s write returns %d\n",
-                       async ? "async" : "sync", rc);
-        }
+       for (i = 0; i < pagevec_count(pvec); i++) {
+               struct page *vmpage = pvec->pages[i];
+               struct cl_page *page = (struct cl_page *)vmpage->private;
 
-        cl_echo_cancel0(env, ed, lh.cookie);
-        EXIT;
-error_lock:
-        cl_2queue_discard(env, io, queue);
-        cl_2queue_disown(env, io, queue);
-        cl_2queue_fini(env, queue);
-        cl_io_fini(env, io);
-out:
-        cl_env_put(env, &refcheck);
-        return rc;
+               cl_page_list_add(&queue->c2_qout, page);
+       }
 }
-/** @} echo_exports */
 
+static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
+                             struct page **pages, int npages, int async)
+{
+       struct lu_env           *env;
+       struct echo_thread_info *info;
+       struct cl_object        *obj = echo_obj2cl(eco);
+       struct echo_device      *ed  = eco->eo_dev;
+       struct cl_2queue        *queue;
+       struct cl_io            *io;
+       struct cl_page          *clp;
+       struct lustre_handle    lh = { 0 };
+       int page_size = cl_page_size(obj);
+       int rc;
+       int i;
+       __u16 refcheck;
 
-static obd_id last_object_id;
+       ENTRY;
+       LASSERT((offset & ~PAGE_MASK) == 0);
+       LASSERT(ed->ed_next != NULL);
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env))
+               RETURN(PTR_ERR(env));
 
-static int
-echo_copyout_lsm (struct lov_stripe_md *lsm, void *_ulsm, int ulsm_nob)
-{
-        struct lov_stripe_md *ulsm = _ulsm;
-        int nob, i;
+       info    = echo_env_info(env);
+       io      = &info->eti_io;
+       queue   = &info->eti_queue;
 
-        nob = offsetof (struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]);
-        if (nob > ulsm_nob)
-                return (-EINVAL);
+       cl_2queue_init(queue);
 
-       if (copy_to_user (ulsm, lsm, sizeof(*ulsm)))
-                return (-EFAULT);
+       io->ci_ignore_layout = 1;
+       rc = cl_io_init(env, io, CIT_MISC, obj);
+       if (rc < 0)
+               GOTO(out, rc);
+       LASSERT(rc == 0);
 
-        for (i = 0; i < lsm->lsm_stripe_count; i++) {
-               if (copy_to_user (ulsm->lsm_oinfo[i], lsm->lsm_oinfo[i],
-                                      sizeof(lsm->lsm_oinfo[0])))
-                        return (-EFAULT);
-        }
-        return 0;
-}
+       rc = cl_echo_enqueue0(env, eco, offset,
+                             offset + npages * PAGE_SIZE - 1,
+                             rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
+                             CEF_NEVER);
+       if (rc < 0)
+               GOTO(error_lock, rc);
+
+       for (i = 0; i < npages; i++) {
+               LASSERT(pages[i]);
+               clp = cl_page_find(env, obj, cl_index(obj, offset),
+                                  pages[i], CPT_TRANSIENT);
+               if (IS_ERR(clp)) {
+                       rc = PTR_ERR(clp);
+                       break;
+               }
+               LASSERT(clp->cp_type == CPT_TRANSIENT);
 
-static int
-echo_copyin_lsm (struct echo_device *ed, struct lov_stripe_md *lsm,
-                 void *ulsm, int ulsm_nob)
-{
-        struct echo_client_obd *ec = ed->ed_ec;
-        int                     i;
+               rc = cl_page_own(env, io, clp);
+               if (rc) {
+                       LASSERT(clp->cp_state == CPS_FREEING);
+                       cl_page_put(env, clp);
+                       break;
+               }
+
+               cl_2queue_add(queue, clp);
 
-        if (ulsm_nob < sizeof (*lsm))
-                return (-EINVAL);
+               /*
+                * drop the reference count for cl_page_find, so that the page
+                * will be freed in cl_2queue_fini.
+                */
+               cl_page_put(env, clp);
+               cl_page_clip(env, clp, 0, page_size);
 
-       if (copy_from_user (lsm, ulsm, sizeof (*lsm)))
-                return (-EFAULT);
+               offset += page_size;
+       }
 
-        if (lsm->lsm_stripe_count > ec->ec_nstripes ||
-            lsm->lsm_magic != LOV_MAGIC ||
-            (lsm->lsm_stripe_size & (~CFS_PAGE_MASK)) != 0 ||
-            ((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL))
-                return (-EINVAL);
+       if (rc == 0) {
+               enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
 
+               async = async && (typ == CRT_WRITE);
+               if (async)
+                       rc = cl_io_commit_async(env, io, &queue->c2_qin,
+                                               0, PAGE_SIZE,
+                                               echo_commit_callback);
+               else
+                       rc = cl_io_submit_sync(env, io, typ, queue, 0);
+               CDEBUG(D_INFO, "echo_client %s write returns %d\n",
+                      async ? "async" : "sync", rc);
+       }
 
-        for (i = 0; i < lsm->lsm_stripe_count; i++) {
-               if (copy_from_user(lsm->lsm_oinfo[i],
-                                       ((struct lov_stripe_md *)ulsm)-> \
-                                       lsm_oinfo[i],
-                                       sizeof(lsm->lsm_oinfo[0])))
-                        return (-EFAULT);
-        }
-        return (0);
+       cl_echo_cancel0(env, ed, lh.cookie);
+       EXIT;
+error_lock:
+       cl_2queue_discard(env, io, queue);
+       cl_2queue_disown(env, io, queue);
+       cl_2queue_fini(env, queue);
+       cl_io_fini(env, io);
+out:
+       cl_env_put(env, &refcheck);
+       return rc;
 }
+/** @} echo_exports */
+
+static u64 last_object_id;
 
 #ifdef HAVE_SERVER_SUPPORT
 static inline void echo_md_build_name(struct lu_name *lname, char *name,
                                      __u64 id)
 {
-       sprintf(name, LPU64, id);
+       snprintf(name, ETI_NAME_LEN, "%llu", id);
        lname->ln_name = name;
        lname->ln_namelen = strlen(name);
 }
@@ -1452,14 +1442,19 @@ static inline void echo_md_build_name(struct lu_name *lname, char *name,
 static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
                            struct md_attr *ma)
 {
-       struct echo_thread_info *info = echo_env_info(env);
-       int                      rc;
+       struct echo_thread_info *info = echo_env_info(env);
+       int rc;
 
        ENTRY;
 
        LASSERT(ma->ma_lmm_size > 0);
 
-       rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV);
+       LASSERT(ma->ma_need & (MA_LOV | MA_LMV));
+       if (ma->ma_need & MA_LOV)
+               rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV);
+       else
+               rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LMV);
+
        if (rc < 0)
                RETURN(rc);
 
@@ -1477,7 +1472,7 @@ static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
                }
 
                OBD_ALLOC_LARGE(info->eti_big_lmm, size);
-               if (info->eti_big_lmm == NULL)
+               if (!info->eti_big_lmm)
                        RETURN(-ENOMEM);
                info->eti_big_lmmsize = size;
        }
@@ -1485,11 +1480,18 @@ static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
 
        info->eti_buf.lb_buf = info->eti_big_lmm;
        info->eti_buf.lb_len = info->eti_big_lmmsize;
-       rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV);
+       if (ma->ma_need & MA_LOV)
+               rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV);
+       else
+               rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LMV);
        if (rc < 0)
                RETURN(rc);
 
-       ma->ma_valid |= MA_LOV;
+       if (ma->ma_need & MA_LOV)
+               ma->ma_valid |= MA_LOV;
+       else
+               ma->ma_valid |= MA_LMV;
+
        ma->ma_lmm = info->eti_big_lmm;
        ma->ma_lmm_size = rc;
 
@@ -1502,46 +1504,62 @@ static int echo_attr_get_complex(const struct lu_env *env,
 {
        struct echo_thread_info *info = echo_env_info(env);
        struct lu_buf           *buf = &info->eti_buf;
-       umode_t          mode = lu_object_attr(&next->mo_lu);
-       int                      need = ma->ma_need;
+       umode_t                  mode = lu_object_attr(&next->mo_lu);
        int                      rc = 0, rc2;
 
        ENTRY;
 
        ma->ma_valid = 0;
 
-       if (need & MA_INODE) {
-               ma->ma_need = MA_INODE;
+       if (ma->ma_need & MA_INODE) {
                rc = mo_attr_get(env, next, ma);
                if (rc)
                        GOTO(out, rc);
                ma->ma_valid |= MA_INODE;
        }
 
-       if (need & MA_LOV) {
-               if (S_ISREG(mode) || S_ISDIR(mode)) {
-                       LASSERT(ma->ma_lmm_size > 0);
-                       buf->lb_buf = ma->ma_lmm;
-                       buf->lb_len = ma->ma_lmm_size;
-                       rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LOV);
-                       if (rc2 > 0) {
-                               ma->ma_lmm_size = rc2;
-                               ma->ma_valid |= MA_LOV;
-                       } else if (rc2 == -ENODATA) {
-                               /* no LOV EA */
-                               ma->ma_lmm_size = 0;
-                       } else if (rc2 == -ERANGE) {
-                               rc2 = echo_big_lmm_get(env, next, ma);
-                               if (rc2 < 0)
-                                       GOTO(out, rc = rc2);
-                       } else {
+       if ((ma->ma_need & MA_LOV) && (S_ISREG(mode) || S_ISDIR(mode))) {
+               LASSERT(ma->ma_lmm_size > 0);
+               buf->lb_buf = ma->ma_lmm;
+               buf->lb_len = ma->ma_lmm_size;
+               rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LOV);
+               if (rc2 > 0) {
+                       ma->ma_lmm_size = rc2;
+                       ma->ma_valid |= MA_LOV;
+               } else if (rc2 == -ENODATA) {
+                       /* no LOV EA */
+                       ma->ma_lmm_size = 0;
+               } else if (rc2 == -ERANGE) {
+                       rc2 = echo_big_lmm_get(env, next, ma);
+                       if (rc2 < 0)
                                GOTO(out, rc = rc2);
-                       }
+               } else {
+                       GOTO(out, rc = rc2);
+               }
+       }
+
+       if ((ma->ma_need & MA_LMV) && S_ISDIR(mode)) {
+               LASSERT(ma->ma_lmm_size > 0);
+               buf->lb_buf = ma->ma_lmm;
+               buf->lb_len = ma->ma_lmm_size;
+               rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LMV);
+               if (rc2 > 0) {
+                       ma->ma_lmm_size = rc2;
+                       ma->ma_valid |= MA_LMV;
+               } else if (rc2 == -ENODATA) {
+                       /* no LMV EA */
+                       ma->ma_lmm_size = 0;
+               } else if (rc2 == -ERANGE) {
+                       rc2 = echo_big_lmm_get(env, next, ma);
+                       if (rc2 < 0)
+                               GOTO(out, rc = rc2);
+               } else {
+                       GOTO(out, rc = rc2);
                }
        }
 
-#ifdef CONFIG_FS_POSIX_ACL
-       if (need & MA_ACL_DEF && S_ISDIR(mode)) {
+#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
+       if ((ma->ma_need & MA_ACL_DEF) && S_ISDIR(mode)) {
                buf->lb_buf = ma->ma_acl;
                buf->lb_len = ma->ma_acl_size;
                rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
@@ -1557,8 +1575,7 @@ static int echo_attr_get_complex(const struct lu_env *env,
        }
 #endif
 out:
-       ma->ma_need = need;
-       CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
+       CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = %#llx ma_lmm=%p\n",
               rc, ma->ma_valid, ma->ma_lmm);
        RETURN(rc);
 }
@@ -1586,36 +1603,36 @@ echo_md_create_internal(const struct lu_env *env, struct echo_device *ed,
 
        ec_child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev,
                                     fid, &conf);
-        if (IS_ERR(ec_child)) {
-                CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
-                        PTR_ERR(ec_child));
+       if (IS_ERR(ec_child)) {
+               CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
+                       PTR_ERR(ec_child));
                RETURN(PTR_ERR(ec_child));
-        }
+       }
 
-        child = lu_object_locate(ec_child->lo_header, ld->ld_type);
-        if (child == NULL) {
-                CERROR("Can not locate the child "DFID"\n", PFID(fid));
-                GOTO(out_put, rc = -EINVAL);
-        }
+       child = lu_object_locate(ec_child->lo_header, ld->ld_type);
+       if (!child) {
+               CERROR("Can not locate the child "DFID"\n", PFID(fid));
+               GOTO(out_put, rc = -EINVAL);
+       }
 
-        CDEBUG(D_RPCTRACE, "Start creating object "DFID" %s %p\n",
-               PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
+       CDEBUG(D_RPCTRACE, "Start creating object "DFID" %s %p\n",
+              PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
 
        /*
         * Do not perform lookup sanity check. We know that name does not exist.
         */
        spec->sp_cr_lookup = 0;
-        rc = mdo_create(env, parent, lname, lu2md(child), spec, ma);
-        if (rc) {
-                CERROR("Can not create child "DFID": rc = %d\n", PFID(fid), rc);
-                GOTO(out_put, rc);
-        }
-        CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc  = %d\n",
-               PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc);
+       rc = mdo_create(env, parent, lname, lu2md(child), spec, ma);
+       if (rc) {
+               CERROR("Can not create child "DFID": rc = %d\n", PFID(fid), rc);
+               GOTO(out_put, rc);
+       }
+       CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc  = %d\n",
+              PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc);
        EXIT;
 out_put:
-        lu_object_put(env, ec_child);
-        return rc;
+       lu_object_put(env, ec_child);
+       return rc;
 }
 
 static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld,
@@ -1635,313 +1652,448 @@ static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld,
        return 0;
 }
 
+static int
+echo_md_dir_stripe_choose(const struct lu_env *env, struct echo_device *ed,
+                         struct lu_object *obj, const char *name,
+                         unsigned int namelen, __u64 id,
+                         struct lu_object **new_parent)
+{
+       struct echo_thread_info *info = echo_env_info(env);
+       struct md_attr          *ma = &info->eti_ma;
+       struct lmv_mds_md_v1    *lmv;
+       struct lu_device        *ld = ed->ed_next;
+       unsigned int            idx;
+       struct lu_name          tmp_ln_name;
+       struct lu_fid           stripe_fid;
+       struct lu_object        *stripe_obj;
+       int                     rc;
+
+       LASSERT(obj != NULL);
+       LASSERT(S_ISDIR(obj->lo_header->loh_attr));
+
+       memset(ma, 0, sizeof(*ma));
+       echo_set_lmm_size(env, ld, ma);
+       ma->ma_need = MA_LMV;
+       rc = echo_attr_get_complex(env, lu2md(obj), ma);
+       if (rc) {
+               CERROR("Can not getattr child "DFID": rc = %d\n",
+                       PFID(lu_object_fid(obj)), rc);
+               return rc;
+       }
+
+       if (!(ma->ma_valid & MA_LMV)) {
+               *new_parent = obj;
+               return 0;
+       }
+
+       lmv = (struct lmv_mds_md_v1 *)ma->ma_lmm;
+       if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1) {
+               rc = -EINVAL;
+               CERROR("Invalid mds md magic %x "DFID": rc = %d\n",
+                      le32_to_cpu(lmv->lmv_magic), PFID(lu_object_fid(obj)),
+                      rc);
+               return rc;
+       }
+
+       if (name) {
+               tmp_ln_name.ln_name = name;
+               tmp_ln_name.ln_namelen = namelen;
+       } else {
+               LASSERT(id != -1);
+               echo_md_build_name(&tmp_ln_name, info->eti_name, id);
+       }
+
+       idx = lmv_name_to_stripe_index(LMV_HASH_TYPE_FNV_1A_64,
+                               le32_to_cpu(lmv->lmv_stripe_count),
+                               tmp_ln_name.ln_name, tmp_ln_name.ln_namelen);
+
+       LASSERT(idx < le32_to_cpu(lmv->lmv_stripe_count));
+       fid_le_to_cpu(&stripe_fid, &lmv->lmv_stripe_fids[idx]);
+
+       stripe_obj = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, &stripe_fid,
+                                      NULL);
+       if (IS_ERR(stripe_obj)) {
+               rc = PTR_ERR(stripe_obj);
+               CERROR("Can not find the parent "DFID": rc = %d\n",
+                      PFID(&stripe_fid), rc);
+               return rc;
+       }
+
+       *new_parent = lu_object_locate(stripe_obj->lo_header, ld->ld_type);
+       if (!*new_parent) {
+               lu_object_put(env, stripe_obj);
+               RETURN(-ENXIO);
+       }
+
+       return rc;
+}
+
 static int echo_create_md_object(const struct lu_env *env,
-                                 struct echo_device *ed,
-                                 struct lu_object *ec_parent,
-                                 struct lu_fid *fid,
-                                 char *name, int namelen,
-                                 __u64 id, __u32 mode, int count,
-                                 int stripe_count, int stripe_offset)
-{
-        struct lu_object        *parent;
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_name          *lname = &info->eti_lname;
-        struct md_op_spec       *spec = &info->eti_spec;
-        struct md_attr          *ma = &info->eti_ma;
-        struct lu_device        *ld = ed->ed_next;
-        int                      rc = 0;
-        int                      i;
+                                struct echo_device *ed,
+                                struct lu_object *ec_parent,
+                                struct lu_fid *fid,
+                                char *name, int namelen,
+                                 __u64 id, __u32 mode, int count,
+                                int stripe_count, int stripe_offset)
+{
+       struct lu_object *parent;
+       struct lu_object *new_parent;
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_name *lname = &info->eti_lname;
+       struct md_op_spec *spec = &info->eti_spec;
+       struct md_attr *ma = &info->eti_ma;
+       struct lu_device *ld = ed->ed_next;
+       int rc = 0;
+       int i;
 
        ENTRY;
 
-       if (ec_parent == NULL)
+       if (!ec_parent)
                return -1;
-        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
-       if (parent == NULL)
+       parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
+       if (!parent)
                RETURN(-ENXIO);
 
-        memset(ma, 0, sizeof(*ma));
-        memset(spec, 0, sizeof(*spec));
-        if (stripe_count != 0) {
-                spec->sp_cr_flags |= FMODE_WRITE;
-               echo_set_lmm_size(env, ld, ma);
-                if (stripe_count != -1) {
-                        struct lov_user_md_v3 *lum = &info->eti_lum;
-
-                        lum->lmm_magic = LOV_USER_MAGIC_V3;
-                        lum->lmm_stripe_count = stripe_count;
-                        lum->lmm_stripe_offset = stripe_offset;
-                        lum->lmm_pattern = 0;
-                        spec->u.sp_ea.eadata = lum;
-                       spec->u.sp_ea.eadatalen = sizeof(*lum);
-                        spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
-                }
-        }
-
-        ma->ma_attr.la_mode = mode;
-       ma->ma_attr.la_valid = LA_CTIME | LA_MODE;
-        ma->ma_attr.la_ctime = cfs_time_current_64();
-
-        if (name != NULL) {
-                lname->ln_name = name;
-                lname->ln_namelen = namelen;
-                /* If name is specified, only create one object by name */
-                rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
-                                             spec, ma);
+       rc = echo_md_dir_stripe_choose(env, ed, parent, name, namelen,
+                                      id, &new_parent);
+       if (rc != 0)
                RETURN(rc);
-        }
 
-        /* Create multiple object sequenced by id */
-        for (i = 0; i < count; i++) {
-                char *tmp_name = info->eti_name;
+       LASSERT(new_parent != NULL);
+       memset(ma, 0, sizeof(*ma));
+       memset(spec, 0, sizeof(*spec));
+       echo_set_lmm_size(env, ld, ma);
+       if (stripe_count != 0) {
+               spec->sp_cr_flags |= MDS_FMODE_WRITE;
+               if (stripe_count != -1) {
+                       if (S_ISDIR(mode)) {
+                               struct lmv_user_md *lmu;
+
+                               lmu = (struct lmv_user_md *)&info->eti_lum;
+                               lmu->lum_magic = LMV_USER_MAGIC;
+                               lmu->lum_stripe_offset = stripe_offset;
+                               lmu->lum_stripe_count = stripe_count;
+                               lmu->lum_hash_type = LMV_HASH_TYPE_FNV_1A_64;
+                               spec->u.sp_ea.eadata = lmu;
+                               spec->u.sp_ea.eadatalen = sizeof(*lmu);
+                       } else {
+                               struct lov_user_md_v3 *lum = &info->eti_lum;
+
+                               lum->lmm_magic = LOV_USER_MAGIC_V3;
+                               lum->lmm_stripe_count = stripe_count;
+                               lum->lmm_stripe_offset = stripe_offset;
+                               lum->lmm_pattern = LOV_PATTERN_NONE;
+                               spec->u.sp_ea.eadata = lum;
+                               spec->u.sp_ea.eadatalen = sizeof(*lum);
+                       }
+                       spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
+               }
+       }
+
+       ma->ma_attr.la_mode = mode;
+       ma->ma_attr.la_valid = LA_CTIME | LA_MODE;
+       ma->ma_attr.la_ctime = ktime_get_real_seconds();
+
+       if (name) {
+               lname->ln_name = name;
+               lname->ln_namelen = namelen;
+               /* If name is specified, only create one object by name */
+               rc = echo_md_create_internal(env, ed, lu2md(new_parent), fid,
+                                            lname, spec, ma);
+               GOTO(out_put, rc);
+       }
+
+       /* Create multiple object sequenced by id */
+       for (i = 0; i < count; i++) {
+               char *tmp_name = info->eti_name;
 
-                echo_md_build_name(lname, tmp_name, id);
+               echo_md_build_name(lname, tmp_name, id);
+
+               rc = echo_md_create_internal(env, ed, lu2md(new_parent),
+                                            fid, lname, spec, ma);
+               if (rc) {
+                       CERROR("Can not create child %s: rc = %d\n", tmp_name,
+                               rc);
+                       break;
+               }
+               id++;
+               fid->f_oid++;
+       }
 
-                rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
-                                             spec, ma);
-                if (rc) {
-                        CERROR("Can not create child %s: rc = %d\n", tmp_name,
-                                rc);
-                        break;
-                }
-                id++;
-                fid->f_oid++;
-        }
+out_put:
+       if (new_parent != parent)
+               lu_object_put(env, new_parent);
 
        RETURN(rc);
 }
 
 static struct lu_object *echo_md_lookup(const struct lu_env *env,
-                                        struct echo_device *ed,
-                                        struct md_object *parent,
-                                        struct lu_name *lname)
-{
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_fid           *fid = &info->eti_fid;
-        struct lu_object        *child;
-        int    rc;
-        ENTRY;
-
-        CDEBUG(D_INFO, "lookup %s in parent "DFID" %p\n", lname->ln_name,
-               PFID(fid), parent);
-        rc = mdo_lookup(env, parent, lname, fid, NULL);
-        if (rc) {
-                CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
-                RETURN(ERR_PTR(rc));
-        }
-
-       /* In the function below, .hs_keycmp resolves to
-        * lu_obj_hop_keycmp() */
+                                       struct echo_device *ed,
+                                       struct md_object *parent,
+                                       struct lu_name *lname)
+{
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_fid *fid = &info->eti_fid;
+       struct lu_object *child;
+       int rc;
+
+       ENTRY;
+       CDEBUG(D_INFO, "lookup %s in parent "DFID" %p\n", lname->ln_name,
+              PFID(fid), parent);
+
+       rc = mdo_lookup(env, parent, lname, fid, NULL);
+       if (rc) {
+               CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
+               RETURN(ERR_PTR(rc));
+       }
+
+       /*
+        * In the function below, .hs_keycmp resolves to
+        * lu_obj_hop_keycmp()
+        */
        /* coverity[overrun-buffer-val] */
-        child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
+       child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
 
-        RETURN(child);
+       RETURN(child);
 }
 
 static int echo_setattr_object(const struct lu_env *env,
-                               struct echo_device *ed,
-                               struct lu_object *ec_parent,
-                               __u64 id, int count)
-{
-        struct lu_object        *parent;
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_name          *lname = &info->eti_lname;
-        char                    *name = info->eti_name;
-        struct lu_device        *ld = ed->ed_next;
-        struct lu_buf           *buf = &info->eti_buf;
-        int                      rc = 0;
-        int                      i;
+                              struct echo_device *ed,
+                              struct lu_object *ec_parent,
+                              __u64 id, int count)
+{
+       struct lu_object *parent;
+       struct lu_object *new_parent;
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_name *lname = &info->eti_lname;
+       char *name = info->eti_name;
+       struct lu_device *ld = ed->ed_next;
+       struct lu_buf *buf = &info->eti_buf;
+       int rc = 0;
+       int i;
 
        ENTRY;
 
-       if (ec_parent == NULL)
+       if (!ec_parent)
                return -1;
-        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
-       if (parent == NULL)
+       parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
+       if (!parent)
                RETURN(-ENXIO);
 
-        for (i = 0; i < count; i++) {
-                struct lu_object *ec_child, *child;
+       rc = echo_md_dir_stripe_choose(env, ed, parent, NULL, 0, id,
+                                      &new_parent);
+       if (rc != 0)
+               RETURN(rc);
 
-                echo_md_build_name(lname, name, id);
+       for (i = 0; i < count; i++) {
+               struct lu_object *ec_child, *child;
 
-                ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
-                if (IS_ERR(ec_child)) {
-                        CERROR("Can't find child %s: rc = %ld\n",
-                                lname->ln_name, PTR_ERR(ec_child));
-                        RETURN(PTR_ERR(ec_child));
-                }
+               echo_md_build_name(lname, name, id);
 
-                child = lu_object_locate(ec_child->lo_header, ld->ld_type);
-                if (child == NULL) {
-                        CERROR("Can not locate the child %s\n", lname->ln_name);
-                        lu_object_put(env, ec_child);
-                        rc = -EINVAL;
-                        break;
-                }
+               ec_child = echo_md_lookup(env, ed, lu2md(new_parent), lname);
+               if (IS_ERR(ec_child)) {
+                       rc = PTR_ERR(ec_child);
+                       CERROR("Can't find child %s: rc = %d\n",
+                               lname->ln_name, rc);
+                       break;
+               }
 
-                CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n",
-                       PFID(lu_object_fid(child)));
+               child = lu_object_locate(ec_child->lo_header, ld->ld_type);
+               if (!child) {
+                       CERROR("Can not locate the child %s\n", lname->ln_name);
+                       lu_object_put(env, ec_child);
+                       rc = -EINVAL;
+                       break;
+               }
+
+               CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n",
+                      PFID(lu_object_fid(child)));
 
                buf->lb_buf = info->eti_xattr_buf;
                buf->lb_len = sizeof(info->eti_xattr_buf);
 
-                sprintf(name, "%s.test1", XATTR_USER_PREFIX);
-                rc = mo_xattr_set(env, lu2md(child), buf, name,
-                                  LU_XATTR_CREATE);
+               sprintf(name, "%s.test1", XATTR_USER_PREFIX);
+               rc = mo_xattr_set(env, lu2md(child), buf, name,
+                                 LU_XATTR_CREATE);
                if (rc < 0) {
-                        CERROR("Can not setattr child "DFID": rc = %d\n",
-                                PFID(lu_object_fid(child)), rc);
-                        lu_object_put(env, ec_child);
-                        break;
-                }
-                CDEBUG(D_RPCTRACE, "End setattr object "DFID"\n",
-                       PFID(lu_object_fid(child)));
-                id++;
-                lu_object_put(env, ec_child);
-        }
+                       CERROR("Can not setattr child "DFID": rc = %d\n",
+                               PFID(lu_object_fid(child)), rc);
+                       lu_object_put(env, ec_child);
+                       break;
+               }
+               CDEBUG(D_RPCTRACE, "End setattr object "DFID"\n",
+                      PFID(lu_object_fid(child)));
+               id++;
+               lu_object_put(env, ec_child);
+       }
+
+       if (new_parent != parent)
+               lu_object_put(env, new_parent);
+
        RETURN(rc);
 }
 
 static int echo_getattr_object(const struct lu_env *env,
-                               struct echo_device *ed,
-                               struct lu_object *ec_parent,
-                               __u64 id, int count)
-{
-        struct lu_object        *parent;
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_name          *lname = &info->eti_lname;
-        char                    *name = info->eti_name;
-        struct md_attr          *ma = &info->eti_ma;
-        struct lu_device        *ld = ed->ed_next;
-        int                      rc = 0;
-        int                      i;
+                              struct echo_device *ed,
+                              struct lu_object *ec_parent,
+                              __u64 id, int count)
+{
+       struct lu_object *parent;
+       struct lu_object *new_parent;
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_name *lname = &info->eti_lname;
+       char *name = info->eti_name;
+       struct md_attr *ma = &info->eti_ma;
+       struct lu_device *ld = ed->ed_next;
+       int rc = 0;
+       int i;
 
        ENTRY;
 
-       if (ec_parent == NULL)
+       if (!ec_parent)
                return -1;
-        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
-       if (parent == NULL)
+       parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
+       if (!parent)
                RETURN(-ENXIO);
 
-        memset(ma, 0, sizeof(*ma));
-        ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
-        ma->ma_acl = info->eti_xattr_buf;
-        ma->ma_acl_size = sizeof(info->eti_xattr_buf);
+       rc = echo_md_dir_stripe_choose(env, ed, parent, NULL, 0, id,
+                                      &new_parent);
+       if (rc != 0)
+               RETURN(rc);
 
-        for (i = 0; i < count; i++) {
-                struct lu_object *ec_child, *child;
+       memset(ma, 0, sizeof(*ma));
+       ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
+       ma->ma_acl = info->eti_xattr_buf;
+       ma->ma_acl_size = sizeof(info->eti_xattr_buf);
 
-                ma->ma_valid = 0;
-                echo_md_build_name(lname, name, id);
+       for (i = 0; i < count; i++) {
+               struct lu_object *ec_child, *child;
+
+               ma->ma_valid = 0;
+               echo_md_build_name(lname, name, id);
                echo_set_lmm_size(env, ld, ma);
 
-                ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
-                if (IS_ERR(ec_child)) {
-                        CERROR("Can't find child %s: rc = %ld\n",
-                               lname->ln_name, PTR_ERR(ec_child));
-                        RETURN(PTR_ERR(ec_child));
-                }
-
-                child = lu_object_locate(ec_child->lo_header, ld->ld_type);
-                if (child == NULL) {
-                        CERROR("Can not locate the child %s\n", lname->ln_name);
-                        lu_object_put(env, ec_child);
+               ec_child = echo_md_lookup(env, ed, lu2md(new_parent), lname);
+               if (IS_ERR(ec_child)) {
+                       CERROR("Can't find child %s: rc = %ld\n",
+                              lname->ln_name, PTR_ERR(ec_child));
+                       RETURN(PTR_ERR(ec_child));
+               }
+
+               child = lu_object_locate(ec_child->lo_header, ld->ld_type);
+               if (!child) {
+                       CERROR("Can not locate the child %s\n", lname->ln_name);
+                       lu_object_put(env, ec_child);
                        RETURN(-EINVAL);
-                }
+               }
 
-                CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
-                       PFID(lu_object_fid(child)));
+               CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
+                      PFID(lu_object_fid(child)));
                rc = echo_attr_get_complex(env, lu2md(child), ma);
-                if (rc) {
-                        CERROR("Can not getattr child "DFID": rc = %d\n",
-                                PFID(lu_object_fid(child)), rc);
-                        lu_object_put(env, ec_child);
-                        break;
-                }
-                CDEBUG(D_RPCTRACE, "End getattr object "DFID"\n",
-                       PFID(lu_object_fid(child)));
-                id++;
-                lu_object_put(env, ec_child);
-        }
+               if (rc) {
+                       CERROR("Can not getattr child "DFID": rc = %d\n",
+                               PFID(lu_object_fid(child)), rc);
+                       lu_object_put(env, ec_child);
+                       break;
+               }
+               CDEBUG(D_RPCTRACE, "End getattr object "DFID"\n",
+                      PFID(lu_object_fid(child)));
+               id++;
+               lu_object_put(env, ec_child);
+       }
+
+       if (new_parent != parent)
+               lu_object_put(env, new_parent);
 
        RETURN(rc);
 }
 
 static int echo_lookup_object(const struct lu_env *env,
-                              struct echo_device *ed,
-                              struct lu_object *ec_parent,
-                              __u64 id, int count)
-{
-        struct lu_object        *parent;
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_name          *lname = &info->eti_lname;
-        char                    *name = info->eti_name;
-        struct lu_fid           *fid = &info->eti_fid;
-        struct lu_device        *ld = ed->ed_next;
-        int                      rc = 0;
-        int                      i;
-
-       if (ec_parent == NULL)
+                             struct echo_device *ed,
+                             struct lu_object *ec_parent,
+                             __u64 id, int count)
+{
+       struct lu_object *parent;
+       struct lu_object *new_parent;
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_name *lname = &info->eti_lname;
+       char *name = info->eti_name;
+       struct lu_fid *fid = &info->eti_fid;
+       struct lu_device *ld = ed->ed_next;
+       int rc = 0;
+       int i;
+
+       if (!ec_parent)
                return -1;
-        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
-       if (parent == NULL)
+       parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
+       if (!parent)
                return -ENXIO;
 
-        /*prepare the requests*/
-        for (i = 0; i < count; i++) {
-                echo_md_build_name(lname, name, id);
+       rc = echo_md_dir_stripe_choose(env, ed, parent, NULL, 0, id,
+                                      &new_parent);
+       if (rc != 0)
+               RETURN(rc);
 
-                CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
-                       PFID(lu_object_fid(parent)), lname->ln_name, parent);
+       /*prepare the requests*/
+       for (i = 0; i < count; i++) {
+               echo_md_build_name(lname, name, id);
 
-                rc = mdo_lookup(env, lu2md(parent), lname, fid, NULL);
-                if (rc) {
-                        CERROR("Can not lookup child %s: rc = %d\n", name, rc);
-                        break;
-                }
-                CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
-                       PFID(lu_object_fid(parent)), lname->ln_name, parent);
+               CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
+                      PFID(lu_object_fid(new_parent)), lname->ln_name,
+                      new_parent);
 
-                id++;
-        }
-        return rc;
+               rc = mdo_lookup(env, lu2md(new_parent), lname, fid, NULL);
+               if (rc) {
+                       CERROR("Can not lookup child %s: rc = %d\n", name, rc);
+                       break;
+               }
+
+               CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
+                      PFID(lu_object_fid(new_parent)), lname->ln_name,
+                      new_parent);
+
+               id++;
+       }
+
+       if (new_parent != parent)
+               lu_object_put(env, new_parent);
+
+       return rc;
 }
 
 static int echo_md_destroy_internal(const struct lu_env *env,
-                                    struct echo_device *ed,
-                                    struct md_object *parent,
-                                    struct lu_name *lname,
-                                    struct md_attr *ma)
+                                   struct echo_device *ed,
+                                   struct md_object *parent,
+                                   struct lu_name *lname,
+                                   struct md_attr *ma)
 {
-        struct lu_device   *ld = ed->ed_next;
-        struct lu_object   *ec_child;
-        struct lu_object   *child;
-        int                 rc;
+       struct lu_device   *ld = ed->ed_next;
+       struct lu_object   *ec_child;
+       struct lu_object   *child;
+       int                 rc;
 
        ENTRY;
 
-        ec_child = echo_md_lookup(env, ed, parent, lname);
-        if (IS_ERR(ec_child)) {
-                CERROR("Can't find child %s: rc = %ld\n", lname->ln_name,
-                        PTR_ERR(ec_child));
-                RETURN(PTR_ERR(ec_child));
-        }
+       ec_child = echo_md_lookup(env, ed, parent, lname);
+       if (IS_ERR(ec_child)) {
+               CERROR("Can't find child %s: rc = %ld\n", lname->ln_name,
+                       PTR_ERR(ec_child));
+               RETURN(PTR_ERR(ec_child));
+       }
 
-        child = lu_object_locate(ec_child->lo_header, ld->ld_type);
-        if (child == NULL) {
-                CERROR("Can not locate the child %s\n", lname->ln_name);
-                GOTO(out_put, rc = -EINVAL);
-        }
+       child = lu_object_locate(ec_child->lo_header, ld->ld_type);
+       if (!child) {
+               CERROR("Can not locate the child %s\n", lname->ln_name);
+               GOTO(out_put, rc = -EINVAL);
+       }
 
        if (lu_object_remote(child)) {
                CERROR("Can not destroy remote object %s: rc = %d\n",
                       lname->ln_name, -EPERM);
                GOTO(out_put, rc = -EPERM);
        }
-        CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
-               PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
+       CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
+              PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
 
        rc = mdo_unlink(env, parent, lu2md(child), lname, ma, 0);
        if (rc) {
@@ -1949,137 +2101,143 @@ static int echo_md_destroy_internal(const struct lu_env *env,
                        lname->ln_name, rc);
                GOTO(out_put, rc);
        }
-        CDEBUG(D_RPCTRACE, "End destroy object "DFID" %s %p\n",
-               PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
+       CDEBUG(D_RPCTRACE, "End destroy object "DFID" %s %p\n",
+              PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
 out_put:
-        lu_object_put(env, ec_child);
-        return rc;
+       lu_object_put(env, ec_child);
+       return rc;
 }
 
 static int echo_destroy_object(const struct lu_env *env,
-                               struct echo_device *ed,
-                               struct lu_object *ec_parent,
-                               char *name, int namelen,
-                               __u64 id, __u32 mode,
-                               int count)
-{
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_name          *lname = &info->eti_lname;
-        struct md_attr          *ma = &info->eti_ma;
-        struct lu_device        *ld = ed->ed_next;
-        struct lu_object        *parent;
-        int                      rc = 0;
-        int                      i;
-        ENTRY;
-
-        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
-        if (parent == NULL)
-                RETURN(-EINVAL);
-
-        memset(ma, 0, sizeof(*ma));
-        ma->ma_attr.la_mode = mode;
-        ma->ma_attr.la_valid = LA_CTIME;
-        ma->ma_attr.la_ctime = cfs_time_current_64();
-        ma->ma_need = MA_INODE;
-        ma->ma_valid = 0;
-
-        if (name != NULL) {
-                lname->ln_name = name;
-                lname->ln_namelen = namelen;
-                rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
-                                              ma);
+                              struct echo_device *ed,
+                              struct lu_object *ec_parent,
+                              char *name, int namelen,
+                              __u64 id, __u32 mode,
+                              int count)
+{
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_name          *lname = &info->eti_lname;
+       struct md_attr          *ma = &info->eti_ma;
+       struct lu_device        *ld = ed->ed_next;
+       struct lu_object        *parent;
+       struct lu_object        *new_parent;
+       int                      rc = 0;
+       int                      i;
+
+       ENTRY;
+       parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
+       if (!parent)
+               RETURN(-EINVAL);
+
+       rc = echo_md_dir_stripe_choose(env, ed, parent, name, namelen,
+                                      id, &new_parent);
+       if (rc != 0)
                RETURN(rc);
-        }
 
-        /*prepare the requests*/
-        for (i = 0; i < count; i++) {
-                char *tmp_name = info->eti_name;
+       memset(ma, 0, sizeof(*ma));
+       ma->ma_attr.la_mode = mode;
+       ma->ma_attr.la_valid = LA_CTIME;
+       ma->ma_attr.la_ctime = ktime_get_real_seconds();
+       ma->ma_need = MA_INODE;
+       ma->ma_valid = 0;
+
+       if (name) {
+               lname->ln_name = name;
+               lname->ln_namelen = namelen;
+               rc = echo_md_destroy_internal(env, ed, lu2md(new_parent), lname,
+                                             ma);
+               GOTO(out_put, rc);
+       }
 
-                ma->ma_valid = 0;
-                echo_md_build_name(lname, tmp_name, id);
+       /*prepare the requests*/
+       for (i = 0; i < count; i++) {
+               char *tmp_name = info->eti_name;
+
+               ma->ma_valid = 0;
+               echo_md_build_name(lname, tmp_name, id);
+
+               rc = echo_md_destroy_internal(env, ed, lu2md(new_parent), lname,
+                                             ma);
+               if (rc) {
+                       CERROR("Can not unlink child %s: rc = %d\n", name, rc);
+                       break;
+               }
+               id++;
+       }
 
-                rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
-                                              ma);
-                if (rc) {
-                        CERROR("Can not unlink child %s: rc = %d\n", name, rc);
-                        break;
-                }
-                id++;
-        }
+out_put:
+       if (new_parent != parent)
+               lu_object_put(env, new_parent);
 
        RETURN(rc);
 }
 
 static struct lu_object *echo_resolve_path(const struct lu_env *env,
-                                           struct echo_device *ed, char *path,
-                                           int path_len)
-{
-        struct lu_device        *ld = ed->ed_next;
-        struct md_device        *md = lu2md_dev(ld);
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_fid           *fid = &info->eti_fid;
-        struct lu_name          *lname = &info->eti_lname;
-        struct lu_object        *parent = NULL;
-        struct lu_object        *child = NULL;
-        int rc = 0;
-        ENTRY;
-
-        /*Only support MDD layer right now*/
-        rc = md->md_ops->mdo_root_get(env, md, fid);
-        if (rc) {
-                CERROR("get root error: rc = %d\n", rc);
-                RETURN(ERR_PTR(rc));
-        }
-
-       /* In the function below, .hs_keycmp resolves to
-        * lu_obj_hop_keycmp() */
+                                          struct echo_device *ed, char *path,
+                                          int path_len)
+{
+       struct lu_device        *ld = ed->ed_next;
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_fid           *fid = &info->eti_fid;
+       struct lu_name          *lname = &info->eti_lname;
+       struct lu_object        *parent = NULL;
+       struct lu_object        *child = NULL;
+       int                      rc = 0;
+
+       ENTRY;
+       *fid = ed->ed_root_fid;
+
+       /*
+        * In the function below, .hs_keycmp resolves to
+        * lu_obj_hop_keycmp()
+        */
        /* coverity[overrun-buffer-val] */
-        parent = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
-        if (IS_ERR(parent)) {
-                CERROR("Can not find the parent "DFID": rc = %ld\n",
-                        PFID(fid), PTR_ERR(parent));
-                RETURN(parent);
-        }
-
-        while (1) {
-                struct lu_object *ld_parent;
-                char *e;
-
-                e = strsep(&path, "/");
-                if (e == NULL)
-                        break;
-
-                if (e[0] == 0) {
-                        if (!path || path[0] == '\0')
-                                break;
-                        continue;
-                }
-
-                lname->ln_name = e;
-                lname->ln_namelen = strlen(e);
-
-                ld_parent = lu_object_locate(parent->lo_header, ld->ld_type);
-                if (ld_parent == NULL) {
-                        lu_object_put(env, parent);
-                        rc = -EINVAL;
-                        break;
-                }
-
-                child = echo_md_lookup(env, ed, lu2md(ld_parent), lname);
-                lu_object_put(env, parent);
-                if (IS_ERR(child)) {
-                        rc = (int)PTR_ERR(child);
-                        CERROR("lookup %s under parent "DFID": rc = %d\n",
-                                lname->ln_name, PFID(lu_object_fid(ld_parent)),
-                                rc);
-                        break;
-                }
-                parent = child;
-        }
-        if (rc)
-                RETURN(ERR_PTR(rc));
-
-        RETURN(parent);
+       parent = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
+       if (IS_ERR(parent)) {
+               CERROR("Can not find the parent "DFID": rc = %ld\n",
+                       PFID(fid), PTR_ERR(parent));
+               RETURN(parent);
+       }
+
+       while (1) {
+               struct lu_object *ld_parent;
+               char *e;
+
+               e = strsep(&path, "/");
+               if (!e)
+                       break;
+
+               if (e[0] == 0) {
+                       if (!path || path[0] == '\0')
+                               break;
+                       continue;
+               }
+
+               lname->ln_name = e;
+               lname->ln_namelen = strlen(e);
+
+               ld_parent = lu_object_locate(parent->lo_header, ld->ld_type);
+               if (!ld_parent) {
+                       lu_object_put(env, parent);
+                       rc = -EINVAL;
+                       break;
+               }
+
+               child = echo_md_lookup(env, ed, lu2md(ld_parent), lname);
+               lu_object_put(env, parent);
+               if (IS_ERR(child)) {
+                       rc = (int)PTR_ERR(child);
+                       CERROR("lookup %s under parent "DFID": rc = %d\n",
+                               lname->ln_name, PFID(lu_object_fid(ld_parent)),
+                               rc);
+                       break;
+               }
+               parent = child;
+       }
+       if (rc)
+               RETURN(ERR_PTR(rc));
+
+       RETURN(parent);
 }
 
 static void echo_ucred_init(struct lu_env *env)
@@ -2110,40 +2268,39 @@ static void echo_ucred_init(struct lu_env *env)
 static void echo_ucred_fini(struct lu_env *env)
 {
        struct lu_ucred *ucred = lu_ucred(env);
+
        ucred->uc_valid = UCRED_INIT;
 }
 
-#define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD)
-#define ECHO_MD_SES_TAG (LCT_REMEMBER | LCT_SESSION | LCT_SERVER_SESSION)
 static int echo_md_handler(struct echo_device *ed, int command,
                           char *path, int path_len, __u64 id, int count,
                           struct obd_ioctl_data *data)
 {
        struct echo_thread_info *info;
-        struct lu_device      *ld = ed->ed_next;
-        struct lu_env         *env;
-        int                    refcheck;
-        struct lu_object      *parent;
-        char                  *name = NULL;
-        int                    namelen = data->ioc_plen2;
-        int                    rc = 0;
-        ENTRY;
-
-        if (ld == NULL) {
-                CERROR("MD echo client is not being initialized properly\n");
-                RETURN(-EINVAL);
-        }
-
-        if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
-                CERROR("Only support MDD layer right now!\n");
-                RETURN(-EINVAL);
-        }
-
-        env = cl_env_get(&refcheck);
-        if (IS_ERR(env))
-                RETURN(PTR_ERR(env));
-
-        rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_MD_SES_TAG);
+       struct lu_device *ld = ed->ed_next;
+       struct lu_env *env;
+       __u16 refcheck;
+       struct lu_object *parent;
+       char *name = NULL;
+       int namelen = data->ioc_plen2;
+       int rc = 0;
+
+       ENTRY;
+       if (!ld) {
+               CERROR("MD echo client is not being initialized properly\n");
+               RETURN(-EINVAL);
+       }
+
+       if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
+               CERROR("Only support MDD layer right now!\n");
+               RETURN(-EINVAL);
+       }
+
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env))
+               RETURN(PTR_ERR(env));
+
+       rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_SES_TAG);
        if (rc != 0)
                GOTO(out_env, rc);
 
@@ -2151,479 +2308,385 @@ static int echo_md_handler(struct echo_device *ed, int command,
        info = echo_env_info(env);
        LASSERT(info->eti_big_lmm == NULL);
        OBD_ALLOC_LARGE(info->eti_big_lmm, MIN_MD_SIZE);
-       if (info->eti_big_lmm == NULL)
+       if (!info->eti_big_lmm)
                GOTO(out_env, rc = -ENOMEM);
        info->eti_big_lmmsize = MIN_MD_SIZE;
 
-        parent = echo_resolve_path(env, ed, path, path_len);
-        if (IS_ERR(parent)) {
-                CERROR("Can not resolve the path %s: rc = %ld\n", path,
-                        PTR_ERR(parent));
+       parent = echo_resolve_path(env, ed, path, path_len);
+       if (IS_ERR(parent)) {
+               CERROR("Can not resolve the path %s: rc = %ld\n", path,
+                       PTR_ERR(parent));
                GOTO(out_free, rc = PTR_ERR(parent));
-        }
+       }
 
-        if (namelen > 0) {
-                OBD_ALLOC(name, namelen + 1);
-                if (name == NULL)
+       if (namelen > 0) {
+               OBD_ALLOC(name, namelen + 1);
+               if (!name)
                        GOTO(out_put, rc = -ENOMEM);
                if (copy_from_user(name, data->ioc_pbuf2, namelen))
                        GOTO(out_name, rc = -EFAULT);
-        }
+       }
 
        echo_ucred_init(env);
 
-        switch (command) {
-        case ECHO_MD_CREATE:
-        case ECHO_MD_MKDIR: {
-                struct echo_thread_info *info = echo_env_info(env);
-                __u32 mode = data->ioc_obdo2.o_mode;
-                struct lu_fid *fid = &info->eti_fid;
-                int stripe_count = (int)data->ioc_obdo2.o_misc;
-                int stripe_index = (int)data->ioc_obdo2.o_stripe_idx;
+       switch (command) {
+       case ECHO_MD_CREATE:
+       case ECHO_MD_MKDIR: {
+               struct echo_thread_info *info = echo_env_info(env);
+               __u32 mode = data->ioc_obdo2.o_mode;
+               struct lu_fid *fid = &info->eti_fid;
+               int stripe_count = (int)data->ioc_obdo2.o_misc;
+               int stripe_index = (int)data->ioc_obdo2.o_stripe_idx;
 
                rc = ostid_to_fid(fid, &data->ioc_obdo1.o_oi, 0);
                if (rc != 0)
                        break;
 
-               /* In the function below, .hs_keycmp resolves to
-                * lu_obj_hop_keycmp() */
+               /*
+                * In the function below, .hs_keycmp resolves to
+                * lu_obj_hop_keycmp()
+                */
                /* coverity[overrun-buffer-val] */
-                rc = echo_create_md_object(env, ed, parent, fid, name, namelen,
-                                           id, mode, count, stripe_count,
-                                           stripe_index);
-                break;
-        }
-        case ECHO_MD_DESTROY:
-        case ECHO_MD_RMDIR: {
-                __u32 mode = data->ioc_obdo2.o_mode;
-
-                rc = echo_destroy_object(env, ed, parent, name, namelen,
-                                         id, mode, count);
-                break;
-        }
-        case ECHO_MD_LOOKUP:
-                rc = echo_lookup_object(env, ed, parent, id, count);
-                break;
-        case ECHO_MD_GETATTR:
-                rc = echo_getattr_object(env, ed, parent, id, count);
-                break;
-        case ECHO_MD_SETATTR:
-                rc = echo_setattr_object(env, ed, parent, id, count);
-                break;
-        default:
-                CERROR("unknown command %d\n", command);
-                rc = -EINVAL;
-                break;
-        }
+               rc = echo_create_md_object(env, ed, parent, fid, name, namelen,
+                                          id, mode, count, stripe_count,
+                                          stripe_index);
+               break;
+       }
+       case ECHO_MD_DESTROY:
+       case ECHO_MD_RMDIR: {
+               __u32 mode = data->ioc_obdo2.o_mode;
+
+               rc = echo_destroy_object(env, ed, parent, name, namelen,
+                                        id, mode, count);
+               break;
+       }
+       case ECHO_MD_LOOKUP:
+               rc = echo_lookup_object(env, ed, parent, id, count);
+               break;
+       case ECHO_MD_GETATTR:
+               rc = echo_getattr_object(env, ed, parent, id, count);
+               break;
+       case ECHO_MD_SETATTR:
+               rc = echo_setattr_object(env, ed, parent, id, count);
+               break;
+       default:
+               CERROR("unknown command %d\n", command);
+               rc = -EINVAL;
+               break;
+       }
        echo_ucred_fini(env);
 
 out_name:
-        if (name != NULL)
-                OBD_FREE(name, namelen + 1);
+       if (name)
+               OBD_FREE(name, namelen + 1);
 out_put:
-        lu_object_put(env, parent);
+       lu_object_put(env, parent);
 out_free:
        LASSERT(info->eti_big_lmm);
        OBD_FREE_LARGE(info->eti_big_lmm, info->eti_big_lmmsize);
        info->eti_big_lmm = NULL;
        info->eti_big_lmmsize = 0;
 out_env:
-        cl_env_put(env, &refcheck);
-        return rc;
+       cl_env_put(env, &refcheck);
+       return rc;
 }
 #endif /* HAVE_SERVER_SUPPORT */
 
 static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
-                              int on_target, struct obdo *oa, void *ulsm,
-                              int ulsm_nob, struct obd_trans_info *oti)
-{
-        struct echo_object     *eco;
-        struct echo_client_obd *ec = ed->ed_ec;
-        struct lov_stripe_md   *lsm = NULL;
-        int                     rc;
-        int                     created = 0;
-        ENTRY;
-
-        if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */
-            (on_target ||                       /* set_stripe */
-             ec->ec_nstripes != 0)) {           /* LOV */
-                CERROR ("No valid oid\n");
-                RETURN(-EINVAL);
-        }
-
-       rc = echo_alloc_memmd(ed, &lsm);
-        if (rc < 0) {
-                CERROR("Cannot allocate md: rc = %d\n", rc);
-                GOTO(failed, rc);
-        }
-
-        if (ulsm != NULL) {
-                int i, idx;
-
-                rc = echo_copyin_lsm (ed, lsm, ulsm, ulsm_nob);
-                if (rc != 0)
-                        GOTO(failed, rc);
-
-                if (lsm->lsm_stripe_count == 0)
-                        lsm->lsm_stripe_count = ec->ec_nstripes;
-
-                if (lsm->lsm_stripe_size == 0)
-                       lsm->lsm_stripe_size = PAGE_CACHE_SIZE;
-
-                idx = cfs_rand();
-
-               /* setup stripes: indices + default ids if required */
-               for (i = 0; i < lsm->lsm_stripe_count; i++) {
-                       if (ostid_id(&lsm->lsm_oinfo[i]->loi_oi) == 0)
-                               lsm->lsm_oinfo[i]->loi_oi = lsm->lsm_oi;
-
-                       lsm->lsm_oinfo[i]->loi_ost_idx =
-                               (idx + i) % ec->ec_nstripes;
-               }
-        }
+                             struct obdo *oa)
+{
+       struct echo_object      *eco;
+       struct echo_client_obd  *ec = ed->ed_ec;
+       int created = 0;
+       int rc;
 
-       /* setup object ID here for !on_target and LOV hint */
-       if (oa->o_valid & OBD_MD_FLID) {
-               LASSERT(oa->o_valid & OBD_MD_FLGROUP);
-               lsm->lsm_oi = oa->o_oi;
+       ENTRY;
+       if (!(oa->o_valid & OBD_MD_FLID) ||
+           !(oa->o_valid & OBD_MD_FLGROUP) ||
+           !fid_seq_is_echo(ostid_seq(&oa->o_oi))) {
+               CERROR("invalid oid "DOSTID"\n", POSTID(&oa->o_oi));
+               RETURN(-EINVAL);
        }
 
-       if (ostid_id(&lsm->lsm_oi) == 0)
-               ostid_set_id(&lsm->lsm_oi, ++last_object_id);
-
-        rc = 0;
-       if (on_target) {
-               /* Only echo objects are allowed to be created */
-               LASSERT((oa->o_valid & OBD_MD_FLGROUP) &&
-                       (ostid_seq(&oa->o_oi) == FID_SEQ_ECHO));
-               rc = obd_create(env, ec->ec_exp, oa, &lsm, oti);
-               if (rc != 0) {
-                       CERROR("Cannot create objects: rc = %d\n", rc);
+       if (ostid_id(&oa->o_oi) == 0) {
+               rc = ostid_set_id(&oa->o_oi, ++last_object_id);
+               if (rc)
                        GOTO(failed, rc);
-               }
-               created = 1;
        }
 
-        /* See what object ID we were given */
-       oa->o_oi = lsm->lsm_oi;
-        oa->o_valid |= OBD_MD_FLID;
+       rc = obd_create(env, ec->ec_exp, oa);
+       if (rc != 0) {
+               CERROR("Cannot create objects: rc = %d\n", rc);
+               GOTO(failed, rc);
+       }
 
-        eco = cl_echo_object_find(ed, &lsm);
-        if (IS_ERR(eco))
-                GOTO(failed, rc = PTR_ERR(eco));
-        cl_echo_object_put(eco);
+       created = 1;
 
-        CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
-        EXIT;
+       oa->o_valid |= OBD_MD_FLID;
 
- failed:
-        if (created && rc)
-                obd_destroy(env, ec->ec_exp, oa, lsm, oti, NULL, NULL);
-        if (lsm)
-               echo_free_memmd(ed, &lsm);
-        if (rc)
-                CERROR("create object failed with: rc = %d\n", rc);
-        return (rc);
-}
+       eco = cl_echo_object_find(ed, &oa->o_oi);
+       if (IS_ERR(eco))
+               GOTO(failed, rc = PTR_ERR(eco));
+       cl_echo_object_put(eco);
 
-static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
-                           struct obdo *oa)
-{
-        struct lov_stripe_md   *lsm = NULL;
-        struct echo_object     *eco;
-        int                     rc;
-        ENTRY;
-
-        if ((oa->o_valid & OBD_MD_FLID) == 0 || ostid_id(&oa->o_oi) == 0) {
-               /* disallow use of object id 0 */
-                CERROR ("No valid oid\n");
-                RETURN(-EINVAL);
-        }
-
-       rc = echo_alloc_memmd(ed, &lsm);
-        if (rc < 0)
-                RETURN(rc);
-
-       lsm->lsm_oi = oa->o_oi;
-       if (!(oa->o_valid & OBD_MD_FLGROUP))
-               ostid_set_seq_echo(&lsm->lsm_oi);
-
-        rc = 0;
-        eco = cl_echo_object_find(ed, &lsm);
-        if (!IS_ERR(eco))
-                *ecop = eco;
-        else
-                rc = PTR_ERR(eco);
-        if (lsm)
-               echo_free_memmd(ed, &lsm);
-        RETURN(rc);
-}
+       CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
+       EXIT;
 
-static void echo_put_object(struct echo_object *eco)
-{
-        if (cl_echo_object_put(eco))
-                CERROR("echo client: drop an object failed");
+failed:
+       if (created && rc != 0)
+               obd_destroy(env, ec->ec_exp, oa);
+
+       if (rc != 0)
+               CERROR("create object failed with: rc = %d\n", rc);
+
+       return rc;
 }
 
-static void
-echo_get_stripe_off_id (struct lov_stripe_md *lsm, obd_off *offp, obd_id *idp)
+static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
+                          struct obdo *oa)
 {
-        unsigned long stripe_count;
-        unsigned long stripe_size;
-        unsigned long width;
-        unsigned long woffset;
-        int           stripe_index;
-        obd_off       offset;
-
-        if (lsm->lsm_stripe_count <= 1)
-                return;
+       struct echo_object *eco;
+       int rc;
 
-        offset       = *offp;
-        stripe_size  = lsm->lsm_stripe_size;
-        stripe_count = lsm->lsm_stripe_count;
+       ENTRY;
+       if (!(oa->o_valid & OBD_MD_FLID) ||
+           !(oa->o_valid & OBD_MD_FLGROUP) ||
+           ostid_id(&oa->o_oi) == 0) {
+               CERROR("invalid oid "DOSTID"\n", POSTID(&oa->o_oi));
+               RETURN(-EINVAL);
+       }
 
-        /* width = # bytes in all stripes */
-        width = stripe_size * stripe_count;
+       rc = 0;
+       eco = cl_echo_object_find(ed, &oa->o_oi);
+       if (!IS_ERR(eco))
+               *ecop = eco;
+       else
+               rc = PTR_ERR(eco);
 
-        /* woffset = offset within a width; offset = whole number of widths */
-        woffset = do_div (offset, width);
+       RETURN(rc);
+}
 
-        stripe_index = woffset / stripe_size;
+static void echo_put_object(struct echo_object *eco)
+{
+       int rc;
 
-       *idp = ostid_id(&lsm->lsm_oinfo[stripe_index]->loi_oi);
-       *offp = offset * stripe_size + woffset % stripe_size;
+       rc = cl_echo_object_put(eco);
+       if (rc)
+               CERROR("%s: echo client drop an object failed: rc = %d\n",
+                      eco->eo_dev->ed_ec->ec_exp->exp_obd->obd_name, rc);
 }
 
-static void
-echo_client_page_debug_setup(struct lov_stripe_md *lsm,
-                            struct page *page, int rw, obd_id id,
-                             obd_off offset, obd_off count)
+static void echo_client_page_debug_setup(struct page *page, int rw, u64 id,
+                                        u64 offset, u64 count)
 {
-        char    *addr;
-        obd_off  stripe_off;
-        obd_id   stripe_id;
-        int      delta;
+       char *addr;
+       u64 stripe_off;
+       u64 stripe_id;
+       int delta;
 
-        /* no partial pages on the client */
-       LASSERT(count == PAGE_CACHE_SIZE);
+       /* no partial pages on the client */
+       LASSERT(count == PAGE_SIZE);
 
        addr = kmap(page);
 
-       for (delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
-                if (rw == OBD_BRW_WRITE) {
-                        stripe_off = offset + delta;
-                        stripe_id = id;
-                        echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
-                } else {
-                        stripe_off = 0xdeadbeef00c0ffeeULL;
-                        stripe_id = 0xdeadbeef00c0ffeeULL;
-                }
-                block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
-                                  stripe_off, stripe_id);
-        }
+       for (delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
+               if (rw == OBD_BRW_WRITE) {
+                       stripe_off = offset + delta;
+                       stripe_id = id;
+               } else {
+                       stripe_off = 0xdeadbeef00c0ffeeULL;
+                       stripe_id = 0xdeadbeef00c0ffeeULL;
+               }
+               block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
+                                 stripe_off, stripe_id);
+       }
 
        kunmap(page);
 }
 
-static int echo_client_page_debug_check(struct lov_stripe_md *lsm,
-                                       struct page *page, obd_id id,
-                                        obd_off offset, obd_off count)
+static int
+echo_client_page_debug_check(struct page *page, u64 id, u64 offset, u64 count)
 {
-        obd_off stripe_off;
-        obd_id  stripe_id;
-        char   *addr;
-        int     delta;
-        int     rc;
-        int     rc2;
+       u64 stripe_off;
+       u64 stripe_id;
+       char *addr;
+       int delta;
+       int rc;
+       int rc2;
 
-        /* no partial pages on the client */
-       LASSERT(count == PAGE_CACHE_SIZE);
+       /* no partial pages on the client */
+       LASSERT(count == PAGE_SIZE);
 
        addr = kmap(page);
 
-       for (rc = delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
-                stripe_off = offset + delta;
-                stripe_id = id;
-                echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
+       for (rc = delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
+               stripe_off = offset + delta;
+               stripe_id = id;
 
-                rc2 = block_debug_check("test_brw",
-                                        addr + delta, OBD_ECHO_BLOCK_SIZE,
-                                        stripe_off, stripe_id);
-                if (rc2 != 0) {
-                        CERROR ("Error in echo object "LPX64"\n", id);
-                        rc = rc2;
-                }
-        }
+               rc2 = block_debug_check("test_brw",
+                                       addr + delta, OBD_ECHO_BLOCK_SIZE,
+                                       stripe_off, stripe_id);
+               if (rc2 != 0) {
+                       CERROR("Error in echo object %#llx\n", id);
+                       rc = rc2;
+               }
+       }
 
        kunmap(page);
-        return rc;
+       return rc;
 }
 
 static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
-                            struct echo_object *eco, obd_off offset,
-                            obd_size count, int async,
-                            struct obd_trans_info *oti)
-{
-        struct lov_stripe_md   *lsm = eco->eo_lsm;
-        obd_count               npages;
-        struct brw_page        *pga;
-        struct brw_page        *pgp;
-       struct page            **pages;
-        obd_off                 off;
-        int                     i;
-        int                     rc;
-        int                     verify;
-        int                     gfp_mask;
-        int                     brw_flags = 0;
-        ENTRY;
-
-        verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
-                  (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
-                  (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
-
-       gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_IOFS : GFP_HIGHUSER;
+                           struct echo_object *eco, u64 offset,
+                           u64 count, int async)
+{
+       size_t npages;
+       struct brw_page *pga;
+       struct brw_page *pgp;
+       struct page **pages;
+       u64 off;
+       size_t i;
+       int rc;
+       int verify;
+       gfp_t gfp_mask;
+       u32 brw_flags = 0;
+
+       ENTRY;
+       verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
+                 (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
+                 (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
+
+       gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
 
        LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
-       LASSERT(lsm != NULL);
-       LASSERT(ostid_id(&lsm->lsm_oi) == ostid_id(&oa->o_oi));
 
-        if (count <= 0 ||
-            (count & (~CFS_PAGE_MASK)) != 0)
-                RETURN(-EINVAL);
+       if ((count & (~PAGE_MASK)) != 0)
+               RETURN(-EINVAL);
 
-        /* XXX think again with misaligned I/O */
-       npages = count >> PAGE_CACHE_SHIFT;
+       /* XXX think again with misaligned I/O */
+       npages = count >> PAGE_SHIFT;
 
-        if (rw == OBD_BRW_WRITE)
-                brw_flags = OBD_BRW_ASYNC;
+       if (rw == OBD_BRW_WRITE)
+               brw_flags = OBD_BRW_ASYNC;
 
-        OBD_ALLOC(pga, npages * sizeof(*pga));
-        if (pga == NULL)
-                RETURN(-ENOMEM);
+       OBD_ALLOC(pga, npages * sizeof(*pga));
+       if (!pga)
+               RETURN(-ENOMEM);
 
-        OBD_ALLOC(pages, npages * sizeof(*pages));
-        if (pages == NULL) {
-                OBD_FREE(pga, npages * sizeof(*pga));
-                RETURN(-ENOMEM);
-        }
+       OBD_ALLOC(pages, npages * sizeof(*pages));
+       if (!pages) {
+               OBD_FREE(pga, npages * sizeof(*pga));
+               RETURN(-ENOMEM);
+       }
 
-        for (i = 0, pgp = pga, off = offset;
-             i < npages;
-            i++, pgp++, off += PAGE_CACHE_SIZE) {
+       for (i = 0, pgp = pga, off = offset;
+            i < npages;
+            i++, pgp++, off += PAGE_SIZE) {
 
-                LASSERT (pgp->pg == NULL);      /* for cleanup */
+               LASSERT(pgp->pg == NULL);       /* for cleanup */
 
-                rc = -ENOMEM;
-                OBD_PAGE_ALLOC(pgp->pg, gfp_mask);
-                if (pgp->pg == NULL)
-                        goto out;
+               rc = -ENOMEM;
+               pgp->pg = alloc_page(gfp_mask);
+               if (!pgp->pg)
+                       goto out;
 
-                pages[i] = pgp->pg;
-               pgp->count = PAGE_CACHE_SIZE;
-                pgp->off = off;
-                pgp->flag = brw_flags;
+               pages[i] = pgp->pg;
+               pgp->count = PAGE_SIZE;
+               pgp->off = off;
+               pgp->flag = brw_flags;
 
                if (verify)
-                       echo_client_page_debug_setup(lsm, pgp->pg, rw,
+                       echo_client_page_debug_setup(pgp->pg, rw,
                                                     ostid_id(&oa->o_oi), off,
                                                     pgp->count);
-        }
+       }
 
-        /* brw mode can only be used at client */
-        LASSERT(ed->ed_next != NULL);
-        rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
+       /* brw mode can only be used at client */
+       LASSERT(ed->ed_next != NULL);
+       rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
 
  out:
-        if (rc != 0 || rw != OBD_BRW_READ)
-                verify = 0;
+       if (rc != 0 || rw != OBD_BRW_READ)
+               verify = 0;
 
-        for (i = 0, pgp = pga; i < npages; i++, pgp++) {
-                if (pgp->pg == NULL)
-                        continue;
+       for (i = 0, pgp = pga; i < npages; i++, pgp++) {
+               if (!pgp->pg)
+                       continue;
 
                if (verify) {
                        int vrc;
-                       vrc = echo_client_page_debug_check(lsm, pgp->pg,
+
+                       vrc = echo_client_page_debug_check(pgp->pg,
                                                           ostid_id(&oa->o_oi),
-                                                          pgp->off, pgp->count);
+                                                          pgp->off,
+                                                          pgp->count);
                        if (vrc != 0 && rc == 0)
                                rc = vrc;
                }
-               OBD_PAGE_FREE(pgp->pg);
-        }
-        OBD_FREE(pga, npages * sizeof(*pga));
-        OBD_FREE(pages, npages * sizeof(*pages));
-        RETURN(rc);
+               __free_page(pgp->pg);
+       }
+       OBD_FREE(pga, npages * sizeof(*pga));
+       OBD_FREE(pages, npages * sizeof(*pages));
+       RETURN(rc);
 }
 
 static int echo_client_prep_commit(const struct lu_env *env,
                                   struct obd_export *exp, int rw,
                                   struct obdo *oa, struct echo_object *eco,
-                                  obd_off offset, obd_size count,
-                                  obd_size batch, struct obd_trans_info *oti,
-                                  int async)
-{
-        struct lov_stripe_md *lsm = eco->eo_lsm;
-        struct obd_ioobj ioo;
-        struct niobuf_local *lnb;
-        struct niobuf_remote *rnb;
-        obd_off off;
-        obd_size npages, tot_pages;
+                                  u64 offset, u64 count,
+                                  u64 batch, int async)
+{
+       struct obd_ioobj ioo;
+       struct niobuf_local *lnb;
+       struct niobuf_remote rnb;
+       u64 off;
+       u64 npages, tot_pages, apc;
        int i, ret = 0, brw_flags = 0;
 
-        ENTRY;
-
-       if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 ||
-           (lsm != NULL && ostid_id(&lsm->lsm_oi) != ostid_id(&oa->o_oi)))
+       ENTRY;
+       if (count <= 0 || (count & ~PAGE_MASK) != 0)
                RETURN(-EINVAL);
 
-       npages = batch >> PAGE_CACHE_SHIFT;
-       tot_pages = count >> PAGE_CACHE_SHIFT;
+       apc = npages = batch >> PAGE_SHIFT;
+       tot_pages = count >> PAGE_SHIFT;
 
-        OBD_ALLOC(lnb, npages * sizeof(struct niobuf_local));
-        OBD_ALLOC(rnb, npages * sizeof(struct niobuf_remote));
-
-        if (lnb == NULL || rnb == NULL)
-                GOTO(out, ret = -ENOMEM);
+       OBD_ALLOC_LARGE(lnb, apc * sizeof(struct niobuf_local));
+       if (!lnb)
+               RETURN(-ENOMEM);
 
        if (rw == OBD_BRW_WRITE && async)
                brw_flags |= OBD_BRW_ASYNC;
 
-        obdo_to_ioobj(oa, &ioo);
-
-        off = offset;
+       obdo_to_ioobj(oa, &ioo);
 
-        for(; tot_pages; tot_pages -= npages) {
-                int lpages;
+       off = offset;
 
-                if (tot_pages < npages)
-                        npages = tot_pages;
+       for (; tot_pages > 0; tot_pages -= npages) {
+               int lpages;
 
-               for (i = 0; i < npages; i++, off += PAGE_CACHE_SIZE) {
-                        rnb[i].offset = off;
-                       rnb[i].len = PAGE_CACHE_SIZE;
-                       rnb[i].flags = brw_flags;
-                }
+               if (tot_pages < npages)
+                       npages = tot_pages;
 
-                ioo.ioo_bufcnt = npages;
-                oti->oti_transno = 0;
+               rnb.rnb_offset = off;
+               rnb.rnb_len = npages * PAGE_SIZE;
+               rnb.rnb_flags = brw_flags;
+               ioo.ioo_bufcnt = 1;
+               off += npages * PAGE_SIZE;
 
-                lpages = npages;
-               ret = obd_preprw(env, rw, exp, oa, 1, &ioo, rnb, &lpages,
-                                 lnb, oti, NULL);
-                if (ret != 0)
-                        GOTO(out, ret);
-                LASSERT(lpages == npages);
+               lpages = npages;
+               ret = obd_preprw(env, rw, exp, oa, 1, &ioo, &rnb, &lpages, lnb);
+               if (ret != 0)
+                       GOTO(out, ret);
 
-                for (i = 0; i < lpages; i++) {
-                       struct page *page = lnb[i].page;
+               for (i = 0; i < lpages; i++) {
+                       struct page *page = lnb[i].lnb_page;
 
-                        /* read past eof? */
-                        if (page == NULL && lnb[i].rc == 0)
-                                continue;
+                       /* read past eof? */
+                       if (!page && lnb[i].lnb_rc == 0)
+                               continue;
 
-                        if (async)
-                                lnb[i].flags |= OBD_BRW_ASYNC;
+                       if (async)
+                               lnb[i].lnb_flags |= OBD_BRW_ASYNC;
 
                        if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
                            (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
@@ -2631,209 +2694,153 @@ static int echo_client_prep_commit(const struct lu_env *env,
                                continue;
 
                        if (rw == OBD_BRW_WRITE)
-                               echo_client_page_debug_setup(lsm, page, rw,
-                                                           ostid_id(&oa->o_oi),
-                                                            rnb[i].offset,
-                                                            rnb[i].len);
+                               echo_client_page_debug_setup(page, rw,
+                                                       ostid_id(&oa->o_oi),
+                                                       lnb[i].lnb_file_offset,
+                                                       lnb[i].lnb_len);
                        else
-                               echo_client_page_debug_check(lsm, page,
-                                                           ostid_id(&oa->o_oi),
-                                                            rnb[i].offset,
-                                                            rnb[i].len);
+                               echo_client_page_debug_check(page,
+                                                       ostid_id(&oa->o_oi),
+                                                       lnb[i].lnb_file_offset,
+                                                       lnb[i].lnb_len);
                }
 
-               ret = obd_commitrw(env, rw, exp, oa, 1, &ioo,
-                                  rnb, npages, lnb, oti, ret);
-                if (ret != 0)
-                        GOTO(out, ret);
-
-                /* Reset oti otherwise it would confuse ldiskfs. */
-                memset(oti, 0, sizeof(*oti));
+               ret = obd_commitrw(env, rw, exp, oa, 1, &ioo, &rnb, npages, lnb,
+                                  ret);
+               if (ret != 0)
+                       break;
 
                /* Reuse env context. */
                lu_context_exit((struct lu_context *)&env->le_ctx);
                lu_context_enter((struct lu_context *)&env->le_ctx);
-        }
+       }
 
 out:
-        if (lnb)
-                OBD_FREE(lnb, npages * sizeof(struct niobuf_local));
-        if (rnb)
-                OBD_FREE(rnb, npages * sizeof(struct niobuf_remote));
-        RETURN(ret);
+       OBD_FREE_LARGE(lnb, apc * sizeof(struct niobuf_local));
+
+       RETURN(ret);
 }
 
 static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
                                 struct obd_export *exp,
-                                struct obd_ioctl_data *data,
-                                struct obd_trans_info *dummy_oti)
+                                struct obd_ioctl_data *data)
 {
-        struct obd_device *obd = class_exp2obd(exp);
-        struct echo_device *ed = obd2echo_dev(obd);
-        struct echo_client_obd *ec = ed->ed_ec;
-        struct obdo *oa = &data->ioc_obdo1;
-        struct echo_object *eco;
-        int rc;
-        int async = 1;
-        long test_mode;
-        ENTRY;
+       struct obd_device *obd = class_exp2obd(exp);
+       struct echo_device *ed = obd2echo_dev(obd);
+       struct echo_client_obd *ec = ed->ed_ec;
+       struct obdo *oa = &data->ioc_obdo1;
+       struct echo_object *eco;
+       int rc;
+       int async = 0;
+       long test_mode;
 
-        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+       ENTRY;
+       LASSERT(oa->o_valid & OBD_MD_FLGROUP);
 
-        rc = echo_get_object(&eco, ed, oa);
-        if (rc)
-                RETURN(rc);
+       rc = echo_get_object(&eco, ed, oa);
+       if (rc)
+               RETURN(rc);
 
-        oa->o_valid &= ~OBD_MD_FLHANDLE;
+       oa->o_valid &= ~OBD_MD_FLHANDLE;
 
        /* OFD/obdfilter works only via prep/commit */
-        test_mode = (long)data->ioc_pbuf1;
-        if (test_mode == 1)
-                async = 0;
-
-        if (ed->ed_next == NULL && test_mode != 3) {
-                test_mode = 3;
-                data->ioc_plen1 = data->ioc_count;
-        }
-
-        /* Truncate batch size to maximum */
-        if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
-                data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
-
-        switch (test_mode) {
-        case 1:
-                /* fall through */
-        case 2:
-                rc = echo_client_kbrw(ed, rw, oa,
-                                      eco, data->ioc_offset,
-                                     data->ioc_count, async, dummy_oti);
-                break;
-        case 3:
-               rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa,
-                                            eco, data->ioc_offset,
-                                            data->ioc_count, data->ioc_plen1,
-                                            dummy_oti, async);
-                break;
-        default:
-                rc = -EINVAL;
-        }
-        echo_put_object(eco);
-        RETURN(rc);
-}
-
-static int
-echo_client_enqueue(struct obd_export *exp, struct obdo *oa,
-                    int mode, obd_off offset, obd_size nob)
-{
-        struct echo_device     *ed = obd2echo_dev(exp->exp_obd);
-        struct lustre_handle   *ulh = &oa->o_handle;
-        struct echo_object     *eco;
-        obd_off                 end;
-        int                     rc;
-        ENTRY;
-
-        if (ed->ed_next == NULL)
-                RETURN(-EOPNOTSUPP);
-
-        if (!(mode == LCK_PR || mode == LCK_PW))
-                RETURN(-EINVAL);
-
-        if ((offset & (~CFS_PAGE_MASK)) != 0 ||
-            (nob & (~CFS_PAGE_MASK)) != 0)
-                RETURN(-EINVAL);
-
-        rc = echo_get_object (&eco, ed, oa);
-        if (rc != 0)
-                RETURN(rc);
-
-        end = (nob == 0) ? ((obd_off) -1) : (offset + nob - 1);
-        rc = cl_echo_enqueue(eco, offset, end, mode, &ulh->cookie);
-        if (rc == 0) {
-                oa->o_valid |= OBD_MD_FLHANDLE;
-                CDEBUG(D_INFO, "Cookie is "LPX64"\n", ulh->cookie);
-        }
-        echo_put_object(eco);
-        RETURN(rc);
-}
+       test_mode = (long)data->ioc_pbuf1;
+       if (!ed->ed_next && test_mode != 3) {
+               test_mode = 3;
+               data->ioc_plen1 = data->ioc_count;
+       }
 
-static int
-echo_client_cancel(struct obd_export *exp, struct obdo *oa)
-{
-        struct echo_device *ed     = obd2echo_dev(exp->exp_obd);
-        __u64               cookie = oa->o_handle.cookie;
+       if (test_mode == 3)
+               async = 1;
+
+       /* Truncate batch size to maximum */
+       if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
+               data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
+
+       switch (test_mode) {
+       case 1:
+               /* fall through */
+       case 2:
+               rc = echo_client_kbrw(ed, rw, oa, eco, data->ioc_offset,
+                                     data->ioc_count, async);
+               break;
+       case 3:
+               rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa, eco,
+                                            data->ioc_offset, data->ioc_count,
+                                            data->ioc_plen1, async);
+               break;
+       default:
+               rc = -EINVAL;
+       }
 
-        if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
-                return -EINVAL;
+       echo_put_object(eco);
 
-        CDEBUG(D_INFO, "Cookie is "LPX64"\n", cookie);
-        return cl_echo_cancel(ed, cookie);
+       RETURN(rc);
 }
 
 static int
 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
-                      void *karg, void *uarg)
+                     void *karg, void __user *uarg)
 {
 #ifdef HAVE_SERVER_SUPPORT
        struct tgt_session_info *tsi;
 #endif
-        struct obd_device      *obd = exp->exp_obd;
-        struct echo_device     *ed = obd2echo_dev(obd);
-        struct echo_client_obd *ec = ed->ed_ec;
-        struct echo_object     *eco;
-        struct obd_ioctl_data  *data = karg;
-        struct obd_trans_info   dummy_oti;
-        struct lu_env          *env;
-        struct oti_req_ack_lock *ack_lock;
-        struct obdo            *oa;
-        struct lu_fid           fid;
-        int                     rw = OBD_BRW_READ;
-        int                     rc = 0;
-        int                     i;
-#ifdef HAVE_SERVER_SUPPORT
-       struct lu_context        echo_session;
-#endif
-        ENTRY;
-
-        memset(&dummy_oti, 0, sizeof(dummy_oti));
+       struct obd_device      *obd = exp->exp_obd;
+       struct echo_device     *ed = obd2echo_dev(obd);
+       struct echo_client_obd *ec = ed->ed_ec;
+       struct echo_object     *eco;
+       struct obd_ioctl_data  *data = karg;
+       struct lu_env          *env;
+       unsigned long           env_tags = 0;
+       __u16                   refcheck;
+       struct obdo            *oa;
+       struct lu_fid           fid;
+       int                     rw = OBD_BRW_READ;
+       int                     rc = 0;
 
+       ENTRY;
        oa = &data->ioc_obdo1;
        if (!(oa->o_valid & OBD_MD_FLGROUP)) {
                oa->o_valid |= OBD_MD_FLGROUP;
                ostid_set_seq_echo(&oa->o_oi);
        }
 
-        /* This FID is unpacked just for validation at this point */
-        rc = ostid_to_fid(&fid, &oa->o_oi, 0);
-        if (rc < 0)
-                RETURN(rc);
+       /* This FID is unpacked just for validation at this point */
+       rc = ostid_to_fid(&fid, &oa->o_oi, 0);
+       if (rc < 0)
+               RETURN(rc);
 
-        OBD_ALLOC_PTR(env);
-        if (env == NULL)
-                RETURN(-ENOMEM);
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env))
+               RETURN(PTR_ERR(env));
 
-       rc = lu_env_init(env, LCT_DT_THREAD);
-       if (rc)
-               GOTO(out_alloc, rc = -ENOMEM);
+       lu_env_add(env);
 
 #ifdef HAVE_SERVER_SUPPORT
-       env->le_ses = &echo_session;
-       rc = lu_context_init(env->le_ses, LCT_SERVER_SESSION | LCT_NOREF);
-       if (unlikely(rc < 0))
-               GOTO(out_env, rc);
-       lu_context_enter(env->le_ses);
+       if (cmd == OBD_IOC_ECHO_MD || cmd == OBD_IOC_ECHO_ALLOC_SEQ)
+               env_tags = ECHO_MD_CTX_TAG;
+       else
+#endif
+               env_tags = ECHO_DT_CTX_TAG;
+
+       rc = lu_env_refill_by_tags(env, env_tags, ECHO_SES_TAG);
+       if (rc != 0)
+               GOTO(out, rc);
 
+#ifdef HAVE_SERVER_SUPPORT
        tsi = tgt_ses_info(env);
-       tsi->tsi_exp = ec->ec_exp;
+       /* treat as local operation */
+       tsi->tsi_exp = NULL;
        tsi->tsi_jobid = NULL;
 #endif
-        switch (cmd) {
-        case OBD_IOC_CREATE:                    /* may create echo object */
-                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
-                        GOTO (out, rc = -EPERM);
 
-                rc = echo_create_object(env, ed, 1, oa, data->ioc_pbuf1,
-                                        data->ioc_plen1, &dummy_oti);
-                GOTO(out, rc);
+       switch (cmd) {
+       case OBD_IOC_CREATE:                    /* may create echo object */
+               if (!cfs_capable(CFS_CAP_SYS_ADMIN))
+                       GOTO(out, rc = -EPERM);
+
+               rc = echo_create_object(env, ed, oa);
+               GOTO(out, rc);
 
 #ifdef HAVE_SERVER_SUPPORT
        case OBD_IOC_ECHO_MD: {
@@ -2852,7 +2859,7 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                id = data->ioc_obdo2.o_oi.oi.oi_id;
                dirlen = data->ioc_plen1;
                OBD_ALLOC(dir, dirlen + 1);
-               if (dir == NULL)
+               if (!dir)
                        GOTO(out, rc = -ENOMEM);
 
                if (copy_from_user(dir, data->ioc_pbuf1, dirlen)) {
@@ -2864,386 +2871,291 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                OBD_FREE(dir, dirlen + 1);
                GOTO(out, rc);
        }
-        case OBD_IOC_ECHO_ALLOC_SEQ: {
-                struct lu_env   *cl_env;
-                int              refcheck;
-                __u64            seq;
-                int              max_count;
-
-                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
-                        GOTO(out, rc = -EPERM);
-
-                cl_env = cl_env_get(&refcheck);
-                if (IS_ERR(cl_env))
-                        GOTO(out, rc = PTR_ERR(cl_env));
-
-                rc = lu_env_refill_by_tags(cl_env, ECHO_MD_CTX_TAG,
-                                            ECHO_MD_SES_TAG);
-                if (rc != 0) {
-                        cl_env_put(cl_env, &refcheck);
-                        GOTO(out, rc);
-                }
-
-                rc = seq_client_get_seq(cl_env, ed->ed_cl_seq, &seq);
-                cl_env_put(cl_env, &refcheck);
-                if (rc < 0) {
-                        CERROR("%s: Can not alloc seq: rc = %d\n",
-                               obd->obd_name, rc);
-                        GOTO(out, rc);
-                }
+       case OBD_IOC_ECHO_ALLOC_SEQ: {
+               __u64            seq;
+               int              max_count;
+
+               if (!cfs_capable(CFS_CAP_SYS_ADMIN))
+                       GOTO(out, rc = -EPERM);
+
+               rc = seq_client_get_seq(env, ed->ed_cl_seq, &seq);
+               if (rc < 0) {
+                       CERROR("%s: Can not alloc seq: rc = %d\n",
+                              obd->obd_name, rc);
+                       GOTO(out, rc);
+               }
 
                if (copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1))
-                        return -EFAULT;
+                       return -EFAULT;
 
                max_count = LUSTRE_METADATA_SEQ_MAX_WIDTH;
                if (copy_to_user(data->ioc_pbuf2, &max_count,
                                     data->ioc_plen2))
                        return -EFAULT;
                GOTO(out, rc);
-        }
+       }
 #endif /* HAVE_SERVER_SUPPORT */
-        case OBD_IOC_DESTROY:
-                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
-                        GOTO (out, rc = -EPERM);
-
-                rc = echo_get_object(&eco, ed, oa);
-                if (rc == 0) {
-                        rc = obd_destroy(env, ec->ec_exp, oa, eco->eo_lsm,
-                                         &dummy_oti, NULL, NULL);
-                        if (rc == 0)
-                                eco->eo_deleted = 1;
-                        echo_put_object(eco);
-                }
-                GOTO(out, rc);
-
-        case OBD_IOC_GETATTR:
-                rc = echo_get_object(&eco, ed, oa);
-                if (rc == 0) {
-                        struct obd_info oinfo = { { { 0 } } };
-                        oinfo.oi_md = eco->eo_lsm;
-                        oinfo.oi_oa = oa;
-                        rc = obd_getattr(env, ec->ec_exp, &oinfo);
-                        echo_put_object(eco);
-                }
-                GOTO(out, rc);
-
-        case OBD_IOC_SETATTR:
-                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
-                        GOTO (out, rc = -EPERM);
-
-                rc = echo_get_object(&eco, ed, oa);
-                if (rc == 0) {
-                        struct obd_info oinfo = { { { 0 } } };
-                        oinfo.oi_oa = oa;
-                        oinfo.oi_md = eco->eo_lsm;
-
-                        rc = obd_setattr(env, ec->ec_exp, &oinfo, NULL);
-                        echo_put_object(eco);
-                }
-                GOTO(out, rc);
-
-        case OBD_IOC_BRW_WRITE:
-                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
-                        GOTO (out, rc = -EPERM);
-
-                rw = OBD_BRW_WRITE;
-                /* fall through */
-        case OBD_IOC_BRW_READ:
-               rc = echo_client_brw_ioctl(env, rw, exp, data, &dummy_oti);
-                GOTO(out, rc);
-
-        case ECHO_IOC_GET_STRIPE:
-                rc = echo_get_object(&eco, ed, oa);
-                if (rc == 0) {
-                        rc = echo_copyout_lsm(eco->eo_lsm, data->ioc_pbuf1,
-                                              data->ioc_plen1);
-                        echo_put_object(eco);
-                }
-                GOTO(out, rc);
-
-        case ECHO_IOC_SET_STRIPE:
-                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
-                        GOTO (out, rc = -EPERM);
-
-                if (data->ioc_pbuf1 == NULL) {  /* unset */
-                        rc = echo_get_object(&eco, ed, oa);
-                        if (rc == 0) {
-                                eco->eo_deleted = 1;
-                                echo_put_object(eco);
-                        }
-                } else {
-                        rc = echo_create_object(env, ed, 0, oa,
-                                                data->ioc_pbuf1,
-                                                data->ioc_plen1, &dummy_oti);
-                }
-                GOTO (out, rc);
-
-        case ECHO_IOC_ENQUEUE:
-                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
-                        GOTO (out, rc = -EPERM);
-
-                rc = echo_client_enqueue(exp, oa,
-                                         data->ioc_conn1, /* lock mode */
-                                         data->ioc_offset,
-                                         data->ioc_count);/*extent*/
-                GOTO (out, rc);
-
-        case ECHO_IOC_CANCEL:
-                rc = echo_client_cancel(exp, oa);
-                GOTO (out, rc);
-
-        default:
-                CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
-                GOTO (out, rc = -ENOTTY);
-        }
-
-        EXIT;
-out:
-#ifdef HAVE_SERVER_SUPPORT
-       lu_context_exit(env->le_ses);
-       lu_context_fini(env->le_ses);
-out_env:
-#endif
-        lu_env_fini(env);
-out_alloc:
-        OBD_FREE_PTR(env);
+       case OBD_IOC_DESTROY:
+               if (!cfs_capable(CFS_CAP_SYS_ADMIN))
+                       GOTO(out, rc = -EPERM);
 
-        /* XXX this should be in a helper also called by target_send_reply */
-        for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
-             i++, ack_lock++) {
-                if (!ack_lock->mode)
-                        break;
-                ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
-        }
+               rc = echo_get_object(&eco, ed, oa);
+               if (rc == 0) {
+                       rc = obd_destroy(env, ec->ec_exp, oa);
+                       if (rc == 0)
+                               eco->eo_deleted = 1;
+                       echo_put_object(eco);
+               }
+               GOTO(out, rc);
 
-        return rc;
+       case OBD_IOC_GETATTR:
+               rc = echo_get_object(&eco, ed, oa);
+               if (rc == 0) {
+                       rc = obd_getattr(env, ec->ec_exp, oa);
+                       echo_put_object(eco);
+               }
+               GOTO(out, rc);
+
+       case OBD_IOC_SETATTR:
+               if (!cfs_capable(CFS_CAP_SYS_ADMIN))
+                       GOTO(out, rc = -EPERM);
+
+               rc = echo_get_object(&eco, ed, oa);
+               if (rc == 0) {
+                       rc = obd_setattr(env, ec->ec_exp, oa);
+                       echo_put_object(eco);
+               }
+               GOTO(out, rc);
+
+       case OBD_IOC_BRW_WRITE:
+               if (!cfs_capable(CFS_CAP_SYS_ADMIN))
+                       GOTO(out, rc = -EPERM);
+
+               rw = OBD_BRW_WRITE;
+               /* fall through */
+       case OBD_IOC_BRW_READ:
+               rc = echo_client_brw_ioctl(env, rw, exp, data);
+               GOTO(out, rc);
+
+       default:
+               CERROR("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
+               GOTO(out, rc = -ENOTTY);
+       }
+
+       EXIT;
+out:
+       lu_env_remove(env);
+       cl_env_put(env, &refcheck);
+
+       return rc;
 }
 
 static int echo_client_setup(const struct lu_env *env,
-                             struct obd_device *obddev, struct lustre_cfg *lcfg)
-{
-        struct echo_client_obd *ec = &obddev->u.echo_client;
-        struct obd_device *tgt;
-        struct obd_uuid echo_uuid = { "ECHO_UUID" };
-        struct obd_connect_data *ocd = NULL;
-        int rc;
-        ENTRY;
-
-        if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
-                CERROR("requires a TARGET OBD name\n");
-                RETURN(-EINVAL);
-        }
-
-        tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
-        if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
-                CERROR("device not attached or not set up (%s)\n",
-                       lustre_cfg_string(lcfg, 1));
-                RETURN(-EINVAL);
-        }
+                            struct obd_device *obddev, struct lustre_cfg *lcfg)
+{
+       struct echo_client_obd *ec = &obddev->u.echo_client;
+       struct obd_device *tgt;
+       struct obd_uuid echo_uuid = { "ECHO_UUID" };
+       struct obd_connect_data *ocd = NULL;
+       int rc;
+
+       ENTRY;
+       if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
+               CERROR("requires a TARGET OBD name\n");
+               RETURN(-EINVAL);
+       }
+
+       tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
+       if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
+               CERROR("device not attached or not set up (%s)\n",
+                      lustre_cfg_string(lcfg, 1));
+               RETURN(-EINVAL);
+       }
 
        spin_lock_init(&ec->ec_lock);
-        CFS_INIT_LIST_HEAD (&ec->ec_objects);
-        CFS_INIT_LIST_HEAD (&ec->ec_locks);
-        ec->ec_unique = 0;
-        ec->ec_nstripes = 0;
+       INIT_LIST_HEAD(&ec->ec_objects);
+       INIT_LIST_HEAD(&ec->ec_locks);
+       ec->ec_unique = 0;
+
+       lu_context_tags_update(ECHO_DT_CTX_TAG);
+       lu_session_tags_update(ECHO_SES_TAG);
 
        if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
 #ifdef HAVE_SERVER_SUPPORT
                lu_context_tags_update(ECHO_MD_CTX_TAG);
-               lu_session_tags_update(ECHO_MD_SES_TAG);
 #else
-               CERROR("Local operations are NOT supported on client side. "
-                      "Only remote operations are supported. Metadata client "
-                      "must be run on server side.\n");
+               CERROR(
+                      "Local operations are NOT supported on client side. Only remote operations are supported. Metadata client must be run on server side.\n");
 #endif
                RETURN(0);
        }
 
-        OBD_ALLOC(ocd, sizeof(*ocd));
-        if (ocd == NULL) {
-                CERROR("Can't alloc ocd connecting to %s\n",
-                       lustre_cfg_string(lcfg, 1));
-                return -ENOMEM;
-        }
+       OBD_ALLOC(ocd, sizeof(*ocd));
+       if (!ocd) {
+               CERROR("Can't alloc ocd connecting to %s\n",
+                      lustre_cfg_string(lcfg, 1));
+               return -ENOMEM;
+       }
 
-        ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
+       ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
                                 OBD_CONNECT_BRW_SIZE |
-                                 OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
+                                OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
                                 OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
                                 OBD_CONNECT_FID;
        ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
-        ocd->ocd_version = LUSTRE_VERSION_CODE;
-        ocd->ocd_group = FID_SEQ_ECHO;
+       ocd->ocd_version = LUSTRE_VERSION_CODE;
+       ocd->ocd_group = FID_SEQ_ECHO;
 
-        rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
-        if (rc == 0) {
-                /* Turn off pinger because it connects to tgt obd directly. */
+       rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
+       if (rc == 0) {
+               /* Turn off pinger because it connects to tgt obd directly. */
                spin_lock(&tgt->obd_dev_lock);
-               cfs_list_del_init(&ec->ec_exp->exp_obd_chain_timed);
+               list_del_init(&ec->ec_exp->exp_obd_chain_timed);
                spin_unlock(&tgt->obd_dev_lock);
-        }
+       }
 
-        OBD_FREE(ocd, sizeof(*ocd));
+       OBD_FREE(ocd, sizeof(*ocd));
 
-        if (rc != 0) {
-                CERROR("fail to connect to device %s\n",
-                       lustre_cfg_string(lcfg, 1));
-                return (rc);
-        }
+       if (rc != 0) {
+               CERROR("fail to connect to device %s\n",
+                      lustre_cfg_string(lcfg, 1));
+               return rc;
+       }
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int echo_client_cleanup(struct obd_device *obddev)
 {
-        struct echo_device *ed = obd2echo_dev(obddev);
-        struct echo_client_obd *ec = &obddev->u.echo_client;
-        int rc;
-        ENTRY;
+       struct echo_device *ed = obd2echo_dev(obddev);
+       struct echo_client_obd *ec = &obddev->u.echo_client;
+       int rc;
 
-        /*Do nothing for Metadata echo client*/
-        if (ed == NULL )
-                RETURN(0);
+       ENTRY;
+       /*Do nothing for Metadata echo client*/
+       if (!ed)
+               RETURN(0);
 
-        if (ed->ed_next_ismd) {
+       lu_session_tags_clear(ECHO_SES_TAG & ~LCT_SESSION);
+       lu_context_tags_clear(ECHO_DT_CTX_TAG);
+       if (ed->ed_next_ismd) {
 #ifdef HAVE_SERVER_SUPPORT
                lu_context_tags_clear(ECHO_MD_CTX_TAG);
-               lu_session_tags_clear(ECHO_MD_SES_TAG);
 #else
-               CERROR("This is client-side only module, does not support "
-                       "metadata echo client.\n");
+               CERROR(
+                      "This is client-side only module, does not support metadata echo client.\n");
 #endif
-                RETURN(0);
-        }
+               RETURN(0);
+       }
 
-        if (!cfs_list_empty(&obddev->obd_exports)) {
-                CERROR("still has clients!\n");
-                RETURN(-EBUSY);
-        }
+       if (!list_empty(&obddev->obd_exports)) {
+               CERROR("still has clients!\n");
+               RETURN(-EBUSY);
+       }
 
        LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
-        rc = obd_disconnect(ec->ec_exp);
-        if (rc != 0)
-                CERROR("fail to disconnect device: %d\n", rc);
+       rc = obd_disconnect(ec->ec_exp);
+       if (rc != 0)
+               CERROR("fail to disconnect device: %d\n", rc);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int echo_client_connect(const struct lu_env *env,
-                               struct obd_export **exp,
-                               struct obd_device *src, struct obd_uuid *cluuid,
-                               struct obd_connect_data *data, void *localdata)
+                              struct obd_export **exp,
+                              struct obd_device *src, struct obd_uuid *cluuid,
+                              struct obd_connect_data *data, void *localdata)
 {
-        int                rc;
-        struct lustre_handle conn = { 0 };
+       int rc;
+       struct lustre_handle conn = { 0 };
 
-        ENTRY;
-        rc = class_connect(&conn, src, cluuid);
-        if (rc == 0) {
-                *exp = class_conn2export(&conn);
-        }
+       ENTRY;
+       rc = class_connect(&conn, src, cluuid);
+       if (rc == 0)
+               *exp = class_conn2export(&conn);
 
-        RETURN (rc);
+       RETURN(rc);
 }
 
 static int echo_client_disconnect(struct obd_export *exp)
 {
-        int                     rc;
-        ENTRY;
+       int rc;
 
-        if (exp == NULL)
-                GOTO(out, rc = -EINVAL);
+       ENTRY;
+       if (!exp)
+               GOTO(out, rc = -EINVAL);
 
-        rc = class_disconnect(exp);
-        GOTO(out, rc);
- out:
-        return rc;
+       rc = class_disconnect(exp);
+       GOTO(out, rc);
+out:
+       return rc;
 }
 
 static struct obd_ops echo_client_obd_ops = {
-        .o_owner       = THIS_MODULE,
-        .o_iocontrol   = echo_client_iocontrol,
-        .o_connect     = echo_client_connect,
-        .o_disconnect  = echo_client_disconnect
+       .o_owner       = THIS_MODULE,
+       .o_iocontrol   = echo_client_iocontrol,
+       .o_connect     = echo_client_connect,
+       .o_disconnect  = echo_client_disconnect
 };
 
-int echo_client_init(void)
-{
-        int rc;
-
-       rc = lu_kmem_init(echo_caches);
-       if (rc == 0) {
-               rc = class_register_type(&echo_client_obd_ops, NULL, NULL,
-#ifndef HAVE_ONLY_PROCFS_SEQ
-                                       NULL,
-#endif
-                                       LUSTRE_ECHO_CLIENT_NAME,
-                                       &echo_device_type);
-               if (rc)
-                       lu_kmem_fini(echo_caches);
-       }
-       return rc;
-}
-
-void echo_client_exit(void)
-{
-        class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
-        lu_kmem_fini(echo_caches);
-}
-
-#ifdef __KERNEL__
 static int __init obdecho_init(void)
 {
-        int rc;
+       int rc;
 
-        ENTRY;
-        LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
+       ENTRY;
+       LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
 
-       LASSERT(PAGE_CACHE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
+       LASSERT(PAGE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
 
 # ifdef HAVE_SERVER_SUPPORT
-        rc = echo_persistent_pages_init();
-        if (rc != 0)
-                goto failed_0;
+       rc = echo_persistent_pages_init();
+       if (rc != 0)
+               goto failed_0;
 
-       rc = class_register_type(&echo_obd_ops, NULL, NULL,
-#ifndef HAVE_ONLY_PROCFS_SEQ
-                               NULL,
-#endif
-                               LUSTRE_ECHO_NAME, NULL);
+       rc = class_register_type(&echo_obd_ops, NULL, true, NULL,
+                                LUSTRE_ECHO_NAME, &echo_srv_type);
        if (rc != 0)
                goto failed_1;
 # endif
 
-        rc = echo_client_init();
+       rc = lu_kmem_init(echo_caches);
+       if (rc == 0) {
+               rc = class_register_type(&echo_client_obd_ops, NULL, false,
+                                        NULL, LUSTRE_ECHO_CLIENT_NAME,
+                                        &echo_device_type);
+               if (rc)
+                       lu_kmem_fini(echo_caches);
+       }
 
 # ifdef HAVE_SERVER_SUPPORT
-        if (rc == 0)
-                RETURN(0);
+       if (rc == 0)
+               RETURN(0);
 
-        class_unregister_type(LUSTRE_ECHO_NAME);
+       class_unregister_type(LUSTRE_ECHO_NAME);
 failed_1:
-        echo_persistent_pages_fini();
+       echo_persistent_pages_fini();
 failed_0:
 # endif
-        RETURN(rc);
+       RETURN(rc);
 }
 
-static void /*__exit*/ obdecho_exit(void)
+static void __exit obdecho_exit(void)
 {
-        echo_client_exit();
+       class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
+       lu_kmem_fini(echo_caches);
 
-# ifdef HAVE_SERVER_SUPPORT
-        class_unregister_type(LUSTRE_ECHO_NAME);
-        echo_persistent_pages_fini();
-# endif
+#ifdef HAVE_SERVER_SUPPORT
+       class_unregister_type(LUSTRE_ECHO_NAME);
+       echo_persistent_pages_fini();
+#endif
 }
 
-MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
-MODULE_DESCRIPTION("Lustre Testing Echo OBD driver");
+MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
+MODULE_DESCRIPTION("Lustre Echo Client test driver");
+MODULE_VERSION(LUSTRE_VERSION_STRING);
 MODULE_LICENSE("GPL");
 
-cfs_module(obdecho, LUSTRE_VERSION_STRING, obdecho_init, obdecho_exit);
-#endif /* __KERNEL__ */
+module_init(obdecho_init);
+module_exit(obdecho_exit);
 
 /** @} echo_client */