Whamcloud - gitweb
LU-11997 ptlrpc: Properly swab ll_fiemap_info_key
[fs/lustre-release.git] / lustre / obdecho / echo_client.c
index 303ce27..07ed6b7 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2014, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 #include <lprocfs_status.h>
 #include <cl_object.h>
 #include <lustre_fid.h>
+#include <lustre_lmv.h>
 #include <lustre_acl.h>
-#include <lustre_ioctl.h>
+#include <uapi/linux/lustre/lustre_ioctl.h>
 #include <lustre_net.h>
 #ifdef HAVE_SERVER_SUPPORT
 # include <md_object.h>
+
+#define ETI_NAME_LEN   20
+
 #endif /* HAVE_SERVER_SUPPORT */
 
 #include "echo_internal.h"
  * @{
  */
 
+/* echo thread key have a CL_THREAD flag, which set cl_env function directly */
+#define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD)
+#define ECHO_DT_CTX_TAG (LCT_REMEMBER | LCT_DT_THREAD)
+#define ECHO_SES_TAG    (LCT_REMEMBER | LCT_SESSION | LCT_SERVER_SESSION)
+
 struct echo_device {
        struct cl_device          ed_cl;
        struct echo_client_obd   *ed_ec;
@@ -94,7 +99,7 @@ struct echo_object_conf {
 
 struct echo_page {
        struct cl_page_slice    ep_cl;
-       struct mutex            ep_lock;
+       unsigned long           ep_lock;
 };
 
 struct echo_lock {
@@ -123,67 +128,68 @@ struct echo_md_device {
 #endif /* HAVE_SERVER_SUPPORT */
 
 static int echo_client_setup(const struct lu_env *env,
-                             struct obd_device *obddev,
-                             struct lustre_cfg *lcfg);
+                            struct obd_device *obddev,
+                            struct lustre_cfg *lcfg);
 static int echo_client_cleanup(struct obd_device *obddev);
 
-
 /** \defgroup echo_helpers Helper functions
  * @{
  */
 static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
 {
-        return container_of0(dev, struct echo_device, ed_cl);
+       return container_of0(dev, struct echo_device, ed_cl);
 }
 
 static inline struct cl_device *echo_dev2cl(struct echo_device *d)
 {
-        return &d->ed_cl;
+       return &d->ed_cl;
 }
 
 static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
 {
-        return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
+       return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
 }
 
 static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
 {
-        return &eco->eo_cl;
+       return &eco->eo_cl;
 }
 
 static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
 {
-        return container_of(o, struct echo_object, eo_cl);
+       return container_of(o, struct echo_object, eo_cl);
 }
 
 static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
 {
-        return container_of(s, struct echo_page, ep_cl);
+       return container_of(s, struct echo_page, ep_cl);
 }
 
 static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
 {
-        return container_of(s, struct echo_lock, el_cl);
+       return container_of(s, struct echo_lock, el_cl);
 }
 
 static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
 {
-        return ecl->el_cl.cls_lock;
+       return ecl->el_cl.cls_lock;
 }
 
 static struct lu_context_key echo_thread_key;
+
 static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
 {
-        struct echo_thread_info *info;
-        info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
-        LASSERT(info != NULL);
-        return info;
+       struct echo_thread_info *info;
+
+       info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
+       LASSERT(info != NULL);
+       return info;
 }
 
 static inline
 struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
 {
-        return container_of(c, struct echo_object_conf, eoc_cl);
+       return container_of(c, struct echo_object_conf, eoc_cl);
 }
 
 #ifdef HAVE_SERVER_SUPPORT
@@ -217,7 +223,6 @@ static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
 struct echo_thread_info {
        struct echo_object_conf eti_conf;
        struct lustre_md        eti_md;
-
        struct cl_2queue        eti_queue;
        struct cl_io            eti_io;
        struct cl_lock          eti_lock;
@@ -230,17 +235,18 @@ struct echo_thread_info {
        struct md_attr          eti_ma;
        struct lu_name          eti_lname;
        /* per-thread values, can be re-used */
-       void                    *eti_big_lmm;
+       void                    *eti_big_lmm; /* may be vmalloc'd */
        int                     eti_big_lmmsize;
-       char                    eti_name[20];
+       char                    eti_name[ETI_NAME_LEN];
        struct lu_buf           eti_buf;
-       char                    eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
+       /* If we want to test large ACL, then need to enlarge the buffer. */
+       char                    eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE_OLD];
 #endif
 };
 
 /* No session used right now */
 struct echo_session_info {
-        unsigned long dummy;
+       unsigned long dummy;
 };
 
 static struct kmem_cache *echo_lock_kmem;
@@ -250,29 +256,29 @@ static struct kmem_cache *echo_session_kmem;
 /* static struct kmem_cache *echo_req_kmem; */
 
 static struct lu_kmem_descr echo_caches[] = {
-        {
-                .ckd_cache = &echo_lock_kmem,
-                .ckd_name  = "echo_lock_kmem",
-                .ckd_size  = sizeof (struct echo_lock)
-        },
-        {
-                .ckd_cache = &echo_object_kmem,
-                .ckd_name  = "echo_object_kmem",
-                .ckd_size  = sizeof (struct echo_object)
-        },
-        {
-                .ckd_cache = &echo_thread_kmem,
-                .ckd_name  = "echo_thread_kmem",
-                .ckd_size  = sizeof (struct echo_thread_info)
-        },
-        {
-                .ckd_cache = &echo_session_kmem,
-                .ckd_name  = "echo_session_kmem",
-                .ckd_size  = sizeof (struct echo_session_info)
-        },
-        {
-                .ckd_cache = NULL
-        }
+       {
+               .ckd_cache = &echo_lock_kmem,
+               .ckd_name  = "echo_lock_kmem",
+               .ckd_size  = sizeof(struct echo_lock)
+       },
+       {
+               .ckd_cache = &echo_object_kmem,
+               .ckd_name  = "echo_object_kmem",
+               .ckd_size  = sizeof(struct echo_object)
+       },
+       {
+               .ckd_cache = &echo_thread_kmem,
+               .ckd_name  = "echo_thread_kmem",
+               .ckd_size  = sizeof(struct echo_thread_info)
+       },
+       {
+               .ckd_cache = &echo_session_kmem,
+               .ckd_name  = "echo_session_kmem",
+               .ckd_size  = sizeof(struct echo_session_info)
+       },
+       {
+               .ckd_cache = NULL
+       }
 };
 
 /** \defgroup echo_page Page operations
@@ -282,98 +288,103 @@ static struct lu_kmem_descr echo_caches[] = {
  * @{
  */
 static int echo_page_own(const struct lu_env *env,
-                         const struct cl_page_slice *slice,
-                         struct cl_io *io, int nonblock)
+                        const struct cl_page_slice *slice,
+                        struct cl_io *io, int nonblock)
 {
-        struct echo_page *ep = cl2echo_page(slice);
+       struct echo_page *ep = cl2echo_page(slice);
 
-        if (!nonblock)
-               mutex_lock(&ep->ep_lock);
-       else if (!mutex_trylock(&ep->ep_lock))
-                return -EAGAIN;
-        return 0;
+       if (!nonblock) {
+               if (test_and_set_bit(0, &ep->ep_lock))
+                       return -EAGAIN;
+       } else {
+               while (test_and_set_bit(0, &ep->ep_lock))
+                       wait_on_bit(&ep->ep_lock, 0, TASK_UNINTERRUPTIBLE);
+       }
+       return 0;
 }
 
 static void echo_page_disown(const struct lu_env *env,
-                             const struct cl_page_slice *slice,
-                             struct cl_io *io)
+                            const struct cl_page_slice *slice,
+                            struct cl_io *io)
 {
-        struct echo_page *ep = cl2echo_page(slice);
+       struct echo_page *ep = cl2echo_page(slice);
 
-       LASSERT(mutex_is_locked(&ep->ep_lock));
-       mutex_unlock(&ep->ep_lock);
+       LASSERT(test_bit(0, &ep->ep_lock));
+       clear_and_wake_up_bit(0, &ep->ep_lock);
 }
 
 static void echo_page_discard(const struct lu_env *env,
-                              const struct cl_page_slice *slice,
-                              struct cl_io *unused)
+                             const struct cl_page_slice *slice,
+                             struct cl_io *unused)
 {
-        cl_page_delete(env, slice->cpl_page);
+       cl_page_delete(env, slice->cpl_page);
 }
 
 static int echo_page_is_vmlocked(const struct lu_env *env,
-                                 const struct cl_page_slice *slice)
+                                const struct cl_page_slice *slice)
 {
-       if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
-                return -EBUSY;
-        return -ENODATA;
+       if (test_bit(0, &cl2echo_page(slice)->ep_lock))
+               return -EBUSY;
+       return -ENODATA;
 }
 
 static void echo_page_completion(const struct lu_env *env,
-                                 const struct cl_page_slice *slice,
-                                 int ioret)
+                                const struct cl_page_slice *slice,
+                                int ioret)
 {
-        LASSERT(slice->cpl_page->cp_sync_io != NULL);
+       LASSERT(slice->cpl_page->cp_sync_io != NULL);
 }
 
 static void echo_page_fini(const struct lu_env *env,
-                          struct cl_page_slice *slice)
+                          struct cl_page_slice *slice,
+                          struct pagevec *pvec)
 {
        struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
-       ENTRY;
 
+       ENTRY;
        atomic_dec(&eco->eo_npages);
-       page_cache_release(slice->cpl_page->cp_vmpage);
+       put_page(slice->cpl_page->cp_vmpage);
        EXIT;
 }
 
 static int echo_page_prep(const struct lu_env *env,
-                          const struct cl_page_slice *slice,
-                          struct cl_io *unused)
+                         const struct cl_page_slice *slice,
+                         struct cl_io *unused)
 {
-        return 0;
+       return 0;
 }
 
 static int echo_page_print(const struct lu_env *env,
-                           const struct cl_page_slice *slice,
-                           void *cookie, lu_printer_t printer)
+                          const struct cl_page_slice *slice,
+                          void *cookie, lu_printer_t printer)
 {
        struct echo_page *ep = cl2echo_page(slice);
 
        (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
-                  ep, mutex_is_locked(&ep->ep_lock),
+                  ep, test_bit(0, &ep->ep_lock),
                   slice->cpl_page->cp_vmpage);
        return 0;
 }
 
 static const struct cl_page_operations echo_page_ops = {
-        .cpo_own           = echo_page_own,
-        .cpo_disown        = echo_page_disown,
-        .cpo_discard       = echo_page_discard,
-        .cpo_fini          = echo_page_fini,
-        .cpo_print         = echo_page_print,
-        .cpo_is_vmlocked   = echo_page_is_vmlocked,
-        .io = {
-                [CRT_READ] = {
-                        .cpo_prep        = echo_page_prep,
-                        .cpo_completion  = echo_page_completion,
-                },
-                [CRT_WRITE] = {
-                        .cpo_prep        = echo_page_prep,
-                        .cpo_completion  = echo_page_completion,
-                }
-        }
+       .cpo_own           = echo_page_own,
+       .cpo_disown        = echo_page_disown,
+       .cpo_discard       = echo_page_discard,
+       .cpo_fini          = echo_page_fini,
+       .cpo_print         = echo_page_print,
+       .cpo_is_vmlocked   = echo_page_is_vmlocked,
+       .io = {
+               [CRT_READ] = {
+                       .cpo_prep        = echo_page_prep,
+                       .cpo_completion  = echo_page_completion,
+               },
+               [CRT_WRITE] = {
+                       .cpo_prep        = echo_page_prep,
+                       .cpo_completion  = echo_page_completion,
+               }
+       }
 };
+
 /** @} echo_page */
 
 /** \defgroup echo_lock Locking
@@ -383,16 +394,16 @@ static const struct cl_page_operations echo_page_ops = {
  * @{
  */
 static void echo_lock_fini(const struct lu_env *env,
-                           struct cl_lock_slice *slice)
+                          struct cl_lock_slice *slice)
 {
-        struct echo_lock *ecl = cl2echo_lock(slice);
+       struct echo_lock *ecl = cl2echo_lock(slice);
 
        LASSERT(list_empty(&ecl->el_chain));
-        OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem);
+       OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem);
 }
 
 static struct cl_lock_operations echo_lock_ops = {
-        .clo_fini      = echo_lock_fini,
+       .clo_fini      = echo_lock_fini,
 };
 
 /** @} echo_lock */
@@ -408,19 +419,25 @@ static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
 {
        struct echo_page *ep = cl_object_page_slice(obj, page);
        struct echo_object *eco = cl2echo_obj(obj);
-       ENTRY;
 
-       page_cache_get(page->cp_vmpage);
-       mutex_init(&ep->ep_lock);
+       ENTRY;
+       get_page(page->cp_vmpage);
+       /*
+        * ep_lock is similar to the lock_page() lock, and
+        * cannot usefully be monitored by lockdep.
+        * So just use a bit in an "unsigned long" and use the
+        * wait_on_bit() interface to wait for the bit to be clear.
+        */
+       ep->ep_lock = 0;
        cl_page_slice_add(page, &ep->ep_cl, obj, index, &echo_page_ops);
        atomic_inc(&eco->eo_npages);
        RETURN(0);
 }
 
 static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
-                        struct cl_io *io)
+                       struct cl_io *io)
 {
-        return 0;
+       return 0;
 }
 
 static int echo_lock_init(const struct lu_env *env,
@@ -428,29 +445,29 @@ static int echo_lock_init(const struct lu_env *env,
                          const struct cl_io *unused)
 {
        struct echo_lock *el;
-       ENTRY;
 
+       ENTRY;
        OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, GFP_NOFS);
-       if (el != NULL) {
+       if (el) {
                cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
                el->el_object = cl2echo_obj(obj);
                INIT_LIST_HEAD(&el->el_chain);
                atomic_set(&el->el_refcount, 0);
        }
-       RETURN(el == NULL ? -ENOMEM : 0);
+       RETURN(el ? 0 : -ENOMEM);
 }
 
 static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
-                         const struct cl_object_conf *conf)
+                        const struct cl_object_conf *conf)
 {
-        return 0;
+       return 0;
 }
 
 static const struct cl_object_operations echo_cl_obj_ops = {
-        .coo_page_init = echo_page_init,
-        .coo_lock_init = echo_lock_init,
-        .coo_io_init   = echo_io_init,
-        .coo_conf_set  = echo_conf_set
+       .coo_page_init = echo_page_init,
+       .coo_lock_init = echo_lock_init,
+       .coo_io_init   = echo_io_init,
+       .coo_conf_set  = echo_conf_set
 };
 /** @} echo_cl_ops */
 
@@ -461,40 +478,42 @@ static const struct cl_object_operations echo_cl_obj_ops = {
  * @{
  */
 static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
-                            const struct lu_object_conf *conf)
+                           const struct lu_object_conf *conf)
 {
-        struct echo_device *ed         = cl2echo_dev(lu2cl_dev(obj->lo_dev));
-        struct echo_client_obd *ec     = ed->ed_ec;
-        struct echo_object *eco        = cl2echo_obj(lu2cl(obj));
-        ENTRY;
-
-        if (ed->ed_next) {
-                struct lu_object  *below;
-                struct lu_device  *under;
+       struct echo_device *ed         = cl2echo_dev(lu2cl_dev(obj->lo_dev));
+       struct echo_client_obd *ec     = ed->ed_ec;
+       struct echo_object *eco        = cl2echo_obj(lu2cl(obj));
 
-                under = ed->ed_next;
-                below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
-                                                        under);
-                if (below == NULL)
-                        RETURN(-ENOMEM);
-                lu_object_add(obj, below);
-        }
+       ENTRY;
+       if (ed->ed_next) {
+               struct lu_object  *below;
+               struct lu_device  *under;
+
+               under = ed->ed_next;
+               below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
+                                                       under);
+               if (!below)
+                       RETURN(-ENOMEM);
+               lu_object_add(obj, below);
+       }
 
-        if (!ed->ed_next_ismd) {
-                const struct cl_object_conf *cconf = lu2cl_conf(conf);
-                struct echo_object_conf *econf = cl2echo_conf(cconf);
+       if (!ed->ed_next_ismd) {
+               const struct cl_object_conf *cconf = lu2cl_conf(conf);
+               struct echo_object_conf *econf = cl2echo_conf(cconf);
 
                LASSERT(econf->eoc_oinfo != NULL);
 
-               /* Transfer the oinfo pointer to eco that it won't be
-                * freed. */
+               /*
+                * Transfer the oinfo pointer to eco that it won't be
+                * freed.
+                */
                eco->eo_oinfo = *econf->eoc_oinfo;
                *econf->eoc_oinfo = NULL;
        } else {
                eco->eo_oinfo = NULL;
        }
 
-        eco->eo_dev = ed;
+       eco->eo_dev = ed;
        atomic_set(&eco->eo_npages, 0);
        cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
 
@@ -505,11 +524,18 @@ static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
        RETURN(0);
 }
 
-static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
+static void echo_object_delete(const struct lu_env *env, struct lu_object *obj)
 {
-        struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
-        struct echo_client_obd *ec = eco->eo_dev->ed_ec;
-        ENTRY;
+       struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
+       struct echo_client_obd *ec;
+
+       ENTRY;
+
+       /* object delete called unconditolally - layer init or not */
+       if (eco->eo_dev == NULL)
+               return;
+
+       ec = eco->eo_dev->ed_ec;
 
        LASSERT(atomic_read(&eco->eo_npages) == 0);
 
@@ -517,31 +543,38 @@ static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
        list_del_init(&eco->eo_obj_chain);
        spin_unlock(&ec->ec_lock);
 
-        lu_object_fini(obj);
-        lu_object_header_fini(obj->lo_header);
-
-       if (eco->eo_oinfo != NULL)
+       if (eco->eo_oinfo)
                OBD_FREE_PTR(eco->eo_oinfo);
+}
+
+static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
+{
+       struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
+
+       ENTRY;
+
+       lu_object_fini(obj);
+       lu_object_header_fini(obj->lo_header);
 
        OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
        EXIT;
 }
 
 static int echo_object_print(const struct lu_env *env, void *cookie,
-                            lu_printer_t p, const struct lu_object *o)
+                            lu_printer_t p, const struct lu_object *o)
 {
-        struct echo_object *obj = cl2echo_obj(lu2cl(o));
+       struct echo_object *obj = cl2echo_obj(lu2cl(o));
 
-        return (*p)(env, cookie, "echoclient-object@%p", obj);
+       return (*p)(env, cookie, "echoclient-object@%p", obj);
 }
 
 static const struct lu_object_operations echo_lu_obj_ops = {
-        .loo_object_init      = echo_object_init,
-        .loo_object_delete    = NULL,
-        .loo_object_release   = NULL,
-        .loo_object_free      = echo_object_free,
-        .loo_object_print     = echo_object_print,
-        .loo_object_invariant = NULL
+       .loo_object_init      = echo_object_init,
+       .loo_object_delete    = echo_object_delete,
+       .loo_object_release   = NULL,
+       .loo_object_free      = echo_object_free,
+       .loo_object_print     = echo_object_print,
+       .loo_object_invariant = NULL
 };
 /** @} echo_lu_ops */
 
@@ -557,12 +590,12 @@ static struct lu_object *echo_object_alloc(const struct lu_env *env,
 {
        struct echo_object *eco;
        struct lu_object *obj = NULL;
-       ENTRY;
 
+       ENTRY;
        /* we're the top dev. */
        LASSERT(hdr == NULL);
        OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, GFP_NOFS);
-       if (eco != NULL) {
+       if (eco) {
                struct cl_object_header *hdr = &eco->eo_hdr;
 
                obj = &echo_obj2cl(eco)->co_lu;
@@ -579,7 +612,7 @@ static struct lu_object *echo_object_alloc(const struct lu_env *env,
 }
 
 static struct lu_device_operations echo_device_lu_ops = {
-        .ldo_object_alloc   = echo_object_alloc,
+       .ldo_object_alloc   = echo_object_alloc,
 };
 
 /** @} echo_lu_dev_ops */
@@ -592,15 +625,15 @@ static struct lu_device_operations echo_device_lu_ops = {
  */
 static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
 {
-        struct cl_site *site = &ed->ed_site_myself;
-        int rc;
+       struct cl_site *site = &ed->ed_site_myself;
+       int rc;
 
        /* initialize site */
-        rc = cl_site_init(site, &ed->ed_cl);
-        if (rc) {
+       rc = cl_site_init(site, &ed->ed_cl);
+       if (rc) {
                CERROR("Cannot initialize site for echo client(%d)\n", rc);
-                return rc;
-        }
+               return rc;
+       }
 
        rc = lu_site_init_finish(&site->cs_lu);
        if (rc) {
@@ -627,28 +660,23 @@ static void *echo_thread_key_init(const struct lu_context *ctx,
        struct echo_thread_info *info;
 
        OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, GFP_NOFS);
-       if (info == NULL)
+       if (!info)
                info = ERR_PTR(-ENOMEM);
        return info;
 }
 
 static void echo_thread_key_fini(const struct lu_context *ctx,
-                         struct lu_context_key *key, void *data)
+                                struct lu_context_key *key, void *data)
 {
-        struct echo_thread_info *info = data;
-        OBD_SLAB_FREE_PTR(info, echo_thread_kmem);
-}
+       struct echo_thread_info *info = data;
 
-static void echo_thread_key_exit(const struct lu_context *ctx,
-                         struct lu_context_key *key, void *data)
-{
+       OBD_SLAB_FREE_PTR(info, echo_thread_kmem);
 }
 
 static struct lu_context_key echo_thread_key = {
-        .lct_tags = LCT_CL_THREAD,
-        .lct_init = echo_thread_key_init,
-        .lct_fini = echo_thread_key_fini,
-        .lct_exit = echo_thread_key_exit
+       .lct_tags = LCT_CL_THREAD,
+       .lct_init = echo_thread_key_init,
+       .lct_fini = echo_thread_key_fini,
 };
 
 static void *echo_session_key_init(const struct lu_context *ctx,
@@ -657,28 +685,23 @@ static void *echo_session_key_init(const struct lu_context *ctx,
        struct echo_session_info *session;
 
        OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, GFP_NOFS);
-       if (session == NULL)
+       if (!session)
                session = ERR_PTR(-ENOMEM);
        return session;
 }
 
 static void echo_session_key_fini(const struct lu_context *ctx,
-                                 struct lu_context_key *key, void *data)
+                                 struct lu_context_key *key, void *data)
 {
-        struct echo_session_info *session = data;
-        OBD_SLAB_FREE_PTR(session, echo_session_kmem);
-}
+       struct echo_session_info *session = data;
 
-static void echo_session_key_exit(const struct lu_context *ctx,
-                                 struct lu_context_key *key, void *data)
-{
+       OBD_SLAB_FREE_PTR(session, echo_session_kmem);
 }
 
 static struct lu_context_key echo_session_key = {
-        .lct_tags = LCT_SESSION,
-        .lct_init = echo_session_key_init,
-        .lct_fini = echo_session_key_fini,
-        .lct_exit = echo_session_key_exit
+       .lct_tags = LCT_SESSION,
+       .lct_init = echo_session_key_init,
+       .lct_fini = echo_session_key_fini,
 };
 
 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
@@ -688,55 +711,54 @@ LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
 static int echo_fid_init(struct echo_device *ed, char *obd_name,
                         struct seq_server_site *ss)
 {
-        char *prefix;
-        int rc;
-        ENTRY;
+       char *prefix;
+       int rc;
 
-        OBD_ALLOC_PTR(ed->ed_cl_seq);
-        if (ed->ed_cl_seq == NULL)
-                RETURN(-ENOMEM);
+       ENTRY;
+       OBD_ALLOC_PTR(ed->ed_cl_seq);
+       if (!ed->ed_cl_seq)
+               RETURN(-ENOMEM);
 
-        OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
-        if (prefix == NULL)
-                GOTO(out_free_seq, rc = -ENOMEM);
+       OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
+       if (!prefix)
+               GOTO(out_free_seq, rc = -ENOMEM);
 
-        snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", obd_name);
+       snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", obd_name);
 
        /* Init client side sequence-manager */
        rc = seq_client_init(ed->ed_cl_seq, NULL,
                             LUSTRE_SEQ_METADATA,
                             prefix, ss->ss_server_seq);
-        ed->ed_cl_seq->lcs_width = ECHO_SEQ_WIDTH;
-        OBD_FREE(prefix, MAX_OBD_NAME + 5);
-        if (rc)
-                GOTO(out_free_seq, rc);
+       ed->ed_cl_seq->lcs_width = ECHO_SEQ_WIDTH;
+       OBD_FREE(prefix, MAX_OBD_NAME + 5);
+       if (rc)
+               GOTO(out_free_seq, rc);
 
-        RETURN(0);
+       RETURN(0);
 
 out_free_seq:
-        OBD_FREE_PTR(ed->ed_cl_seq);
-        ed->ed_cl_seq = NULL;
-        RETURN(rc);
+       OBD_FREE_PTR(ed->ed_cl_seq);
+       ed->ed_cl_seq = NULL;
+       RETURN(rc);
 }
 
 static int echo_fid_fini(struct obd_device *obddev)
 {
-        struct echo_device *ed = obd2echo_dev(obddev);
-        ENTRY;
+       struct echo_device *ed = obd2echo_dev(obddev);
 
-        if (ed->ed_cl_seq != NULL) {
-                seq_client_fini(ed->ed_cl_seq);
-                OBD_FREE_PTR(ed->ed_cl_seq);
-                ed->ed_cl_seq = NULL;
-        }
+       ENTRY;
+       if (ed->ed_cl_seq) {
+               seq_client_fini(ed->ed_cl_seq);
+               OBD_FREE_PTR(ed->ed_cl_seq);
+               ed->ed_cl_seq = NULL;
+       }
 
-        RETURN(0);
+       RETURN(0);
 }
 
 static void echo_ed_los_fini(const struct lu_env *env, struct echo_device *ed)
 {
        ENTRY;
-
        if (ed != NULL && ed->ed_next_ismd && ed->ed_los != NULL) {
                local_oid_storage_fini(env, ed->ed_los);
                ed->ed_los = NULL;
@@ -752,8 +774,8 @@ echo_md_local_file_create(const struct lu_env *env, struct echo_md_device *emd,
        struct dt_object        *parent = NULL;
        struct dt_object        *dto = NULL;
        int                      rc = 0;
-       ENTRY;
 
+       ENTRY;
        LASSERT(!fid_is_zero(pfid));
        parent = dt_locate(env, emd->emd_bottom, pfid);
        if (unlikely(IS_ERR(parent)))
@@ -766,13 +788,15 @@ echo_md_local_file_create(const struct lu_env *env, struct echo_md_device *emd,
                GOTO(out_put, rc = PTR_ERR(dto));
 
        *fid = *lu_object_fid(&dto->do_lu);
-       /* since stack is not fully set up the local_storage uses own stack
-        * and we should drop its object from cache */
-       lu_object_put_nocache(env, &dto->do_lu);
+       /*
+        * since stack is not fully set up the local_storage uses own stack
+        * and we should drop its object from cache
+        */
+       dt_object_put_nocache(env, dto);
 
        EXIT;
 out_put:
-       lu_object_put(env, &parent->do_lu);
+       dt_object_put(env, parent);
        RETURN(rc);
 }
 
@@ -780,10 +804,10 @@ static int
 echo_md_root_get(const struct lu_env *env, struct echo_md_device *emd,
                 struct echo_device *ed)
 {
-       struct lu_fid                    fid;
-       int                              rc = 0;
-       ENTRY;
+       struct lu_fid fid;
+       int rc = 0;
 
+       ENTRY;
        /* Setup local dirs */
        fid.f_seq = FID_SEQ_LOCAL_NAME;
        fid.f_oid = 1;
@@ -816,44 +840,44 @@ out_los:
 #endif /* HAVE_SERVER_SUPPORT */
 
 static struct lu_device *echo_device_alloc(const struct lu_env *env,
-                                           struct lu_device_type *t,
-                                           struct lustre_cfg *cfg)
-{
-        struct lu_device   *next;
-        struct echo_device *ed;
-        struct cl_device   *cd;
-        struct obd_device  *obd = NULL; /* to keep compiler happy */
-        struct obd_device  *tgt;
-        const char *tgt_type_name;
-        int rc;
-        int cleanup = 0;
-        ENTRY;
-
-        OBD_ALLOC_PTR(ed);
-        if (ed == NULL)
-                GOTO(out, rc = -ENOMEM);
-
-        cleanup = 1;
-        cd = &ed->ed_cl;
-        rc = cl_device_init(cd, t);
-        if (rc)
-                GOTO(out, rc);
-
-        cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
-
-        cleanup = 2;
-        obd = class_name2obd(lustre_cfg_string(cfg, 0));
-        LASSERT(obd != NULL);
-        LASSERT(env != NULL);
-
-        tgt = class_name2obd(lustre_cfg_string(cfg, 1));
-        if (tgt == NULL) {
-                CERROR("Can not find tgt device %s\n",
-                        lustre_cfg_string(cfg, 1));
-                GOTO(out, rc = -ENODEV);
-        }
-
-        next = tgt->obd_lu_dev;
+                                          struct lu_device_type *t,
+                                          struct lustre_cfg *cfg)
+{
+       struct lu_device   *next;
+       struct echo_device *ed;
+       struct cl_device   *cd;
+       struct obd_device  *obd = NULL; /* to keep compiler happy */
+       struct obd_device  *tgt;
+       const char *tgt_type_name;
+       int rc;
+       int cleanup = 0;
+
+       ENTRY;
+       OBD_ALLOC_PTR(ed);
+       if (!ed)
+               GOTO(out, rc = -ENOMEM);
+
+       cleanup = 1;
+       cd = &ed->ed_cl;
+       rc = cl_device_init(cd, t);
+       if (rc)
+               GOTO(out, rc);
+
+       cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
+
+       cleanup = 2;
+       obd = class_name2obd(lustre_cfg_string(cfg, 0));
+       LASSERT(obd != NULL);
+       LASSERT(env != NULL);
+
+       tgt = class_name2obd(lustre_cfg_string(cfg, 1));
+       if (!tgt) {
+               CERROR("Can not find tgt device %s\n",
+                       lustre_cfg_string(cfg, 1));
+               GOTO(out, rc = -ENODEV);
+       }
+
+       next = tgt->obd_lu_dev;
 
        if (strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME) == 0) {
                ed->ed_next_ismd = 1;
@@ -867,16 +891,16 @@ static struct lu_device *echo_device_alloc(const struct lu_env *env,
                GOTO(out, rc = -EINVAL);
        }
 
-        cleanup = 3;
+       cleanup = 3;
 
-        rc = echo_client_setup(env, obd, cfg);
-        if (rc)
-                GOTO(out, rc);
+       rc = echo_client_setup(env, obd, cfg);
+       if (rc)
+               GOTO(out, rc);
 
-        ed->ed_ec = &obd->u.echo_client;
-        cleanup = 4;
+       ed->ed_ec = &obd->u.echo_client;
+       cleanup = 4;
 
-        if (ed->ed_next_ismd) {
+       if (ed->ed_next_ismd) {
 #ifdef HAVE_SERVER_SUPPORT
                /* Suppose to connect to some Metadata layer */
                struct lu_site          *ls = NULL;
@@ -885,21 +909,21 @@ static struct lu_device *echo_device_alloc(const struct lu_env *env,
                struct echo_md_device   *emd = NULL;
                int                      found = 0;
 
-                if (next == NULL) {
-                        CERROR("%s is not lu device type!\n",
-                               lustre_cfg_string(cfg, 1));
-                        GOTO(out, rc = -EINVAL);
-                }
+               if (!next) {
+                       CERROR("%s is not lu device type!\n",
+                              lustre_cfg_string(cfg, 1));
+                       GOTO(out, rc = -EINVAL);
+               }
 
-                tgt_type_name = lustre_cfg_string(cfg, 2);
-                if (!tgt_type_name) {
-                        CERROR("%s no type name for echo %s setup\n",
-                                lustre_cfg_string(cfg, 1),
-                                tgt->obd_type->typ_name);
-                        GOTO(out, rc = -EINVAL);
-                }
+               tgt_type_name = lustre_cfg_string(cfg, 2);
+               if (!tgt_type_name) {
+                       CERROR("%s no type name for echo %s setup\n",
+                               lustre_cfg_string(cfg, 1),
+                               tgt->obd_type->typ_name);
+                       GOTO(out, rc = -EINVAL);
+               }
 
-                ls = next->ld_site;
+               ls = next->ld_site;
 
                spin_lock(&ls->ls_ld_lock);
                list_for_each_entry(ld, &ls->ls_ld_linkage, ld_linkage) {
@@ -910,11 +934,11 @@ static struct lu_device *echo_device_alloc(const struct lu_env *env,
                }
                spin_unlock(&ls->ls_ld_lock);
 
-                if (found == 0) {
-                        CERROR("%s is not lu device type!\n",
-                               lustre_cfg_string(cfg, 1));
-                        GOTO(out, rc = -EINVAL);
-                }
+               if (found == 0) {
+                       CERROR("%s is not lu device type!\n",
+                              lustre_cfg_string(cfg, 1));
+                       GOTO(out, rc = -EINVAL);
+               }
 
                next = ld;
                /* For MD echo client, it will use the site in MDS stack */
@@ -935,79 +959,86 @@ static struct lu_device *echo_device_alloc(const struct lu_env *env,
                        GOTO(out, rc);
                }
 #else /* !HAVE_SERVER_SUPPORT */
-               CERROR("Local operations are NOT supported on client side. "
-                      "Only remote operations are supported. Metadata client "
-                      "must be run on server side.\n");
+               CERROR(
+                      "Local operations are NOT supported on client side. Only remote operations are supported. Metadata client must be run on server side.\n");
                GOTO(out, rc = -EOPNOTSUPP);
 #endif /* HAVE_SERVER_SUPPORT */
-        } else {
-                 /* if echo client is to be stacked upon ost device, the next is
-                  * NULL since ost is not a clio device so far */
-                if (next != NULL && !lu_device_is_cl(next))
-                        next = NULL;
-
-                tgt_type_name = tgt->obd_type->typ_name;
-                if (next != NULL) {
-                        LASSERT(next != NULL);
-                        if (next->ld_site != NULL)
-                                GOTO(out, rc = -EBUSY);
-
-                        next->ld_site = ed->ed_site;
-                        rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
-                                                     next->ld_type->ldt_name,
-                                                     NULL);
-                        if (rc)
-                                GOTO(out, rc);
-                } else
-                        LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
-        }
-
-        ed->ed_next = next;
-        RETURN(&cd->cd_lu_dev);
+       } else {
+               /*
+                * if echo client is to be stacked upon ost device, the next is
+                * NULL since ost is not a clio device so far
+                */
+               if (next != NULL && !lu_device_is_cl(next))
+                       next = NULL;
+
+               tgt_type_name = tgt->obd_type->typ_name;
+               if (next) {
+                       LASSERT(next != NULL);
+                       if (next->ld_site)
+                               GOTO(out, rc = -EBUSY);
+
+                       next->ld_site = ed->ed_site;
+                       rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
+                                                       next->ld_type->ldt_name,
+                                                       NULL);
+                       if (rc)
+                               GOTO(out, rc);
+               } else {
+                       LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
+               }
+       }
+
+       ed->ed_next = next;
+       RETURN(&cd->cd_lu_dev);
 out:
-        switch(cleanup) {
-        case 4: {
-                int rc2;
-                rc2 = echo_client_cleanup(obd);
-                if (rc2)
-                        CERROR("Cleanup obd device %s error(%d)\n",
-                               obd->obd_name, rc2);
-        }
-
-        case 3:
-                echo_site_fini(env, ed);
-        case 2:
-                cl_device_fini(&ed->ed_cl);
-        case 1:
-                OBD_FREE_PTR(ed);
-        case 0:
-        default:
-                break;
-        }
-        return(ERR_PTR(rc));
+       switch (cleanup) {
+       case 4: {
+               int rc2;
+
+               rc2 = echo_client_cleanup(obd);
+               if (rc2)
+                       CERROR("Cleanup obd device %s error(%d)\n",
+                              obd->obd_name, rc2);
+       }
+       /* fallthrough */
+
+       case 3:
+               echo_site_fini(env, ed);
+               /* fallthrough */
+       case 2:
+               cl_device_fini(&ed->ed_cl);
+               /* fallthrough */
+       case 1:
+               OBD_FREE_PTR(ed);
+               /* fallthrough */
+       case 0:
+       default:
+               break;
+       }
+       return ERR_PTR(rc);
 }
 
 static int echo_device_init(const struct lu_env *env, struct lu_device *d,
-                          const char *name, struct lu_device *next)
+                           const char *name, struct lu_device *next)
 {
-        LBUG();
-        return 0;
+       LBUG();
+       return 0;
 }
 
 static struct lu_device *echo_device_fini(const struct lu_env *env,
-                                          struct lu_device *d)
+                                         struct lu_device *d)
 {
-        struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
-        struct lu_device *next = ed->ed_next;
+       struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
+       struct lu_device *next = ed->ed_next;
 
-        while (next && !ed->ed_next_ismd)
-                next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
-        return NULL;
+       while (next && !ed->ed_next_ismd)
+               next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
+       return NULL;
 }
 
 static void echo_lock_release(const struct lu_env *env,
-                              struct echo_lock *ecl,
-                              int still_used)
+                             struct echo_lock *ecl,
+                             int still_used)
 {
        struct cl_lock *clk = echo_lock2cl(ecl);
 
@@ -1015,23 +1046,24 @@ static void echo_lock_release(const struct lu_env *env,
 }
 
 static struct lu_device *echo_device_free(const struct lu_env *env,
-                                          struct lu_device *d)
+                                         struct lu_device *d)
 {
-        struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
-        struct echo_client_obd *ec   = ed->ed_ec;
-        struct echo_object     *eco;
-        struct lu_device       *next = ed->ed_next;
+       struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
+       struct echo_client_obd *ec   = ed->ed_ec;
+       struct echo_object     *eco;
+       struct lu_device       *next = ed->ed_next;
 
-        CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
-               ed, next);
+       CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
+              ed, next);
 
        lu_site_purge(env, ed->ed_site, -1);
 
-        /* check if there are objects still alive.
-         * It shouldn't have any object because lu_site_purge would cleanup
-         * all of cached objects. Anyway, probably the echo device is being
-         * parallelly accessed.
-         */
+       /*
+        * check if there are objects still alive.
+        * It shouldn't have any object because lu_site_purge would cleanup
+        * all of cached objects. Anyway, probably the echo device is being
+        * parallelly accessed.
+        */
        spin_lock(&ec->ec_lock);
        list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
                eco->eo_deleted = 1;
@@ -1047,8 +1079,8 @@ static struct lu_device *echo_device_free(const struct lu_env *env,
        spin_lock(&ec->ec_lock);
        while (!list_empty(&ec->ec_objects)) {
                spin_unlock(&ec->ec_lock);
-               CERROR("echo_client still has objects at cleanup time, "
-                      "wait for 1 second\n");
+               CERROR(
+                      "echo_client still has objects at cleanup time, wait for 1 second\n");
                set_current_state(TASK_UNINTERRUPTIBLE);
                schedule_timeout(cfs_time_seconds(1));
                lu_site_purge(env, ed->ed_site, -1);
@@ -1068,32 +1100,34 @@ static struct lu_device *echo_device_free(const struct lu_env *env,
        while (next && !ed->ed_next_ismd)
                next = next->ld_type->ldt_ops->ldto_device_free(env, next);
 
-        LASSERT(ed->ed_site == d->ld_site);
-        echo_site_fini(env, ed);
-        cl_device_fini(&ed->ed_cl);
-        OBD_FREE_PTR(ed);
+       LASSERT(ed->ed_site == d->ld_site);
+       echo_site_fini(env, ed);
+       cl_device_fini(&ed->ed_cl);
+       OBD_FREE_PTR(ed);
 
-        return NULL;
+       cl_env_cache_purge(~0);
+
+       return NULL;
 }
 
 static const struct lu_device_type_operations echo_device_type_ops = {
-        .ldto_init = echo_type_init,
-        .ldto_fini = echo_type_fini,
+       .ldto_init = echo_type_init,
+       .ldto_fini = echo_type_fini,
 
-        .ldto_start = echo_type_start,
-        .ldto_stop  = echo_type_stop,
+       .ldto_start = echo_type_start,
+       .ldto_stop  = echo_type_stop,
 
-        .ldto_device_alloc = echo_device_alloc,
-        .ldto_device_free  = echo_device_free,
-        .ldto_device_init  = echo_device_init,
-        .ldto_device_fini  = echo_device_fini
+       .ldto_device_alloc = echo_device_alloc,
+       .ldto_device_free  = echo_device_free,
+       .ldto_device_init  = echo_device_init,
+       .ldto_device_fini  = echo_device_fini
 };
 
 static struct lu_device_type echo_device_type = {
-        .ldt_tags     = LU_DEVICE_CL,
-        .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
-        .ldt_ops      = &echo_device_type_ops,
-        .ldt_ctx_tags = LCT_CL_THREAD | LCT_MD_THREAD | LCT_DT_THREAD,
+       .ldt_tags     = LU_DEVICE_CL,
+       .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
+       .ldt_ops      = &echo_device_type_ops,
+       .ldt_ctx_tags = LCT_CL_THREAD | LCT_MD_THREAD | LCT_DT_THREAD,
 };
 /** @} echo_init */
 
@@ -1115,34 +1149,36 @@ cl_echo_object_find(struct echo_device *d, const struct ost_id *oi)
        struct cl_object *obj;
        struct lov_oinfo *oinfo = NULL;
        struct lu_fid *fid;
-       int refcheck;
+       __u16  refcheck;
        int rc;
-       ENTRY;
 
+       ENTRY;
        LASSERTF(ostid_id(oi) != 0, DOSTID"\n", POSTID(oi));
        LASSERTF(ostid_seq(oi) == FID_SEQ_ECHO, DOSTID"\n", POSTID(oi));
 
-        /* Never return an object if the obd is to be freed. */
-        if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
-                RETURN(ERR_PTR(-ENODEV));
+       /* Never return an object if the obd is to be freed. */
+       if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
+               RETURN(ERR_PTR(-ENODEV));
 
-        env = cl_env_get(&refcheck);
-        if (IS_ERR(env))
-                RETURN((void *)env);
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env))
+               RETURN((void *)env);
 
-        info = echo_env_info(env);
-        conf = &info->eti_conf;
-        if (d->ed_next) {
+       info = echo_env_info(env);
+       conf = &info->eti_conf;
+       if (d->ed_next) {
                OBD_ALLOC_PTR(oinfo);
-               if (oinfo == NULL)
+               if (!oinfo)
                        GOTO(out, eco = ERR_PTR(-ENOMEM));
 
                oinfo->loi_oi = *oi;
                conf->eoc_cl.u.coc_oinfo = oinfo;
        }
 
-       /* If echo_object_init() is successful then ownership of oinfo
-        * is transferred to the object. */
+       /*
+        * If echo_object_init() is successful then ownership of oinfo
+        * is transferred to the object.
+        */
        conf->eoc_oinfo = &oinfo;
 
        fid = &info->eti_fid;
@@ -1150,75 +1186,78 @@ cl_echo_object_find(struct echo_device *d, const struct ost_id *oi)
        if (rc != 0)
                GOTO(out, eco = ERR_PTR(rc));
 
-       /* In the function below, .hs_keycmp resolves to
-        * lu_obj_hop_keycmp() */
+       /*
+        * In the function below, .hs_keycmp resolves to
+        * lu_obj_hop_keycmp()
+        */
        /* coverity[overrun-buffer-val] */
-        obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
-        if (IS_ERR(obj))
-                GOTO(out, eco = (void*)obj);
-
-        eco = cl2echo_obj(obj);
-        if (eco->eo_deleted) {
-                cl_object_put(env, obj);
-                eco = ERR_PTR(-EAGAIN);
-        }
+       obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
+       if (IS_ERR(obj))
+               GOTO(out, eco = (void *)obj);
+
+       eco = cl2echo_obj(obj);
+       if (eco->eo_deleted) {
+               cl_object_put(env, obj);
+               eco = ERR_PTR(-EAGAIN);
+       }
 
 out:
-       if (oinfo != NULL)
+       if (oinfo)
                OBD_FREE_PTR(oinfo);
 
-        cl_env_put(env, &refcheck);
-        RETURN(eco);
+       cl_env_put(env, &refcheck);
+       RETURN(eco);
 }
 
 static int cl_echo_object_put(struct echo_object *eco)
 {
-        struct lu_env *env;
-        struct cl_object *obj = echo_obj2cl(eco);
-        int refcheck;
-        ENTRY;
+       struct lu_env *env;
+       struct cl_object *obj = echo_obj2cl(eco);
+       __u16  refcheck;
+
+       ENTRY;
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env))
+               RETURN(PTR_ERR(env));
 
-        env = cl_env_get(&refcheck);
-        if (IS_ERR(env))
-                RETURN(PTR_ERR(env));
+       /* an external function to kill an object? */
+       if (eco->eo_deleted) {
+               struct lu_object_header *loh = obj->co_lu.lo_header;
 
-        /* an external function to kill an object? */
-        if (eco->eo_deleted) {
-                struct lu_object_header *loh = obj->co_lu.lo_header;
-                LASSERT(&eco->eo_hdr == luh2coh(loh));
+               LASSERT(&eco->eo_hdr == luh2coh(loh));
                set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
-        }
+       }
 
-        cl_object_put(env, obj);
-        cl_env_put(env, &refcheck);
-        RETURN(0);
+       cl_object_put(env, obj);
+       cl_env_put(env, &refcheck);
+       RETURN(0);
 }
 
 static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
                            u64 start, u64 end, int mode,
-                           __u64 *cookie , __u32 enqflags)
-{
-        struct cl_io *io;
-        struct cl_lock *lck;
-        struct cl_object *obj;
-        struct cl_lock_descr *descr;
-        struct echo_thread_info *info;
-        int rc = -ENOMEM;
-        ENTRY;
-
-        info = echo_env_info(env);
-        io = &info->eti_io;
+                           __u64 *cookie, __u32 enqflags)
+{
+       struct cl_io *io;
+       struct cl_lock *lck;
+       struct cl_object *obj;
+       struct cl_lock_descr *descr;
+       struct echo_thread_info *info;
+       int rc = -ENOMEM;
+
+       ENTRY;
+       info = echo_env_info(env);
+       io = &info->eti_io;
        lck = &info->eti_lock;
        obj = echo_obj2cl(eco);
 
        memset(lck, 0, sizeof(*lck));
        descr = &lck->cll_descr;
-        descr->cld_obj   = obj;
-        descr->cld_start = cl_index(obj, start);
-        descr->cld_end   = cl_index(obj, end);
-        descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
-        descr->cld_enq_flags = enqflags;
-        io->ci_obj = obj;
+       descr->cld_obj   = obj;
+       descr->cld_start = cl_index(obj, start);
+       descr->cld_end   = cl_index(obj, end);
+       descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
+       descr->cld_enq_flags = enqflags;
+       io->ci_obj = obj;
 
        rc = cl_lock_request(env, io, lck);
        if (rc == 0) {
@@ -1239,155 +1278,162 @@ static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
 }
 
 static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
-                           __u64 cookie)
+                          __u64 cookie)
 {
-        struct echo_client_obd *ec = ed->ed_ec;
-        struct echo_lock       *ecl = NULL;
-       struct list_head        *el;
-        int found = 0, still_used = 0;
-        ENTRY;
+       struct echo_client_obd *ec = ed->ed_ec;
+       struct echo_lock *ecl = NULL;
+       struct list_head *el;
+       int found = 0, still_used = 0;
 
-        LASSERT(ec != NULL);
+       ENTRY;
+       LASSERT(ec != NULL);
        spin_lock(&ec->ec_lock);
        list_for_each(el, &ec->ec_locks) {
                ecl = list_entry(el, struct echo_lock, el_chain);
-                CDEBUG(D_INFO, "ecl: %p, cookie: "LPX64"\n", ecl, ecl->el_cookie);
-                found = (ecl->el_cookie == cookie);
-                if (found) {
+               CDEBUG(D_INFO, "ecl: %p, cookie: %#llx\n", ecl, ecl->el_cookie);
+               found = (ecl->el_cookie == cookie);
+               if (found) {
                        if (atomic_dec_and_test(&ecl->el_refcount))
                                list_del_init(&ecl->el_chain);
-                        else
-                                still_used = 1;
-                        break;
-                }
-        }
+                       else
+                               still_used = 1;
+                       break;
+               }
+       }
        spin_unlock(&ec->ec_lock);
 
-        if (!found)
-                RETURN(-ENOENT);
+       if (!found)
+               RETURN(-ENOENT);
 
-        echo_lock_release(env, ecl, still_used);
-        RETURN(0);
+       echo_lock_release(env, ecl, still_used);
+       RETURN(0);
 }
 
 static void echo_commit_callback(const struct lu_env *env, struct cl_io *io,
-                               struct cl_page *page)
+                                struct pagevec *pvec)
 {
        struct echo_thread_info *info;
        struct cl_2queue        *queue;
+       int i = 0;
 
        info = echo_env_info(env);
        LASSERT(io == &info->eti_io);
 
        queue = &info->eti_queue;
-       cl_page_list_add(&queue->c2_qout, page);
+
+       for (i = 0; i < pagevec_count(pvec); i++) {
+               struct page *vmpage = pvec->pages[i];
+               struct cl_page *page = (struct cl_page *)vmpage->private;
+
+               cl_page_list_add(&queue->c2_qout, page);
+       }
 }
 
 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
                              struct page **pages, int npages, int async)
 {
-        struct lu_env           *env;
-        struct echo_thread_info *info;
-        struct cl_object        *obj = echo_obj2cl(eco);
-        struct echo_device      *ed  = eco->eo_dev;
-        struct cl_2queue        *queue;
-        struct cl_io            *io;
-        struct cl_page          *clp;
-        struct lustre_handle    lh = { 0 };
-        int page_size = cl_page_size(obj);
-        int refcheck;
-        int rc;
-        int i;
-        ENTRY;
+       struct lu_env           *env;
+       struct echo_thread_info *info;
+       struct cl_object        *obj = echo_obj2cl(eco);
+       struct echo_device      *ed  = eco->eo_dev;
+       struct cl_2queue        *queue;
+       struct cl_io            *io;
+       struct cl_page          *clp;
+       struct lustre_handle    lh = { 0 };
+       int page_size = cl_page_size(obj);
+       int rc;
+       int i;
+       __u16 refcheck;
 
+       ENTRY;
        LASSERT((offset & ~PAGE_MASK) == 0);
-        LASSERT(ed->ed_next != NULL);
-        env = cl_env_get(&refcheck);
-        if (IS_ERR(env))
-                RETURN(PTR_ERR(env));
+       LASSERT(ed->ed_next != NULL);
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env))
+               RETURN(PTR_ERR(env));
 
-        info    = echo_env_info(env);
-        io      = &info->eti_io;
-        queue   = &info->eti_queue;
+       info    = echo_env_info(env);
+       io      = &info->eti_io;
+       queue   = &info->eti_queue;
 
-        cl_2queue_init(queue);
+       cl_2queue_init(queue);
 
        io->ci_ignore_layout = 1;
-        rc = cl_io_init(env, io, CIT_MISC, obj);
-        if (rc < 0)
-                GOTO(out, rc);
-        LASSERT(rc == 0);
-
-
-        rc = cl_echo_enqueue0(env, eco, offset,
-                             offset + npages * PAGE_CACHE_SIZE - 1,
-                              rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
-                              CEF_NEVER);
-        if (rc < 0)
-                GOTO(error_lock, rc);
-
-        for (i = 0; i < npages; i++) {
-                LASSERT(pages[i]);
-                clp = cl_page_find(env, obj, cl_index(obj, offset),
-                                   pages[i], CPT_TRANSIENT);
-                if (IS_ERR(clp)) {
-                        rc = PTR_ERR(clp);
-                        break;
-                }
-                LASSERT(clp->cp_type == CPT_TRANSIENT);
-
-                rc = cl_page_own(env, io, clp);
-                if (rc) {
-                        LASSERT(clp->cp_state == CPS_FREEING);
-                        cl_page_put(env, clp);
-                        break;
-                }
-
-                cl_2queue_add(queue, clp);
-
-                /* drop the reference count for cl_page_find, so that the page
-                 * will be freed in cl_2queue_fini. */
-                cl_page_put(env, clp);
-                cl_page_clip(env, clp, 0, page_size);
-
-                offset += page_size;
-        }
-
-        if (rc == 0) {
-                enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
-
-                async = async && (typ == CRT_WRITE);
-                if (async)
+       rc = cl_io_init(env, io, CIT_MISC, obj);
+       if (rc < 0)
+               GOTO(out, rc);
+       LASSERT(rc == 0);
+
+       rc = cl_echo_enqueue0(env, eco, offset,
+                             offset + npages * PAGE_SIZE - 1,
+                             rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
+                             CEF_NEVER);
+       if (rc < 0)
+               GOTO(error_lock, rc);
+
+       for (i = 0; i < npages; i++) {
+               LASSERT(pages[i]);
+               clp = cl_page_find(env, obj, cl_index(obj, offset),
+                                  pages[i], CPT_TRANSIENT);
+               if (IS_ERR(clp)) {
+                       rc = PTR_ERR(clp);
+                       break;
+               }
+               LASSERT(clp->cp_type == CPT_TRANSIENT);
+
+               rc = cl_page_own(env, io, clp);
+               if (rc) {
+                       LASSERT(clp->cp_state == CPS_FREEING);
+                       cl_page_put(env, clp);
+                       break;
+               }
+
+               cl_2queue_add(queue, clp);
+
+               /*
+                * drop the reference count for cl_page_find, so that the page
+                * will be freed in cl_2queue_fini.
+                */
+               cl_page_put(env, clp);
+               cl_page_clip(env, clp, 0, page_size);
+
+               offset += page_size;
+       }
+
+       if (rc == 0) {
+               enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
+
+               async = async && (typ == CRT_WRITE);
+               if (async)
                        rc = cl_io_commit_async(env, io, &queue->c2_qin,
                                                0, PAGE_SIZE,
                                                echo_commit_callback);
                else
                        rc = cl_io_submit_sync(env, io, typ, queue, 0);
-                CDEBUG(D_INFO, "echo_client %s write returns %d\n",
-                       async ? "async" : "sync", rc);
-        }
+               CDEBUG(D_INFO, "echo_client %s write returns %d\n",
+                      async ? "async" : "sync", rc);
+       }
 
-        cl_echo_cancel0(env, ed, lh.cookie);
-        EXIT;
+       cl_echo_cancel0(env, ed, lh.cookie);
+       EXIT;
 error_lock:
-        cl_2queue_discard(env, io, queue);
-        cl_2queue_disown(env, io, queue);
-        cl_2queue_fini(env, queue);
-        cl_io_fini(env, io);
+       cl_2queue_discard(env, io, queue);
+       cl_2queue_disown(env, io, queue);
+       cl_2queue_fini(env, queue);
+       cl_io_fini(env, io);
 out:
-        cl_env_put(env, &refcheck);
-        return rc;
+       cl_env_put(env, &refcheck);
+       return rc;
 }
 /** @} echo_exports */
 
-
 static u64 last_object_id;
 
 #ifdef HAVE_SERVER_SUPPORT
 static inline void echo_md_build_name(struct lu_name *lname, char *name,
                                      __u64 id)
 {
-       sprintf(name, LPU64, id);
+       snprintf(name, ETI_NAME_LEN, "%llu", id);
        lname->ln_name = name;
        lname->ln_namelen = strlen(name);
 }
@@ -1396,14 +1442,19 @@ static inline void echo_md_build_name(struct lu_name *lname, char *name,
 static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
                            struct md_attr *ma)
 {
-       struct echo_thread_info *info = echo_env_info(env);
-       int                      rc;
+       struct echo_thread_info *info = echo_env_info(env);
+       int rc;
 
        ENTRY;
 
        LASSERT(ma->ma_lmm_size > 0);
 
-       rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV);
+       LASSERT(ma->ma_need & (MA_LOV | MA_LMV));
+       if (ma->ma_need & MA_LOV)
+               rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV);
+       else
+               rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LMV);
+
        if (rc < 0)
                RETURN(rc);
 
@@ -1421,7 +1472,7 @@ static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
                }
 
                OBD_ALLOC_LARGE(info->eti_big_lmm, size);
-               if (info->eti_big_lmm == NULL)
+               if (!info->eti_big_lmm)
                        RETURN(-ENOMEM);
                info->eti_big_lmmsize = size;
        }
@@ -1429,11 +1480,18 @@ static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
 
        info->eti_buf.lb_buf = info->eti_big_lmm;
        info->eti_buf.lb_len = info->eti_big_lmmsize;
-       rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV);
+       if (ma->ma_need & MA_LOV)
+               rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV);
+       else
+               rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LMV);
        if (rc < 0)
                RETURN(rc);
 
-       ma->ma_valid |= MA_LOV;
+       if (ma->ma_need & MA_LOV)
+               ma->ma_valid |= MA_LOV;
+       else
+               ma->ma_valid |= MA_LMV;
+
        ma->ma_lmm = info->eti_big_lmm;
        ma->ma_lmm_size = rc;
 
@@ -1446,46 +1504,62 @@ static int echo_attr_get_complex(const struct lu_env *env,
 {
        struct echo_thread_info *info = echo_env_info(env);
        struct lu_buf           *buf = &info->eti_buf;
-       umode_t          mode = lu_object_attr(&next->mo_lu);
-       int                      need = ma->ma_need;
+       umode_t                  mode = lu_object_attr(&next->mo_lu);
        int                      rc = 0, rc2;
 
        ENTRY;
 
        ma->ma_valid = 0;
 
-       if (need & MA_INODE) {
-               ma->ma_need = MA_INODE;
+       if (ma->ma_need & MA_INODE) {
                rc = mo_attr_get(env, next, ma);
                if (rc)
                        GOTO(out, rc);
                ma->ma_valid |= MA_INODE;
        }
 
-       if (need & MA_LOV) {
-               if (S_ISREG(mode) || S_ISDIR(mode)) {
-                       LASSERT(ma->ma_lmm_size > 0);
-                       buf->lb_buf = ma->ma_lmm;
-                       buf->lb_len = ma->ma_lmm_size;
-                       rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LOV);
-                       if (rc2 > 0) {
-                               ma->ma_lmm_size = rc2;
-                               ma->ma_valid |= MA_LOV;
-                       } else if (rc2 == -ENODATA) {
-                               /* no LOV EA */
-                               ma->ma_lmm_size = 0;
-                       } else if (rc2 == -ERANGE) {
-                               rc2 = echo_big_lmm_get(env, next, ma);
-                               if (rc2 < 0)
-                                       GOTO(out, rc = rc2);
-                       } else {
+       if ((ma->ma_need & MA_LOV) && (S_ISREG(mode) || S_ISDIR(mode))) {
+               LASSERT(ma->ma_lmm_size > 0);
+               buf->lb_buf = ma->ma_lmm;
+               buf->lb_len = ma->ma_lmm_size;
+               rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LOV);
+               if (rc2 > 0) {
+                       ma->ma_lmm_size = rc2;
+                       ma->ma_valid |= MA_LOV;
+               } else if (rc2 == -ENODATA) {
+                       /* no LOV EA */
+                       ma->ma_lmm_size = 0;
+               } else if (rc2 == -ERANGE) {
+                       rc2 = echo_big_lmm_get(env, next, ma);
+                       if (rc2 < 0)
                                GOTO(out, rc = rc2);
-                       }
+               } else {
+                       GOTO(out, rc = rc2);
                }
        }
 
-#ifdef CONFIG_FS_POSIX_ACL
-       if (need & MA_ACL_DEF && S_ISDIR(mode)) {
+       if ((ma->ma_need & MA_LMV) && S_ISDIR(mode)) {
+               LASSERT(ma->ma_lmm_size > 0);
+               buf->lb_buf = ma->ma_lmm;
+               buf->lb_len = ma->ma_lmm_size;
+               rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LMV);
+               if (rc2 > 0) {
+                       ma->ma_lmm_size = rc2;
+                       ma->ma_valid |= MA_LMV;
+               } else if (rc2 == -ENODATA) {
+                       /* no LMV EA */
+                       ma->ma_lmm_size = 0;
+               } else if (rc2 == -ERANGE) {
+                       rc2 = echo_big_lmm_get(env, next, ma);
+                       if (rc2 < 0)
+                               GOTO(out, rc = rc2);
+               } else {
+                       GOTO(out, rc = rc2);
+               }
+       }
+
+#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
+       if ((ma->ma_need & MA_ACL_DEF) && S_ISDIR(mode)) {
                buf->lb_buf = ma->ma_acl;
                buf->lb_len = ma->ma_acl_size;
                rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
@@ -1501,8 +1575,7 @@ static int echo_attr_get_complex(const struct lu_env *env,
        }
 #endif
 out:
-       ma->ma_need = need;
-       CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
+       CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = %#llx ma_lmm=%p\n",
               rc, ma->ma_valid, ma->ma_lmm);
        RETURN(rc);
 }
@@ -1530,36 +1603,36 @@ echo_md_create_internal(const struct lu_env *env, struct echo_device *ed,
 
        ec_child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev,
                                     fid, &conf);
-        if (IS_ERR(ec_child)) {
-                CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
-                        PTR_ERR(ec_child));
+       if (IS_ERR(ec_child)) {
+               CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
+                       PTR_ERR(ec_child));
                RETURN(PTR_ERR(ec_child));
-        }
+       }
 
-        child = lu_object_locate(ec_child->lo_header, ld->ld_type);
-        if (child == NULL) {
-                CERROR("Can not locate the child "DFID"\n", PFID(fid));
-                GOTO(out_put, rc = -EINVAL);
-        }
+       child = lu_object_locate(ec_child->lo_header, ld->ld_type);
+       if (!child) {
+               CERROR("Can not locate the child "DFID"\n", PFID(fid));
+               GOTO(out_put, rc = -EINVAL);
+       }
 
-        CDEBUG(D_RPCTRACE, "Start creating object "DFID" %s %p\n",
-               PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
+       CDEBUG(D_RPCTRACE, "Start creating object "DFID" %s %p\n",
+              PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
 
        /*
         * Do not perform lookup sanity check. We know that name does not exist.
         */
        spec->sp_cr_lookup = 0;
-        rc = mdo_create(env, parent, lname, lu2md(child), spec, ma);
-        if (rc) {
-                CERROR("Can not create child "DFID": rc = %d\n", PFID(fid), rc);
-                GOTO(out_put, rc);
-        }
-        CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc  = %d\n",
-               PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc);
+       rc = mdo_create(env, parent, lname, lu2md(child), spec, ma);
+       if (rc) {
+               CERROR("Can not create child "DFID": rc = %d\n", PFID(fid), rc);
+               GOTO(out_put, rc);
+       }
+       CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc  = %d\n",
+              PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc);
        EXIT;
 out_put:
-        lu_object_put(env, ec_child);
-        return rc;
+       lu_object_put(env, ec_child);
+       return rc;
 }
 
 static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld,
@@ -1579,313 +1652,448 @@ static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld,
        return 0;
 }
 
+static int
+echo_md_dir_stripe_choose(const struct lu_env *env, struct echo_device *ed,
+                         struct lu_object *obj, const char *name,
+                         unsigned int namelen, __u64 id,
+                         struct lu_object **new_parent)
+{
+       struct echo_thread_info *info = echo_env_info(env);
+       struct md_attr          *ma = &info->eti_ma;
+       struct lmv_mds_md_v1    *lmv;
+       struct lu_device        *ld = ed->ed_next;
+       unsigned int            idx;
+       struct lu_name          tmp_ln_name;
+       struct lu_fid           stripe_fid;
+       struct lu_object        *stripe_obj;
+       int                     rc;
+
+       LASSERT(obj != NULL);
+       LASSERT(S_ISDIR(obj->lo_header->loh_attr));
+
+       memset(ma, 0, sizeof(*ma));
+       echo_set_lmm_size(env, ld, ma);
+       ma->ma_need = MA_LMV;
+       rc = echo_attr_get_complex(env, lu2md(obj), ma);
+       if (rc) {
+               CERROR("Can not getattr child "DFID": rc = %d\n",
+                       PFID(lu_object_fid(obj)), rc);
+               return rc;
+       }
+
+       if (!(ma->ma_valid & MA_LMV)) {
+               *new_parent = obj;
+               return 0;
+       }
+
+       lmv = (struct lmv_mds_md_v1 *)ma->ma_lmm;
+       if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1) {
+               rc = -EINVAL;
+               CERROR("Invalid mds md magic %x "DFID": rc = %d\n",
+                      le32_to_cpu(lmv->lmv_magic), PFID(lu_object_fid(obj)),
+                      rc);
+               return rc;
+       }
+
+       if (name) {
+               tmp_ln_name.ln_name = name;
+               tmp_ln_name.ln_namelen = namelen;
+       } else {
+               LASSERT(id != -1);
+               echo_md_build_name(&tmp_ln_name, info->eti_name, id);
+       }
+
+       idx = lmv_name_to_stripe_index(LMV_HASH_TYPE_FNV_1A_64,
+                               le32_to_cpu(lmv->lmv_stripe_count),
+                               tmp_ln_name.ln_name, tmp_ln_name.ln_namelen);
+
+       LASSERT(idx < le32_to_cpu(lmv->lmv_stripe_count));
+       fid_le_to_cpu(&stripe_fid, &lmv->lmv_stripe_fids[idx]);
+
+       stripe_obj = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, &stripe_fid,
+                                      NULL);
+       if (IS_ERR(stripe_obj)) {
+               rc = PTR_ERR(stripe_obj);
+               CERROR("Can not find the parent "DFID": rc = %d\n",
+                      PFID(&stripe_fid), rc);
+               return rc;
+       }
+
+       *new_parent = lu_object_locate(stripe_obj->lo_header, ld->ld_type);
+       if (!*new_parent) {
+               lu_object_put(env, stripe_obj);
+               RETURN(-ENXIO);
+       }
+
+       return rc;
+}
+
 static int echo_create_md_object(const struct lu_env *env,
-                                 struct echo_device *ed,
-                                 struct lu_object *ec_parent,
-                                 struct lu_fid *fid,
-                                 char *name, int namelen,
-                                 __u64 id, __u32 mode, int count,
-                                 int stripe_count, int stripe_offset)
-{
-        struct lu_object        *parent;
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_name          *lname = &info->eti_lname;
-        struct md_op_spec       *spec = &info->eti_spec;
-        struct md_attr          *ma = &info->eti_ma;
-        struct lu_device        *ld = ed->ed_next;
-        int                      rc = 0;
-        int                      i;
+                                struct echo_device *ed,
+                                struct lu_object *ec_parent,
+                                struct lu_fid *fid,
+                                char *name, int namelen,
+                                 __u64 id, __u32 mode, int count,
+                                int stripe_count, int stripe_offset)
+{
+       struct lu_object *parent;
+       struct lu_object *new_parent;
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_name *lname = &info->eti_lname;
+       struct md_op_spec *spec = &info->eti_spec;
+       struct md_attr *ma = &info->eti_ma;
+       struct lu_device *ld = ed->ed_next;
+       int rc = 0;
+       int i;
 
        ENTRY;
 
-       if (ec_parent == NULL)
+       if (!ec_parent)
                return -1;
-        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
-       if (parent == NULL)
+       parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
+       if (!parent)
                RETURN(-ENXIO);
 
-        memset(ma, 0, sizeof(*ma));
-        memset(spec, 0, sizeof(*spec));
-        if (stripe_count != 0) {
-                spec->sp_cr_flags |= FMODE_WRITE;
-               echo_set_lmm_size(env, ld, ma);
-                if (stripe_count != -1) {
-                        struct lov_user_md_v3 *lum = &info->eti_lum;
-
-                        lum->lmm_magic = LOV_USER_MAGIC_V3;
-                        lum->lmm_stripe_count = stripe_count;
-                        lum->lmm_stripe_offset = stripe_offset;
-                        lum->lmm_pattern = 0;
-                        spec->u.sp_ea.eadata = lum;
-                       spec->u.sp_ea.eadatalen = sizeof(*lum);
-                        spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
-                }
-        }
-
-        ma->ma_attr.la_mode = mode;
-       ma->ma_attr.la_valid = LA_CTIME | LA_MODE;
-        ma->ma_attr.la_ctime = cfs_time_current_64();
-
-        if (name != NULL) {
-                lname->ln_name = name;
-                lname->ln_namelen = namelen;
-                /* If name is specified, only create one object by name */
-                rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
-                                             spec, ma);
+       rc = echo_md_dir_stripe_choose(env, ed, parent, name, namelen,
+                                      id, &new_parent);
+       if (rc != 0)
                RETURN(rc);
-        }
 
-        /* Create multiple object sequenced by id */
-        for (i = 0; i < count; i++) {
-                char *tmp_name = info->eti_name;
+       LASSERT(new_parent != NULL);
+       memset(ma, 0, sizeof(*ma));
+       memset(spec, 0, sizeof(*spec));
+       echo_set_lmm_size(env, ld, ma);
+       if (stripe_count != 0) {
+               spec->sp_cr_flags |= MDS_FMODE_WRITE;
+               if (stripe_count != -1) {
+                       if (S_ISDIR(mode)) {
+                               struct lmv_user_md *lmu;
+
+                               lmu = (struct lmv_user_md *)&info->eti_lum;
+                               lmu->lum_magic = LMV_USER_MAGIC;
+                               lmu->lum_stripe_offset = stripe_offset;
+                               lmu->lum_stripe_count = stripe_count;
+                               lmu->lum_hash_type = LMV_HASH_TYPE_FNV_1A_64;
+                               spec->u.sp_ea.eadata = lmu;
+                               spec->u.sp_ea.eadatalen = sizeof(*lmu);
+                       } else {
+                               struct lov_user_md_v3 *lum = &info->eti_lum;
+
+                               lum->lmm_magic = LOV_USER_MAGIC_V3;
+                               lum->lmm_stripe_count = stripe_count;
+                               lum->lmm_stripe_offset = stripe_offset;
+                               lum->lmm_pattern = LOV_PATTERN_NONE;
+                               spec->u.sp_ea.eadata = lum;
+                               spec->u.sp_ea.eadatalen = sizeof(*lum);
+                       }
+                       spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
+               }
+       }
+
+       ma->ma_attr.la_mode = mode;
+       ma->ma_attr.la_valid = LA_CTIME | LA_MODE;
+       ma->ma_attr.la_ctime = ktime_get_real_seconds();
+
+       if (name) {
+               lname->ln_name = name;
+               lname->ln_namelen = namelen;
+               /* If name is specified, only create one object by name */
+               rc = echo_md_create_internal(env, ed, lu2md(new_parent), fid,
+                                            lname, spec, ma);
+               GOTO(out_put, rc);
+       }
+
+       /* Create multiple object sequenced by id */
+       for (i = 0; i < count; i++) {
+               char *tmp_name = info->eti_name;
 
-                echo_md_build_name(lname, tmp_name, id);
+               echo_md_build_name(lname, tmp_name, id);
+
+               rc = echo_md_create_internal(env, ed, lu2md(new_parent),
+                                            fid, lname, spec, ma);
+               if (rc) {
+                       CERROR("Can not create child %s: rc = %d\n", tmp_name,
+                               rc);
+                       break;
+               }
+               id++;
+               fid->f_oid++;
+       }
 
-                rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
-                                             spec, ma);
-                if (rc) {
-                        CERROR("Can not create child %s: rc = %d\n", tmp_name,
-                                rc);
-                        break;
-                }
-                id++;
-                fid->f_oid++;
-        }
+out_put:
+       if (new_parent != parent)
+               lu_object_put(env, new_parent);
 
        RETURN(rc);
 }
 
 static struct lu_object *echo_md_lookup(const struct lu_env *env,
-                                        struct echo_device *ed,
-                                        struct md_object *parent,
-                                        struct lu_name *lname)
-{
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_fid           *fid = &info->eti_fid;
-        struct lu_object        *child;
-        int    rc;
-        ENTRY;
-
-        CDEBUG(D_INFO, "lookup %s in parent "DFID" %p\n", lname->ln_name,
-               PFID(fid), parent);
-        rc = mdo_lookup(env, parent, lname, fid, NULL);
-        if (rc) {
-                CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
-                RETURN(ERR_PTR(rc));
-        }
-
-       /* In the function below, .hs_keycmp resolves to
-        * lu_obj_hop_keycmp() */
+                                       struct echo_device *ed,
+                                       struct md_object *parent,
+                                       struct lu_name *lname)
+{
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_fid *fid = &info->eti_fid;
+       struct lu_object *child;
+       int rc;
+
+       ENTRY;
+       CDEBUG(D_INFO, "lookup %s in parent "DFID" %p\n", lname->ln_name,
+              PFID(fid), parent);
+
+       rc = mdo_lookup(env, parent, lname, fid, NULL);
+       if (rc) {
+               CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
+               RETURN(ERR_PTR(rc));
+       }
+
+       /*
+        * In the function below, .hs_keycmp resolves to
+        * lu_obj_hop_keycmp()
+        */
        /* coverity[overrun-buffer-val] */
-        child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
+       child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
 
-        RETURN(child);
+       RETURN(child);
 }
 
 static int echo_setattr_object(const struct lu_env *env,
-                               struct echo_device *ed,
-                               struct lu_object *ec_parent,
-                               __u64 id, int count)
-{
-        struct lu_object        *parent;
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_name          *lname = &info->eti_lname;
-        char                    *name = info->eti_name;
-        struct lu_device        *ld = ed->ed_next;
-        struct lu_buf           *buf = &info->eti_buf;
-        int                      rc = 0;
-        int                      i;
+                              struct echo_device *ed,
+                              struct lu_object *ec_parent,
+                              __u64 id, int count)
+{
+       struct lu_object *parent;
+       struct lu_object *new_parent;
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_name *lname = &info->eti_lname;
+       char *name = info->eti_name;
+       struct lu_device *ld = ed->ed_next;
+       struct lu_buf *buf = &info->eti_buf;
+       int rc = 0;
+       int i;
 
        ENTRY;
 
-       if (ec_parent == NULL)
+       if (!ec_parent)
                return -1;
-        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
-       if (parent == NULL)
+       parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
+       if (!parent)
                RETURN(-ENXIO);
 
-        for (i = 0; i < count; i++) {
-                struct lu_object *ec_child, *child;
+       rc = echo_md_dir_stripe_choose(env, ed, parent, NULL, 0, id,
+                                      &new_parent);
+       if (rc != 0)
+               RETURN(rc);
+
+       for (i = 0; i < count; i++) {
+               struct lu_object *ec_child, *child;
 
-                echo_md_build_name(lname, name, id);
+               echo_md_build_name(lname, name, id);
 
-                ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
-                if (IS_ERR(ec_child)) {
-                        CERROR("Can't find child %s: rc = %ld\n",
-                                lname->ln_name, PTR_ERR(ec_child));
-                        RETURN(PTR_ERR(ec_child));
-                }
+               ec_child = echo_md_lookup(env, ed, lu2md(new_parent), lname);
+               if (IS_ERR(ec_child)) {
+                       rc = PTR_ERR(ec_child);
+                       CERROR("Can't find child %s: rc = %d\n",
+                               lname->ln_name, rc);
+                       break;
+               }
 
-                child = lu_object_locate(ec_child->lo_header, ld->ld_type);
-                if (child == NULL) {
-                        CERROR("Can not locate the child %s\n", lname->ln_name);
-                        lu_object_put(env, ec_child);
-                        rc = -EINVAL;
-                        break;
-                }
+               child = lu_object_locate(ec_child->lo_header, ld->ld_type);
+               if (!child) {
+                       CERROR("Can not locate the child %s\n", lname->ln_name);
+                       lu_object_put(env, ec_child);
+                       rc = -EINVAL;
+                       break;
+               }
 
-                CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n",
-                       PFID(lu_object_fid(child)));
+               CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n",
+                      PFID(lu_object_fid(child)));
 
                buf->lb_buf = info->eti_xattr_buf;
                buf->lb_len = sizeof(info->eti_xattr_buf);
 
-                sprintf(name, "%s.test1", XATTR_USER_PREFIX);
-                rc = mo_xattr_set(env, lu2md(child), buf, name,
-                                  LU_XATTR_CREATE);
+               sprintf(name, "%s.test1", XATTR_USER_PREFIX);
+               rc = mo_xattr_set(env, lu2md(child), buf, name,
+                                 LU_XATTR_CREATE);
                if (rc < 0) {
-                        CERROR("Can not setattr child "DFID": rc = %d\n",
-                                PFID(lu_object_fid(child)), rc);
-                        lu_object_put(env, ec_child);
-                        break;
-                }
-                CDEBUG(D_RPCTRACE, "End setattr object "DFID"\n",
-                       PFID(lu_object_fid(child)));
-                id++;
-                lu_object_put(env, ec_child);
-        }
+                       CERROR("Can not setattr child "DFID": rc = %d\n",
+                               PFID(lu_object_fid(child)), rc);
+                       lu_object_put(env, ec_child);
+                       break;
+               }
+               CDEBUG(D_RPCTRACE, "End setattr object "DFID"\n",
+                      PFID(lu_object_fid(child)));
+               id++;
+               lu_object_put(env, ec_child);
+       }
+
+       if (new_parent != parent)
+               lu_object_put(env, new_parent);
+
        RETURN(rc);
 }
 
 static int echo_getattr_object(const struct lu_env *env,
-                               struct echo_device *ed,
-                               struct lu_object *ec_parent,
-                               __u64 id, int count)
-{
-        struct lu_object        *parent;
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_name          *lname = &info->eti_lname;
-        char                    *name = info->eti_name;
-        struct md_attr          *ma = &info->eti_ma;
-        struct lu_device        *ld = ed->ed_next;
-        int                      rc = 0;
-        int                      i;
+                              struct echo_device *ed,
+                              struct lu_object *ec_parent,
+                              __u64 id, int count)
+{
+       struct lu_object *parent;
+       struct lu_object *new_parent;
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_name *lname = &info->eti_lname;
+       char *name = info->eti_name;
+       struct md_attr *ma = &info->eti_ma;
+       struct lu_device *ld = ed->ed_next;
+       int rc = 0;
+       int i;
 
        ENTRY;
 
-       if (ec_parent == NULL)
+       if (!ec_parent)
                return -1;
-        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
-       if (parent == NULL)
+       parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
+       if (!parent)
                RETURN(-ENXIO);
 
-        memset(ma, 0, sizeof(*ma));
-        ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
-        ma->ma_acl = info->eti_xattr_buf;
-        ma->ma_acl_size = sizeof(info->eti_xattr_buf);
+       rc = echo_md_dir_stripe_choose(env, ed, parent, NULL, 0, id,
+                                      &new_parent);
+       if (rc != 0)
+               RETURN(rc);
 
-        for (i = 0; i < count; i++) {
-                struct lu_object *ec_child, *child;
+       memset(ma, 0, sizeof(*ma));
+       ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
+       ma->ma_acl = info->eti_xattr_buf;
+       ma->ma_acl_size = sizeof(info->eti_xattr_buf);
 
-                ma->ma_valid = 0;
-                echo_md_build_name(lname, name, id);
+       for (i = 0; i < count; i++) {
+               struct lu_object *ec_child, *child;
+
+               ma->ma_valid = 0;
+               echo_md_build_name(lname, name, id);
                echo_set_lmm_size(env, ld, ma);
 
-                ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
-                if (IS_ERR(ec_child)) {
-                        CERROR("Can't find child %s: rc = %ld\n",
-                               lname->ln_name, PTR_ERR(ec_child));
-                        RETURN(PTR_ERR(ec_child));
-                }
-
-                child = lu_object_locate(ec_child->lo_header, ld->ld_type);
-                if (child == NULL) {
-                        CERROR("Can not locate the child %s\n", lname->ln_name);
-                        lu_object_put(env, ec_child);
+               ec_child = echo_md_lookup(env, ed, lu2md(new_parent), lname);
+               if (IS_ERR(ec_child)) {
+                       CERROR("Can't find child %s: rc = %ld\n",
+                              lname->ln_name, PTR_ERR(ec_child));
+                       RETURN(PTR_ERR(ec_child));
+               }
+
+               child = lu_object_locate(ec_child->lo_header, ld->ld_type);
+               if (!child) {
+                       CERROR("Can not locate the child %s\n", lname->ln_name);
+                       lu_object_put(env, ec_child);
                        RETURN(-EINVAL);
-                }
+               }
 
-                CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
-                       PFID(lu_object_fid(child)));
+               CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
+                      PFID(lu_object_fid(child)));
                rc = echo_attr_get_complex(env, lu2md(child), ma);
-                if (rc) {
-                        CERROR("Can not getattr child "DFID": rc = %d\n",
-                                PFID(lu_object_fid(child)), rc);
-                        lu_object_put(env, ec_child);
-                        break;
-                }
-                CDEBUG(D_RPCTRACE, "End getattr object "DFID"\n",
-                       PFID(lu_object_fid(child)));
-                id++;
-                lu_object_put(env, ec_child);
-        }
+               if (rc) {
+                       CERROR("Can not getattr child "DFID": rc = %d\n",
+                               PFID(lu_object_fid(child)), rc);
+                       lu_object_put(env, ec_child);
+                       break;
+               }
+               CDEBUG(D_RPCTRACE, "End getattr object "DFID"\n",
+                      PFID(lu_object_fid(child)));
+               id++;
+               lu_object_put(env, ec_child);
+       }
+
+       if (new_parent != parent)
+               lu_object_put(env, new_parent);
 
        RETURN(rc);
 }
 
 static int echo_lookup_object(const struct lu_env *env,
-                              struct echo_device *ed,
-                              struct lu_object *ec_parent,
-                              __u64 id, int count)
-{
-        struct lu_object        *parent;
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_name          *lname = &info->eti_lname;
-        char                    *name = info->eti_name;
-        struct lu_fid           *fid = &info->eti_fid;
-        struct lu_device        *ld = ed->ed_next;
-        int                      rc = 0;
-        int                      i;
-
-       if (ec_parent == NULL)
+                             struct echo_device *ed,
+                             struct lu_object *ec_parent,
+                             __u64 id, int count)
+{
+       struct lu_object *parent;
+       struct lu_object *new_parent;
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_name *lname = &info->eti_lname;
+       char *name = info->eti_name;
+       struct lu_fid *fid = &info->eti_fid;
+       struct lu_device *ld = ed->ed_next;
+       int rc = 0;
+       int i;
+
+       if (!ec_parent)
                return -1;
-        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
-       if (parent == NULL)
+       parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
+       if (!parent)
                return -ENXIO;
 
-        /*prepare the requests*/
-        for (i = 0; i < count; i++) {
-                echo_md_build_name(lname, name, id);
+       rc = echo_md_dir_stripe_choose(env, ed, parent, NULL, 0, id,
+                                      &new_parent);
+       if (rc != 0)
+               RETURN(rc);
+
+       /*prepare the requests*/
+       for (i = 0; i < count; i++) {
+               echo_md_build_name(lname, name, id);
 
-                CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
-                       PFID(lu_object_fid(parent)), lname->ln_name, parent);
+               CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
+                      PFID(lu_object_fid(new_parent)), lname->ln_name,
+                      new_parent);
+
+               rc = mdo_lookup(env, lu2md(new_parent), lname, fid, NULL);
+               if (rc) {
+                       CERROR("Can not lookup child %s: rc = %d\n", name, rc);
+                       break;
+               }
 
-                rc = mdo_lookup(env, lu2md(parent), lname, fid, NULL);
-                if (rc) {
-                        CERROR("Can not lookup child %s: rc = %d\n", name, rc);
-                        break;
-                }
-                CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
-                       PFID(lu_object_fid(parent)), lname->ln_name, parent);
+               CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
+                      PFID(lu_object_fid(new_parent)), lname->ln_name,
+                      new_parent);
 
-                id++;
-        }
-        return rc;
+               id++;
+       }
+
+       if (new_parent != parent)
+               lu_object_put(env, new_parent);
+
+       return rc;
 }
 
 static int echo_md_destroy_internal(const struct lu_env *env,
-                                    struct echo_device *ed,
-                                    struct md_object *parent,
-                                    struct lu_name *lname,
-                                    struct md_attr *ma)
+                                   struct echo_device *ed,
+                                   struct md_object *parent,
+                                   struct lu_name *lname,
+                                   struct md_attr *ma)
 {
-        struct lu_device   *ld = ed->ed_next;
-        struct lu_object   *ec_child;
-        struct lu_object   *child;
-        int                 rc;
+       struct lu_device   *ld = ed->ed_next;
+       struct lu_object   *ec_child;
+       struct lu_object   *child;
+       int                 rc;
 
        ENTRY;
 
-        ec_child = echo_md_lookup(env, ed, parent, lname);
-        if (IS_ERR(ec_child)) {
-                CERROR("Can't find child %s: rc = %ld\n", lname->ln_name,
-                        PTR_ERR(ec_child));
-                RETURN(PTR_ERR(ec_child));
-        }
+       ec_child = echo_md_lookup(env, ed, parent, lname);
+       if (IS_ERR(ec_child)) {
+               CERROR("Can't find child %s: rc = %ld\n", lname->ln_name,
+                       PTR_ERR(ec_child));
+               RETURN(PTR_ERR(ec_child));
+       }
 
-        child = lu_object_locate(ec_child->lo_header, ld->ld_type);
-        if (child == NULL) {
-                CERROR("Can not locate the child %s\n", lname->ln_name);
-                GOTO(out_put, rc = -EINVAL);
-        }
+       child = lu_object_locate(ec_child->lo_header, ld->ld_type);
+       if (!child) {
+               CERROR("Can not locate the child %s\n", lname->ln_name);
+               GOTO(out_put, rc = -EINVAL);
+       }
 
        if (lu_object_remote(child)) {
                CERROR("Can not destroy remote object %s: rc = %d\n",
                       lname->ln_name, -EPERM);
                GOTO(out_put, rc = -EPERM);
        }
-        CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
-               PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
+       CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
+              PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
 
        rc = mdo_unlink(env, parent, lu2md(child), lname, ma, 0);
        if (rc) {
@@ -1893,70 +2101,80 @@ static int echo_md_destroy_internal(const struct lu_env *env,
                        lname->ln_name, rc);
                GOTO(out_put, rc);
        }
-        CDEBUG(D_RPCTRACE, "End destroy object "DFID" %s %p\n",
-               PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
+       CDEBUG(D_RPCTRACE, "End destroy object "DFID" %s %p\n",
+              PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
 out_put:
-        lu_object_put(env, ec_child);
-        return rc;
+       lu_object_put(env, ec_child);
+       return rc;
 }
 
 static int echo_destroy_object(const struct lu_env *env,
-                               struct echo_device *ed,
-                               struct lu_object *ec_parent,
-                               char *name, int namelen,
-                               __u64 id, __u32 mode,
-                               int count)
-{
-        struct echo_thread_info *info = echo_env_info(env);
-        struct lu_name          *lname = &info->eti_lname;
-        struct md_attr          *ma = &info->eti_ma;
-        struct lu_device        *ld = ed->ed_next;
-        struct lu_object        *parent;
-        int                      rc = 0;
-        int                      i;
-        ENTRY;
-
-        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
-        if (parent == NULL)
-                RETURN(-EINVAL);
-
-        memset(ma, 0, sizeof(*ma));
-        ma->ma_attr.la_mode = mode;
-        ma->ma_attr.la_valid = LA_CTIME;
-        ma->ma_attr.la_ctime = cfs_time_current_64();
-        ma->ma_need = MA_INODE;
-        ma->ma_valid = 0;
-
-        if (name != NULL) {
-                lname->ln_name = name;
-                lname->ln_namelen = namelen;
-                rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
-                                              ma);
+                              struct echo_device *ed,
+                              struct lu_object *ec_parent,
+                              char *name, int namelen,
+                              __u64 id, __u32 mode,
+                              int count)
+{
+       struct echo_thread_info *info = echo_env_info(env);
+       struct lu_name          *lname = &info->eti_lname;
+       struct md_attr          *ma = &info->eti_ma;
+       struct lu_device        *ld = ed->ed_next;
+       struct lu_object        *parent;
+       struct lu_object        *new_parent;
+       int                      rc = 0;
+       int                      i;
+
+       ENTRY;
+       parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
+       if (!parent)
+               RETURN(-EINVAL);
+
+       rc = echo_md_dir_stripe_choose(env, ed, parent, name, namelen,
+                                      id, &new_parent);
+       if (rc != 0)
                RETURN(rc);
-        }
 
-        /*prepare the requests*/
-        for (i = 0; i < count; i++) {
-                char *tmp_name = info->eti_name;
+       memset(ma, 0, sizeof(*ma));
+       ma->ma_attr.la_mode = mode;
+       ma->ma_attr.la_valid = LA_CTIME;
+       ma->ma_attr.la_ctime = ktime_get_real_seconds();
+       ma->ma_need = MA_INODE;
+       ma->ma_valid = 0;
 
-                ma->ma_valid = 0;
-                echo_md_build_name(lname, tmp_name, id);
+       if (name) {
+               lname->ln_name = name;
+               lname->ln_namelen = namelen;
+               rc = echo_md_destroy_internal(env, ed, lu2md(new_parent), lname,
+                                             ma);
+               GOTO(out_put, rc);
+       }
+
+       /*prepare the requests*/
+       for (i = 0; i < count; i++) {
+               char *tmp_name = info->eti_name;
+
+               ma->ma_valid = 0;
+               echo_md_build_name(lname, tmp_name, id);
 
-                rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
-                                              ma);
-                if (rc) {
-                        CERROR("Can not unlink child %s: rc = %d\n", name, rc);
-                        break;
-                }
-                id++;
-        }
+               rc = echo_md_destroy_internal(env, ed, lu2md(new_parent), lname,
+                                             ma);
+               if (rc) {
+                       CERROR("Can not unlink child %s: rc = %d\n", name, rc);
+                       break;
+               }
+               id++;
+       }
+
+out_put:
+       if (new_parent != parent)
+               lu_object_put(env, new_parent);
 
        RETURN(rc);
 }
 
 static struct lu_object *echo_resolve_path(const struct lu_env *env,
-                                           struct echo_device *ed, char *path,
-                                           int path_len)
+                                          struct echo_device *ed, char *path,
+                                          int path_len)
 {
        struct lu_device        *ld = ed->ed_next;
        struct echo_thread_info *info = echo_env_info(env);
@@ -1965,59 +2183,61 @@ static struct lu_object *echo_resolve_path(const struct lu_env *env,
        struct lu_object        *parent = NULL;
        struct lu_object        *child = NULL;
        int                      rc = 0;
-       ENTRY;
 
+       ENTRY;
        *fid = ed->ed_root_fid;
 
-       /* In the function below, .hs_keycmp resolves to
-        * lu_obj_hop_keycmp() */
+       /*
+        * In the function below, .hs_keycmp resolves to
+        * lu_obj_hop_keycmp()
+        */
        /* coverity[overrun-buffer-val] */
-        parent = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
-        if (IS_ERR(parent)) {
-                CERROR("Can not find the parent "DFID": rc = %ld\n",
-                        PFID(fid), PTR_ERR(parent));
-                RETURN(parent);
-        }
-
-        while (1) {
-                struct lu_object *ld_parent;
-                char *e;
-
-                e = strsep(&path, "/");
-                if (e == NULL)
-                        break;
-
-                if (e[0] == 0) {
-                        if (!path || path[0] == '\0')
-                                break;
-                        continue;
-                }
-
-                lname->ln_name = e;
-                lname->ln_namelen = strlen(e);
-
-                ld_parent = lu_object_locate(parent->lo_header, ld->ld_type);
-                if (ld_parent == NULL) {
-                        lu_object_put(env, parent);
-                        rc = -EINVAL;
-                        break;
-                }
-
-                child = echo_md_lookup(env, ed, lu2md(ld_parent), lname);
-                lu_object_put(env, parent);
-                if (IS_ERR(child)) {
-                        rc = (int)PTR_ERR(child);
-                        CERROR("lookup %s under parent "DFID": rc = %d\n",
-                                lname->ln_name, PFID(lu_object_fid(ld_parent)),
-                                rc);
-                        break;
-                }
-                parent = child;
-        }
-        if (rc)
-                RETURN(ERR_PTR(rc));
-
-        RETURN(parent);
+       parent = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
+       if (IS_ERR(parent)) {
+               CERROR("Can not find the parent "DFID": rc = %ld\n",
+                       PFID(fid), PTR_ERR(parent));
+               RETURN(parent);
+       }
+
+       while (1) {
+               struct lu_object *ld_parent;
+               char *e;
+
+               e = strsep(&path, "/");
+               if (!e)
+                       break;
+
+               if (e[0] == 0) {
+                       if (!path || path[0] == '\0')
+                               break;
+                       continue;
+               }
+
+               lname->ln_name = e;
+               lname->ln_namelen = strlen(e);
+
+               ld_parent = lu_object_locate(parent->lo_header, ld->ld_type);
+               if (!ld_parent) {
+                       lu_object_put(env, parent);
+                       rc = -EINVAL;
+                       break;
+               }
+
+               child = echo_md_lookup(env, ed, lu2md(ld_parent), lname);
+               lu_object_put(env, parent);
+               if (IS_ERR(child)) {
+                       rc = (int)PTR_ERR(child);
+                       CERROR("lookup %s under parent "DFID": rc = %d\n",
+                               lname->ln_name, PFID(lu_object_fid(ld_parent)),
+                               rc);
+                       break;
+               }
+               parent = child;
+       }
+       if (rc)
+               RETURN(ERR_PTR(rc));
+
+       RETURN(parent);
 }
 
 static void echo_ucred_init(struct lu_env *env)
@@ -2048,40 +2268,39 @@ static void echo_ucred_init(struct lu_env *env)
 static void echo_ucred_fini(struct lu_env *env)
 {
        struct lu_ucred *ucred = lu_ucred(env);
+
        ucred->uc_valid = UCRED_INIT;
 }
 
-#define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD)
-#define ECHO_MD_SES_TAG (LCT_REMEMBER | LCT_SESSION | LCT_SERVER_SESSION)
 static int echo_md_handler(struct echo_device *ed, int command,
                           char *path, int path_len, __u64 id, int count,
                           struct obd_ioctl_data *data)
 {
        struct echo_thread_info *info;
-        struct lu_device      *ld = ed->ed_next;
-        struct lu_env         *env;
-        int                    refcheck;
-        struct lu_object      *parent;
-        char                  *name = NULL;
-        int                    namelen = data->ioc_plen2;
-        int                    rc = 0;
-        ENTRY;
-
-        if (ld == NULL) {
-                CERROR("MD echo client is not being initialized properly\n");
-                RETURN(-EINVAL);
-        }
-
-        if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
-                CERROR("Only support MDD layer right now!\n");
-                RETURN(-EINVAL);
-        }
-
-        env = cl_env_get(&refcheck);
-        if (IS_ERR(env))
-                RETURN(PTR_ERR(env));
-
-        rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_MD_SES_TAG);
+       struct lu_device *ld = ed->ed_next;
+       struct lu_env *env;
+       __u16 refcheck;
+       struct lu_object *parent;
+       char *name = NULL;
+       int namelen = data->ioc_plen2;
+       int rc = 0;
+
+       ENTRY;
+       if (!ld) {
+               CERROR("MD echo client is not being initialized properly\n");
+               RETURN(-EINVAL);
+       }
+
+       if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
+               CERROR("Only support MDD layer right now!\n");
+               RETURN(-EINVAL);
+       }
+
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env))
+               RETURN(PTR_ERR(env));
+
+       rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_SES_TAG);
        if (rc != 0)
                GOTO(out_env, rc);
 
@@ -2089,85 +2308,87 @@ static int echo_md_handler(struct echo_device *ed, int command,
        info = echo_env_info(env);
        LASSERT(info->eti_big_lmm == NULL);
        OBD_ALLOC_LARGE(info->eti_big_lmm, MIN_MD_SIZE);
-       if (info->eti_big_lmm == NULL)
+       if (!info->eti_big_lmm)
                GOTO(out_env, rc = -ENOMEM);
        info->eti_big_lmmsize = MIN_MD_SIZE;
 
-        parent = echo_resolve_path(env, ed, path, path_len);
-        if (IS_ERR(parent)) {
-                CERROR("Can not resolve the path %s: rc = %ld\n", path,
-                        PTR_ERR(parent));
+       parent = echo_resolve_path(env, ed, path, path_len);
+       if (IS_ERR(parent)) {
+               CERROR("Can not resolve the path %s: rc = %ld\n", path,
+                       PTR_ERR(parent));
                GOTO(out_free, rc = PTR_ERR(parent));
-        }
+       }
 
-        if (namelen > 0) {
-                OBD_ALLOC(name, namelen + 1);
-                if (name == NULL)
+       if (namelen > 0) {
+               OBD_ALLOC(name, namelen + 1);
+               if (!name)
                        GOTO(out_put, rc = -ENOMEM);
                if (copy_from_user(name, data->ioc_pbuf2, namelen))
                        GOTO(out_name, rc = -EFAULT);
-        }
+       }
 
        echo_ucred_init(env);
 
-        switch (command) {
-        case ECHO_MD_CREATE:
-        case ECHO_MD_MKDIR: {
-                struct echo_thread_info *info = echo_env_info(env);
-                __u32 mode = data->ioc_obdo2.o_mode;
-                struct lu_fid *fid = &info->eti_fid;
-                int stripe_count = (int)data->ioc_obdo2.o_misc;
-                int stripe_index = (int)data->ioc_obdo2.o_stripe_idx;
+       switch (command) {
+       case ECHO_MD_CREATE:
+       case ECHO_MD_MKDIR: {
+               struct echo_thread_info *info = echo_env_info(env);
+               __u32 mode = data->ioc_obdo2.o_mode;
+               struct lu_fid *fid = &info->eti_fid;
+               int stripe_count = (int)data->ioc_obdo2.o_misc;
+               int stripe_index = (int)data->ioc_obdo2.o_stripe_idx;
 
                rc = ostid_to_fid(fid, &data->ioc_obdo1.o_oi, 0);
                if (rc != 0)
                        break;
 
-               /* In the function below, .hs_keycmp resolves to
-                * lu_obj_hop_keycmp() */
+               /*
+                * In the function below, .hs_keycmp resolves to
+                * lu_obj_hop_keycmp()
+                */
                /* coverity[overrun-buffer-val] */
-                rc = echo_create_md_object(env, ed, parent, fid, name, namelen,
-                                           id, mode, count, stripe_count,
-                                           stripe_index);
-                break;
-        }
-        case ECHO_MD_DESTROY:
-        case ECHO_MD_RMDIR: {
-                __u32 mode = data->ioc_obdo2.o_mode;
-
-                rc = echo_destroy_object(env, ed, parent, name, namelen,
-                                         id, mode, count);
-                break;
-        }
-        case ECHO_MD_LOOKUP:
-                rc = echo_lookup_object(env, ed, parent, id, count);
-                break;
-        case ECHO_MD_GETATTR:
-                rc = echo_getattr_object(env, ed, parent, id, count);
-                break;
-        case ECHO_MD_SETATTR:
-                rc = echo_setattr_object(env, ed, parent, id, count);
-                break;
-        default:
-                CERROR("unknown command %d\n", command);
-                rc = -EINVAL;
-                break;
-        }
+               rc = echo_create_md_object(env, ed, parent, fid, name, namelen,
+                                          id, mode, count, stripe_count,
+                                          stripe_index);
+               break;
+       }
+       case ECHO_MD_DESTROY:
+       case ECHO_MD_RMDIR: {
+               __u32 mode = data->ioc_obdo2.o_mode;
+
+               rc = echo_destroy_object(env, ed, parent, name, namelen,
+                                        id, mode, count);
+               break;
+       }
+       case ECHO_MD_LOOKUP:
+               rc = echo_lookup_object(env, ed, parent, id, count);
+               break;
+       case ECHO_MD_GETATTR:
+               rc = echo_getattr_object(env, ed, parent, id, count);
+               break;
+       case ECHO_MD_SETATTR:
+               rc = echo_setattr_object(env, ed, parent, id, count);
+               break;
+       default:
+               CERROR("unknown command %d\n", command);
+               rc = -EINVAL;
+               break;
+       }
        echo_ucred_fini(env);
 
 out_name:
-        if (name != NULL)
-                OBD_FREE(name, namelen + 1);
+       if (name)
+               OBD_FREE(name, namelen + 1);
 out_put:
-        lu_object_put(env, parent);
+       lu_object_put(env, parent);
 out_free:
        LASSERT(info->eti_big_lmm);
        OBD_FREE_LARGE(info->eti_big_lmm, info->eti_big_lmmsize);
        info->eti_big_lmm = NULL;
        info->eti_big_lmmsize = 0;
 out_env:
-        cl_env_put(env, &refcheck);
-        return rc;
+       cl_env_put(env, &refcheck);
+       return rc;
 }
 #endif /* HAVE_SERVER_SUPPORT */
 
@@ -2178,8 +2399,8 @@ static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
        struct echo_client_obd  *ec = ed->ed_ec;
        int created = 0;
        int rc;
-       ENTRY;
 
+       ENTRY;
        if (!(oa->o_valid & OBD_MD_FLID) ||
            !(oa->o_valid & OBD_MD_FLGROUP) ||
            !fid_seq_is_echo(ostid_seq(&oa->o_oi))) {
@@ -2187,8 +2408,11 @@ static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
                RETURN(-EINVAL);
        }
 
-       if (ostid_id(&oa->o_oi) == 0)
-               ostid_set_id(&oa->o_oi, ++last_object_id);
+       if (ostid_id(&oa->o_oi) == 0) {
+               rc = ostid_set_id(&oa->o_oi, ++last_object_id);
+               if (rc)
+                       GOTO(failed, rc);
+       }
 
        rc = obd_create(env, ec->ec_exp, oa);
        if (rc != 0) {
@@ -2201,12 +2425,12 @@ static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
        oa->o_valid |= OBD_MD_FLID;
 
        eco = cl_echo_object_find(ed, &oa->o_oi);
-        if (IS_ERR(eco))
-                GOTO(failed, rc = PTR_ERR(eco));
-        cl_echo_object_put(eco);
+       if (IS_ERR(eco))
+               GOTO(failed, rc = PTR_ERR(eco));
+       cl_echo_object_put(eco);
 
-        CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
-        EXIT;
+       CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
+       EXIT;
 
 failed:
        if (created && rc != 0)
@@ -2223,8 +2447,8 @@ static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
 {
        struct echo_object *eco;
        int rc;
-       ENTRY;
 
+       ENTRY;
        if (!(oa->o_valid & OBD_MD_FLID) ||
            !(oa->o_valid & OBD_MD_FLGROUP) ||
            ostid_id(&oa->o_oi) == 0) {
@@ -2255,27 +2479,27 @@ static void echo_put_object(struct echo_object *eco)
 static void echo_client_page_debug_setup(struct page *page, int rw, u64 id,
                                         u64 offset, u64 count)
 {
-       char    *addr;
-       u64      stripe_off;
-       u64      stripe_id;
-       int      delta;
+       char *addr;
+       u64 stripe_off;
+       u64 stripe_id;
+       int delta;
 
-        /* no partial pages on the client */
-       LASSERT(count == PAGE_CACHE_SIZE);
+       /* no partial pages on the client */
+       LASSERT(count == PAGE_SIZE);
 
        addr = kmap(page);
 
-       for (delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
-                if (rw == OBD_BRW_WRITE) {
-                        stripe_off = offset + delta;
-                        stripe_id = id;
-                } else {
-                        stripe_off = 0xdeadbeef00c0ffeeULL;
-                        stripe_id = 0xdeadbeef00c0ffeeULL;
-                }
-                block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
-                                  stripe_off, stripe_id);
-        }
+       for (delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
+               if (rw == OBD_BRW_WRITE) {
+                       stripe_off = offset + delta;
+                       stripe_id = id;
+               } else {
+                       stripe_off = 0xdeadbeef00c0ffeeULL;
+                       stripe_id = 0xdeadbeef00c0ffeeULL;
+               }
+               block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
+                                 stripe_off, stripe_id);
+       }
 
        kunmap(page);
 }
@@ -2283,91 +2507,91 @@ static void echo_client_page_debug_setup(struct page *page, int rw, u64 id,
 static int
 echo_client_page_debug_check(struct page *page, u64 id, u64 offset, u64 count)
 {
-       u64      stripe_off;
-       u64      stripe_id;
-        char   *addr;
-        int     delta;
-        int     rc;
-        int     rc2;
+       u64 stripe_off;
+       u64 stripe_id;
+       char *addr;
+       int delta;
+       int rc;
+       int rc2;
 
-        /* no partial pages on the client */
-       LASSERT(count == PAGE_CACHE_SIZE);
+       /* no partial pages on the client */
+       LASSERT(count == PAGE_SIZE);
 
        addr = kmap(page);
 
-       for (rc = delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
-                stripe_off = offset + delta;
-                stripe_id = id;
+       for (rc = delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
+               stripe_off = offset + delta;
+               stripe_id = id;
 
-                rc2 = block_debug_check("test_brw",
-                                        addr + delta, OBD_ECHO_BLOCK_SIZE,
-                                        stripe_off, stripe_id);
-                if (rc2 != 0) {
-                        CERROR ("Error in echo object "LPX64"\n", id);
-                        rc = rc2;
-                }
-        }
+               rc2 = block_debug_check("test_brw",
+                                       addr + delta, OBD_ECHO_BLOCK_SIZE,
+                                       stripe_off, stripe_id);
+               if (rc2 != 0) {
+                       CERROR("Error in echo object %#llx\n", id);
+                       rc = rc2;
+               }
+       }
 
        kunmap(page);
-        return rc;
+       return rc;
 }
 
 static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
                            struct echo_object *eco, u64 offset,
                            u64 count, int async)
 {
-       size_t                  npages;
-        struct brw_page        *pga;
-        struct brw_page        *pgp;
-       struct page            **pages;
-       u64                      off;
-       size_t                  i;
-        int                     rc;
-        int                     verify;
-       gfp_t                   gfp_mask;
-       u32                     brw_flags = 0;
-        ENTRY;
+       size_t npages;
+       struct brw_page *pga;
+       struct brw_page *pgp;
+       struct page **pages;
+       u64 off;
+       size_t i;
+       int rc;
+       int verify;
+       gfp_t gfp_mask;
+       u32 brw_flags = 0;
 
-        verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
-                  (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
-                  (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
+       ENTRY;
+       verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
+                 (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
+                 (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
 
-       gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_IOFS : GFP_HIGHUSER;
+       gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
 
        LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
 
        if ((count & (~PAGE_MASK)) != 0)
                RETURN(-EINVAL);
 
-        /* XXX think again with misaligned I/O */
-       npages = count >> PAGE_CACHE_SHIFT;
+       /* XXX think again with misaligned I/O */
+       npages = count >> PAGE_SHIFT;
 
-        if (rw == OBD_BRW_WRITE)
-                brw_flags = OBD_BRW_ASYNC;
+       if (rw == OBD_BRW_WRITE)
+               brw_flags = OBD_BRW_ASYNC;
 
-        OBD_ALLOC(pga, npages * sizeof(*pga));
-        if (pga == NULL)
-                RETURN(-ENOMEM);
+       OBD_ALLOC(pga, npages * sizeof(*pga));
+       if (!pga)
+               RETURN(-ENOMEM);
 
-        OBD_ALLOC(pages, npages * sizeof(*pages));
-        if (pages == NULL) {
-                OBD_FREE(pga, npages * sizeof(*pga));
-                RETURN(-ENOMEM);
-        }
+       OBD_ALLOC(pages, npages * sizeof(*pages));
+       if (!pages) {
+               OBD_FREE(pga, npages * sizeof(*pga));
+               RETURN(-ENOMEM);
+       }
 
        for (i = 0, pgp = pga, off = offset;
             i < npages;
-            i++, pgp++, off += PAGE_CACHE_SIZE) {
+            i++, pgp++, off += PAGE_SIZE) {
 
                LASSERT(pgp->pg == NULL);       /* for cleanup */
 
                rc = -ENOMEM;
                pgp->pg = alloc_page(gfp_mask);
-               if (pgp->pg == NULL)
+               if (!pgp->pg)
                        goto out;
 
                pages[i] = pgp->pg;
-               pgp->count = PAGE_CACHE_SIZE;
+               pgp->count = PAGE_SIZE;
                pgp->off = off;
                pgp->flag = brw_flags;
 
@@ -2377,31 +2601,33 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
                                                     pgp->count);
        }
 
-        /* brw mode can only be used at client */
-        LASSERT(ed->ed_next != NULL);
-        rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
+       /* brw mode can only be used at client */
+       LASSERT(ed->ed_next != NULL);
+       rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
 
  out:
-        if (rc != 0 || rw != OBD_BRW_READ)
-                verify = 0;
+       if (rc != 0 || rw != OBD_BRW_READ)
+               verify = 0;
 
-        for (i = 0, pgp = pga; i < npages; i++, pgp++) {
-                if (pgp->pg == NULL)
-                        continue;
+       for (i = 0, pgp = pga; i < npages; i++, pgp++) {
+               if (!pgp->pg)
+                       continue;
 
                if (verify) {
                        int vrc;
+
                        vrc = echo_client_page_debug_check(pgp->pg,
                                                           ostid_id(&oa->o_oi),
-                                                          pgp->off, pgp->count);
+                                                          pgp->off,
+                                                          pgp->count);
                        if (vrc != 0 && rc == 0)
                                rc = vrc;
                }
                __free_page(pgp->pg);
-        }
-        OBD_FREE(pga, npages * sizeof(*pga));
-        OBD_FREE(pages, npages * sizeof(*pages));
-        RETURN(rc);
+       }
+       OBD_FREE(pga, npages * sizeof(*pga));
+       OBD_FREE(pages, npages * sizeof(*pages));
+       RETURN(rc);
 }
 
 static int echo_client_prep_commit(const struct lu_env *env,
@@ -2410,24 +2636,23 @@ static int echo_client_prep_commit(const struct lu_env *env,
                                   u64 offset, u64 count,
                                   u64 batch, int async)
 {
-       struct obd_ioobj         ioo;
-       struct niobuf_local     *lnb;
-       struct niobuf_remote     rnb;
-       u64                      off;
-       u64                      npages, tot_pages;
+       struct obd_ioobj ioo;
+       struct niobuf_local *lnb;
+       struct niobuf_remote rnb;
+       u64 off;
+       u64 npages, tot_pages, apc;
        int i, ret = 0, brw_flags = 0;
 
-        ENTRY;
-
-       if (count <= 0 || (count & ~PAGE_CACHE_MASK) != 0)
+       ENTRY;
+       if (count <= 0 || (count & ~PAGE_MASK) != 0)
                RETURN(-EINVAL);
 
-       npages = batch >> PAGE_CACHE_SHIFT;
-       tot_pages = count >> PAGE_CACHE_SHIFT;
+       apc = npages = batch >> PAGE_SHIFT;
+       tot_pages = count >> PAGE_SHIFT;
 
-       OBD_ALLOC(lnb, npages * sizeof(struct niobuf_local));
-       if (lnb == NULL)
-               GOTO(out, ret = -ENOMEM);
+       OBD_ALLOC_LARGE(lnb, apc * sizeof(struct niobuf_local));
+       if (!lnb)
+               RETURN(-ENOMEM);
 
        if (rw == OBD_BRW_WRITE && async)
                brw_flags |= OBD_BRW_ASYNC;
@@ -2443,21 +2668,21 @@ static int echo_client_prep_commit(const struct lu_env *env,
                        npages = tot_pages;
 
                rnb.rnb_offset = off;
-               rnb.rnb_len = npages * PAGE_CACHE_SIZE;
+               rnb.rnb_len = npages * PAGE_SIZE;
                rnb.rnb_flags = brw_flags;
                ioo.ioo_bufcnt = 1;
-               off += npages * PAGE_CACHE_SIZE;
+               off += npages * PAGE_SIZE;
 
-                lpages = npages;
+               lpages = npages;
                ret = obd_preprw(env, rw, exp, oa, 1, &ioo, &rnb, &lpages, lnb);
-                if (ret != 0)
-                        GOTO(out, ret);
+               if (ret != 0)
+                       GOTO(out, ret);
 
                for (i = 0; i < lpages; i++) {
                        struct page *page = lnb[i].lnb_page;
 
                        /* read past eof? */
-                       if (page == NULL && lnb[i].lnb_rc == 0)
+                       if (!page && lnb[i].lnb_rc == 0)
                                continue;
 
                        if (async)
@@ -2482,8 +2707,8 @@ static int echo_client_prep_commit(const struct lu_env *env,
 
                ret = obd_commitrw(env, rw, exp, oa, 1, &ioo, &rnb, npages, lnb,
                                   ret);
-                if (ret != 0)
-                        GOTO(out, ret);
+               if (ret != 0)
+                       break;
 
                /* Reuse env context. */
                lu_context_exit((struct lu_context *)&env->le_ctx);
@@ -2491,8 +2716,8 @@ static int echo_client_prep_commit(const struct lu_env *env,
        }
 
 out:
-       if (lnb)
-               OBD_FREE(lnb, npages * sizeof(struct niobuf_local));
+       OBD_FREE_LARGE(lnb, apc * sizeof(struct niobuf_local));
+
        RETURN(ret);
 }
 
@@ -2500,42 +2725,42 @@ static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
                                 struct obd_export *exp,
                                 struct obd_ioctl_data *data)
 {
-        struct obd_device *obd = class_exp2obd(exp);
-        struct echo_device *ed = obd2echo_dev(obd);
-        struct echo_client_obd *ec = ed->ed_ec;
-        struct obdo *oa = &data->ioc_obdo1;
-        struct echo_object *eco;
-        int rc;
-        int async = 0;
-        long test_mode;
-        ENTRY;
+       struct obd_device *obd = class_exp2obd(exp);
+       struct echo_device *ed = obd2echo_dev(obd);
+       struct echo_client_obd *ec = ed->ed_ec;
+       struct obdo *oa = &data->ioc_obdo1;
+       struct echo_object *eco;
+       int rc;
+       int async = 0;
+       long test_mode;
 
-        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+       ENTRY;
+       LASSERT(oa->o_valid & OBD_MD_FLGROUP);
 
-        rc = echo_get_object(&eco, ed, oa);
-        if (rc)
-                RETURN(rc);
+       rc = echo_get_object(&eco, ed, oa);
+       if (rc)
+               RETURN(rc);
 
-        oa->o_valid &= ~OBD_MD_FLHANDLE;
+       oa->o_valid &= ~OBD_MD_FLHANDLE;
 
        /* OFD/obdfilter works only via prep/commit */
-        test_mode = (long)data->ioc_pbuf1;
-        if (ed->ed_next == NULL && test_mode != 3) {
-                test_mode = 3;
-                data->ioc_plen1 = data->ioc_count;
-        }
+       test_mode = (long)data->ioc_pbuf1;
+       if (!ed->ed_next && test_mode != 3) {
+               test_mode = 3;
+               data->ioc_plen1 = data->ioc_count;
+       }
 
        if (test_mode == 3)
                async = 1;
 
-        /* Truncate batch size to maximum */
-        if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
-                data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
+       /* Truncate batch size to maximum */
+       if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
+               data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
 
-        switch (test_mode) {
-        case 1:
-                /* fall through */
-        case 2:
+       switch (test_mode) {
+       case 1:
+               /* fall through */
+       case 2:
                rc = echo_client_kbrw(ed, rw, oa, eco, data->ioc_offset,
                                      data->ioc_count, async);
                break;
@@ -2555,63 +2780,67 @@ static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
 
 static int
 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
-                      void *karg, void *uarg)
+                     void *karg, void __user *uarg)
 {
 #ifdef HAVE_SERVER_SUPPORT
        struct tgt_session_info *tsi;
 #endif
-        struct obd_device      *obd = exp->exp_obd;
-        struct echo_device     *ed = obd2echo_dev(obd);
-        struct echo_client_obd *ec = ed->ed_ec;
-        struct echo_object     *eco;
-        struct obd_ioctl_data  *data = karg;
-        struct lu_env          *env;
-        struct obdo            *oa;
-        struct lu_fid           fid;
-        int                     rw = OBD_BRW_READ;
-        int                     rc = 0;
-#ifdef HAVE_SERVER_SUPPORT
-       struct lu_context        echo_session;
-#endif
-        ENTRY;
+       struct obd_device      *obd = exp->exp_obd;
+       struct echo_device     *ed = obd2echo_dev(obd);
+       struct echo_client_obd *ec = ed->ed_ec;
+       struct echo_object     *eco;
+       struct obd_ioctl_data  *data = karg;
+       struct lu_env          *env;
+       unsigned long           env_tags = 0;
+       __u16                   refcheck;
+       struct obdo            *oa;
+       struct lu_fid           fid;
+       int                     rw = OBD_BRW_READ;
+       int                     rc = 0;
 
+       ENTRY;
        oa = &data->ioc_obdo1;
        if (!(oa->o_valid & OBD_MD_FLGROUP)) {
                oa->o_valid |= OBD_MD_FLGROUP;
                ostid_set_seq_echo(&oa->o_oi);
        }
 
-        /* This FID is unpacked just for validation at this point */
-        rc = ostid_to_fid(&fid, &oa->o_oi, 0);
-        if (rc < 0)
-                RETURN(rc);
+       /* This FID is unpacked just for validation at this point */
+       rc = ostid_to_fid(&fid, &oa->o_oi, 0);
+       if (rc < 0)
+               RETURN(rc);
 
-        OBD_ALLOC_PTR(env);
-        if (env == NULL)
-                RETURN(-ENOMEM);
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env))
+               RETURN(PTR_ERR(env));
 
-       rc = lu_env_init(env, LCT_DT_THREAD);
-       if (rc)
-               GOTO(out_alloc, rc = -ENOMEM);
+       lu_env_add(env);
 
 #ifdef HAVE_SERVER_SUPPORT
-       env->le_ses = &echo_session;
-       rc = lu_context_init(env->le_ses, LCT_SERVER_SESSION | LCT_NOREF);
-       if (unlikely(rc < 0))
-               GOTO(out_env, rc);
-       lu_context_enter(env->le_ses);
+       if (cmd == OBD_IOC_ECHO_MD || cmd == OBD_IOC_ECHO_ALLOC_SEQ)
+               env_tags = ECHO_MD_CTX_TAG;
+       else
+#endif
+               env_tags = ECHO_DT_CTX_TAG;
+
+       rc = lu_env_refill_by_tags(env, env_tags, ECHO_SES_TAG);
+       if (rc != 0)
+               GOTO(out, rc);
 
+#ifdef HAVE_SERVER_SUPPORT
        tsi = tgt_ses_info(env);
-       tsi->tsi_exp = ec->ec_exp;
+       /* treat as local operation */
+       tsi->tsi_exp = NULL;
        tsi->tsi_jobid = NULL;
 #endif
-        switch (cmd) {
-        case OBD_IOC_CREATE:                    /* may create echo object */
-                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
-                        GOTO (out, rc = -EPERM);
+
+       switch (cmd) {
+       case OBD_IOC_CREATE:                    /* may create echo object */
+               if (!cfs_capable(CFS_CAP_SYS_ADMIN))
+                       GOTO(out, rc = -EPERM);
 
                rc = echo_create_object(env, ed, oa);
-                GOTO(out, rc);
+               GOTO(out, rc);
 
 #ifdef HAVE_SERVER_SUPPORT
        case OBD_IOC_ECHO_MD: {
@@ -2630,7 +2859,7 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                id = data->ioc_obdo2.o_oi.oi.oi_id;
                dirlen = data->ioc_plen1;
                OBD_ALLOC(dir, dirlen + 1);
-               if (dir == NULL)
+               if (!dir)
                        GOTO(out, rc = -ENOMEM);
 
                if (copy_from_user(dir, data->ioc_pbuf1, dirlen)) {
@@ -2642,319 +2871,287 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                OBD_FREE(dir, dirlen + 1);
                GOTO(out, rc);
        }
-        case OBD_IOC_ECHO_ALLOC_SEQ: {
-                struct lu_env   *cl_env;
-                int              refcheck;
-                __u64            seq;
-                int              max_count;
-
-                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
-                        GOTO(out, rc = -EPERM);
-
-                cl_env = cl_env_get(&refcheck);
-                if (IS_ERR(cl_env))
-                        GOTO(out, rc = PTR_ERR(cl_env));
-
-                rc = lu_env_refill_by_tags(cl_env, ECHO_MD_CTX_TAG,
-                                            ECHO_MD_SES_TAG);
-                if (rc != 0) {
-                        cl_env_put(cl_env, &refcheck);
-                        GOTO(out, rc);
-                }
-
-                rc = seq_client_get_seq(cl_env, ed->ed_cl_seq, &seq);
-                cl_env_put(cl_env, &refcheck);
-                if (rc < 0) {
-                        CERROR("%s: Can not alloc seq: rc = %d\n",
-                               obd->obd_name, rc);
-                        GOTO(out, rc);
-                }
+       case OBD_IOC_ECHO_ALLOC_SEQ: {
+               __u64            seq;
+               int              max_count;
+
+               if (!cfs_capable(CFS_CAP_SYS_ADMIN))
+                       GOTO(out, rc = -EPERM);
+
+               rc = seq_client_get_seq(env, ed->ed_cl_seq, &seq);
+               if (rc < 0) {
+                       CERROR("%s: Can not alloc seq: rc = %d\n",
+                              obd->obd_name, rc);
+                       GOTO(out, rc);
+               }
 
                if (copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1))
-                        return -EFAULT;
+                       return -EFAULT;
 
                max_count = LUSTRE_METADATA_SEQ_MAX_WIDTH;
                if (copy_to_user(data->ioc_pbuf2, &max_count,
                                     data->ioc_plen2))
                        return -EFAULT;
                GOTO(out, rc);
-        }
+       }
 #endif /* HAVE_SERVER_SUPPORT */
-        case OBD_IOC_DESTROY:
-                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
-                        GOTO (out, rc = -EPERM);
+       case OBD_IOC_DESTROY:
+               if (!cfs_capable(CFS_CAP_SYS_ADMIN))
+                       GOTO(out, rc = -EPERM);
 
-                rc = echo_get_object(&eco, ed, oa);
-                if (rc == 0) {
+               rc = echo_get_object(&eco, ed, oa);
+               if (rc == 0) {
                        rc = obd_destroy(env, ec->ec_exp, oa);
-                        if (rc == 0)
-                                eco->eo_deleted = 1;
-                        echo_put_object(eco);
-                }
-                GOTO(out, rc);
-
-        case OBD_IOC_GETATTR:
-                rc = echo_get_object(&eco, ed, oa);
-                if (rc == 0) {
+                       if (rc == 0)
+                               eco->eo_deleted = 1;
+                       echo_put_object(eco);
+               }
+               GOTO(out, rc);
+
+       case OBD_IOC_GETATTR:
+               rc = echo_get_object(&eco, ed, oa);
+               if (rc == 0) {
                        rc = obd_getattr(env, ec->ec_exp, oa);
-                        echo_put_object(eco);
-                }
-                GOTO(out, rc);
+                       echo_put_object(eco);
+               }
+               GOTO(out, rc);
 
-        case OBD_IOC_SETATTR:
-                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
-                        GOTO (out, rc = -EPERM);
+       case OBD_IOC_SETATTR:
+               if (!cfs_capable(CFS_CAP_SYS_ADMIN))
+                       GOTO(out, rc = -EPERM);
 
-                rc = echo_get_object(&eco, ed, oa);
-                if (rc == 0) {
+               rc = echo_get_object(&eco, ed, oa);
+               if (rc == 0) {
                        rc = obd_setattr(env, ec->ec_exp, oa);
-                        echo_put_object(eco);
-                }
-                GOTO(out, rc);
+                       echo_put_object(eco);
+               }
+               GOTO(out, rc);
 
-        case OBD_IOC_BRW_WRITE:
-                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
-                        GOTO (out, rc = -EPERM);
+       case OBD_IOC_BRW_WRITE:
+               if (!cfs_capable(CFS_CAP_SYS_ADMIN))
+                       GOTO(out, rc = -EPERM);
 
-                rw = OBD_BRW_WRITE;
-                /* fall through */
-        case OBD_IOC_BRW_READ:
+               rw = OBD_BRW_WRITE;
+               /* fall through */
+       case OBD_IOC_BRW_READ:
                rc = echo_client_brw_ioctl(env, rw, exp, data);
-                GOTO(out, rc);
+               GOTO(out, rc);
 
-        default:
-                CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
-                GOTO (out, rc = -ENOTTY);
-        }
+       default:
+               CERROR("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
+               GOTO(out, rc = -ENOTTY);
+       }
 
-        EXIT;
+       EXIT;
 out:
-#ifdef HAVE_SERVER_SUPPORT
-       lu_context_exit(env->le_ses);
-       lu_context_fini(env->le_ses);
-out_env:
-#endif
-        lu_env_fini(env);
-out_alloc:
-        OBD_FREE_PTR(env);
+       lu_env_remove(env);
+       cl_env_put(env, &refcheck);
 
-        return rc;
+       return rc;
 }
 
 static int echo_client_setup(const struct lu_env *env,
-                             struct obd_device *obddev, struct lustre_cfg *lcfg)
-{
-        struct echo_client_obd *ec = &obddev->u.echo_client;
-        struct obd_device *tgt;
-        struct obd_uuid echo_uuid = { "ECHO_UUID" };
-        struct obd_connect_data *ocd = NULL;
-        int rc;
-        ENTRY;
-
-        if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
-                CERROR("requires a TARGET OBD name\n");
-                RETURN(-EINVAL);
-        }
-
-        tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
-        if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
-                CERROR("device not attached or not set up (%s)\n",
-                       lustre_cfg_string(lcfg, 1));
-                RETURN(-EINVAL);
-        }
+                            struct obd_device *obddev, struct lustre_cfg *lcfg)
+{
+       struct echo_client_obd *ec = &obddev->u.echo_client;
+       struct obd_device *tgt;
+       struct obd_uuid echo_uuid = { "ECHO_UUID" };
+       struct obd_connect_data *ocd = NULL;
+       int rc;
+
+       ENTRY;
+       if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
+               CERROR("requires a TARGET OBD name\n");
+               RETURN(-EINVAL);
+       }
+
+       tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
+       if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
+               CERROR("device not attached or not set up (%s)\n",
+                      lustre_cfg_string(lcfg, 1));
+               RETURN(-EINVAL);
+       }
 
        spin_lock_init(&ec->ec_lock);
        INIT_LIST_HEAD(&ec->ec_objects);
        INIT_LIST_HEAD(&ec->ec_locks);
-        ec->ec_unique = 0;
+       ec->ec_unique = 0;
+
+       lu_context_tags_update(ECHO_DT_CTX_TAG);
+       lu_session_tags_update(ECHO_SES_TAG);
 
        if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
 #ifdef HAVE_SERVER_SUPPORT
                lu_context_tags_update(ECHO_MD_CTX_TAG);
-               lu_session_tags_update(ECHO_MD_SES_TAG);
 #else
-               CERROR("Local operations are NOT supported on client side. "
-                      "Only remote operations are supported. Metadata client "
-                      "must be run on server side.\n");
+               CERROR(
+                      "Local operations are NOT supported on client side. Only remote operations are supported. Metadata client must be run on server side.\n");
 #endif
                RETURN(0);
        }
 
-        OBD_ALLOC(ocd, sizeof(*ocd));
-        if (ocd == NULL) {
-                CERROR("Can't alloc ocd connecting to %s\n",
-                       lustre_cfg_string(lcfg, 1));
-                return -ENOMEM;
-        }
+       OBD_ALLOC(ocd, sizeof(*ocd));
+       if (!ocd) {
+               CERROR("Can't alloc ocd connecting to %s\n",
+                      lustre_cfg_string(lcfg, 1));
+               return -ENOMEM;
+       }
 
-        ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
+       ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
                                 OBD_CONNECT_BRW_SIZE |
-                                 OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
+                                OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
                                 OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
                                 OBD_CONNECT_FID;
        ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
-        ocd->ocd_version = LUSTRE_VERSION_CODE;
-        ocd->ocd_group = FID_SEQ_ECHO;
+       ocd->ocd_version = LUSTRE_VERSION_CODE;
+       ocd->ocd_group = FID_SEQ_ECHO;
 
-        rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
-        if (rc == 0) {
-                /* Turn off pinger because it connects to tgt obd directly. */
+       rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
+       if (rc == 0) {
+               /* Turn off pinger because it connects to tgt obd directly. */
                spin_lock(&tgt->obd_dev_lock);
                list_del_init(&ec->ec_exp->exp_obd_chain_timed);
                spin_unlock(&tgt->obd_dev_lock);
-        }
+       }
 
-        OBD_FREE(ocd, sizeof(*ocd));
+       OBD_FREE(ocd, sizeof(*ocd));
 
-        if (rc != 0) {
-                CERROR("fail to connect to device %s\n",
-                       lustre_cfg_string(lcfg, 1));
-                return (rc);
-        }
+       if (rc != 0) {
+               CERROR("fail to connect to device %s\n",
+                      lustre_cfg_string(lcfg, 1));
+               return rc;
+       }
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int echo_client_cleanup(struct obd_device *obddev)
 {
-        struct echo_device *ed = obd2echo_dev(obddev);
-        struct echo_client_obd *ec = &obddev->u.echo_client;
-        int rc;
-        ENTRY;
+       struct echo_device *ed = obd2echo_dev(obddev);
+       struct echo_client_obd *ec = &obddev->u.echo_client;
+       int rc;
 
-        /*Do nothing for Metadata echo client*/
-        if (ed == NULL )
-                RETURN(0);
+       ENTRY;
+       /*Do nothing for Metadata echo client*/
+       if (!ed)
+               RETURN(0);
 
-        if (ed->ed_next_ismd) {
+       lu_session_tags_clear(ECHO_SES_TAG & ~LCT_SESSION);
+       lu_context_tags_clear(ECHO_DT_CTX_TAG);
+       if (ed->ed_next_ismd) {
 #ifdef HAVE_SERVER_SUPPORT
                lu_context_tags_clear(ECHO_MD_CTX_TAG);
-               lu_session_tags_clear(ECHO_MD_SES_TAG);
 #else
-               CERROR("This is client-side only module, does not support "
-                       "metadata echo client.\n");
+               CERROR(
+                      "This is client-side only module, does not support metadata echo client.\n");
 #endif
-                RETURN(0);
-        }
+               RETURN(0);
+       }
 
        if (!list_empty(&obddev->obd_exports)) {
-                CERROR("still has clients!\n");
-                RETURN(-EBUSY);
-        }
+               CERROR("still has clients!\n");
+               RETURN(-EBUSY);
+       }
 
        LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
-        rc = obd_disconnect(ec->ec_exp);
-        if (rc != 0)
-                CERROR("fail to disconnect device: %d\n", rc);
+       rc = obd_disconnect(ec->ec_exp);
+       if (rc != 0)
+               CERROR("fail to disconnect device: %d\n", rc);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int echo_client_connect(const struct lu_env *env,
-                               struct obd_export **exp,
-                               struct obd_device *src, struct obd_uuid *cluuid,
-                               struct obd_connect_data *data, void *localdata)
+                              struct obd_export **exp,
+                              struct obd_device *src, struct obd_uuid *cluuid,
+                              struct obd_connect_data *data, void *localdata)
 {
-        int                rc;
-        struct lustre_handle conn = { 0 };
+       int rc;
+       struct lustre_handle conn = { 0 };
 
-        ENTRY;
-        rc = class_connect(&conn, src, cluuid);
-        if (rc == 0) {
-                *exp = class_conn2export(&conn);
-        }
+       ENTRY;
+       rc = class_connect(&conn, src, cluuid);
+       if (rc == 0)
+               *exp = class_conn2export(&conn);
 
-        RETURN (rc);
+       RETURN(rc);
 }
 
 static int echo_client_disconnect(struct obd_export *exp)
 {
-        int                     rc;
-        ENTRY;
+       int rc;
 
-        if (exp == NULL)
-                GOTO(out, rc = -EINVAL);
+       ENTRY;
+       if (!exp)
+               GOTO(out, rc = -EINVAL);
 
-        rc = class_disconnect(exp);
-        GOTO(out, rc);
- out:
-        return rc;
+       rc = class_disconnect(exp);
+       GOTO(out, rc);
+out:
+       return rc;
 }
 
 static struct obd_ops echo_client_obd_ops = {
-        .o_owner       = THIS_MODULE,
-        .o_iocontrol   = echo_client_iocontrol,
-        .o_connect     = echo_client_connect,
-        .o_disconnect  = echo_client_disconnect
+       .o_owner       = THIS_MODULE,
+       .o_iocontrol   = echo_client_iocontrol,
+       .o_connect     = echo_client_connect,
+       .o_disconnect  = echo_client_disconnect
 };
 
-static int echo_client_init(void)
-{
-        int rc;
-
-       rc = lu_kmem_init(echo_caches);
-       if (rc == 0) {
-               rc = class_register_type(&echo_client_obd_ops, NULL, true, NULL,
-                                        LUSTRE_ECHO_CLIENT_NAME,
-                                        &echo_device_type);
-               if (rc)
-                       lu_kmem_fini(echo_caches);
-       }
-       return rc;
-}
-
-static void echo_client_exit(void)
-{
-        class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
-        lu_kmem_fini(echo_caches);
-}
-
 static int __init obdecho_init(void)
 {
-        int rc;
+       int rc;
 
-        ENTRY;
-        LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
+       ENTRY;
+       LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
 
-       LASSERT(PAGE_CACHE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
+       LASSERT(PAGE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
 
 # ifdef HAVE_SERVER_SUPPORT
-        rc = echo_persistent_pages_init();
-        if (rc != 0)
-                goto failed_0;
+       rc = echo_persistent_pages_init();
+       if (rc != 0)
+               goto failed_0;
 
        rc = class_register_type(&echo_obd_ops, NULL, true, NULL,
-                                LUSTRE_ECHO_NAME, NULL);
+                                LUSTRE_ECHO_NAME, &echo_srv_type);
        if (rc != 0)
                goto failed_1;
 # endif
 
-        rc = echo_client_init();
+       rc = lu_kmem_init(echo_caches);
+       if (rc == 0) {
+               rc = class_register_type(&echo_client_obd_ops, NULL, false,
+                                        NULL, LUSTRE_ECHO_CLIENT_NAME,
+                                        &echo_device_type);
+               if (rc)
+                       lu_kmem_fini(echo_caches);
+       }
 
 # ifdef HAVE_SERVER_SUPPORT
-        if (rc == 0)
-                RETURN(0);
+       if (rc == 0)
+               RETURN(0);
 
-        class_unregister_type(LUSTRE_ECHO_NAME);
+       class_unregister_type(LUSTRE_ECHO_NAME);
 failed_1:
-        echo_persistent_pages_fini();
+       echo_persistent_pages_fini();
 failed_0:
 # endif
-        RETURN(rc);
+       RETURN(rc);
 }
 
-static void /*__exit*/ obdecho_exit(void)
+static void __exit obdecho_exit(void)
 {
-        echo_client_exit();
+       class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
+       lu_kmem_fini(echo_caches);
 
-# ifdef HAVE_SERVER_SUPPORT
-        class_unregister_type(LUSTRE_ECHO_NAME);
-        echo_persistent_pages_fini();
-# endif
+#ifdef HAVE_SERVER_SUPPORT
+       class_unregister_type(LUSTRE_ECHO_NAME);
+       echo_persistent_pages_fini();
+#endif
 }
 
-MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
-MODULE_DESCRIPTION("Lustre Testing Echo OBD driver");
+MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
+MODULE_DESCRIPTION("Lustre Echo Client test driver");
 MODULE_VERSION(LUSTRE_VERSION_STRING);
 MODULE_LICENSE("GPL");