Whamcloud - gitweb
LU-13799 llite: Adjust dio refcounting
[fs/lustre-release.git] / lustre / obdecho / echo_client.c
index ee6167a..4042fe5 100644 (file)
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
 #define DEBUG_SUBSYSTEM S_ECHO
 
 #include <linux/user_namespace.h>
-#ifdef HAVE_UIDGID_HEADER
-# include <linux/uidgid.h>
-#endif
-#include <libcfs/libcfs.h>
+#include <linux/uidgid.h>
 
+#include <libcfs/libcfs.h>
 #include <obd.h>
 #include <obd_support.h>
 #include <obd_class.h>
-#include <lustre_debug.h>
 #include <lprocfs_status.h>
 #include <cl_object.h>
 #include <lustre_fid.h>
  * @{
  */
 
+/* echo thread key have a CL_THREAD flag, which set cl_env function directly */
+#define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD)
+#define ECHO_DT_CTX_TAG (LCT_REMEMBER | LCT_DT_THREAD)
+#define ECHO_SES_TAG    (LCT_REMEMBER | LCT_SESSION | LCT_SERVER_SESSION)
+
 struct echo_device {
        struct cl_device          ed_cl;
        struct echo_client_obd   *ed_ec;
@@ -94,7 +95,7 @@ struct echo_object_conf {
 
 struct echo_page {
        struct cl_page_slice    ep_cl;
-       struct mutex            ep_lock;
+       unsigned long           ep_lock;
 };
 
 struct echo_lock {
@@ -123,16 +124,16 @@ struct echo_md_device {
 #endif /* HAVE_SERVER_SUPPORT */
 
 static int echo_client_setup(const struct lu_env *env,
-                            struct obd_device *obddev,
+                            struct obd_device *obd,
                             struct lustre_cfg *lcfg);
-static int echo_client_cleanup(struct obd_device *obddev);
+static int echo_client_cleanup(struct obd_device *obd);
 
 /** \defgroup echo_helpers Helper functions
  * @{
  */
 static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
 {
-       return container_of0(dev, struct echo_device, ed_cl);
+       return container_of_safe(dev, struct echo_device, ed_cl);
 }
 
 static inline struct cl_device *echo_dev2cl(struct echo_device *d)
@@ -190,7 +191,8 @@ struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
 #ifdef HAVE_SERVER_SUPPORT
 static inline struct echo_md_device *lu2emd_dev(struct lu_device *d)
 {
-       return container_of0(d, struct echo_md_device, emd_md_dev.md_lu_dev);
+       return container_of_safe(d, struct echo_md_device,
+                                emd_md_dev.md_lu_dev);
 }
 
 static inline struct lu_device *emd2lu_dev(struct echo_md_device *d)
@@ -288,10 +290,13 @@ static int echo_page_own(const struct lu_env *env,
 {
        struct echo_page *ep = cl2echo_page(slice);
 
-       if (!nonblock)
-               mutex_lock(&ep->ep_lock);
-       else if (!mutex_trylock(&ep->ep_lock))
-               return -EAGAIN;
+       if (!nonblock) {
+               if (test_and_set_bit(0, &ep->ep_lock))
+                       return -EAGAIN;
+       } else {
+               while (test_and_set_bit(0, &ep->ep_lock))
+                       wait_on_bit(&ep->ep_lock, 0, TASK_UNINTERRUPTIBLE);
+       }
        return 0;
 }
 
@@ -301,8 +306,8 @@ static void echo_page_disown(const struct lu_env *env,
 {
        struct echo_page *ep = cl2echo_page(slice);
 
-       LASSERT(mutex_is_locked(&ep->ep_lock));
-       mutex_unlock(&ep->ep_lock);
+       LASSERT(test_bit(0, &ep->ep_lock));
+       clear_and_wake_up_bit(0, &ep->ep_lock);
 }
 
 static void echo_page_discard(const struct lu_env *env,
@@ -315,7 +320,7 @@ static void echo_page_discard(const struct lu_env *env,
 static int echo_page_is_vmlocked(const struct lu_env *env,
                                 const struct cl_page_slice *slice)
 {
-       if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
+       if (test_bit(0, &cl2echo_page(slice)->ep_lock))
                return -EBUSY;
        return -ENODATA;
 }
@@ -353,7 +358,7 @@ static int echo_page_print(const struct lu_env *env,
        struct echo_page *ep = cl2echo_page(slice);
 
        (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
-                  ep, mutex_is_locked(&ep->ep_lock),
+                  ep, test_bit(0, &ep->ep_lock),
                   slice->cpl_page->cp_vmpage);
        return 0;
 }
@@ -394,8 +399,8 @@ static void echo_lock_fini(const struct lu_env *env,
        OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem);
 }
 
-static struct cl_lock_operations echo_lock_ops = {
-       .clo_fini      = echo_lock_fini,
+static const struct cl_lock_operations echo_lock_ops = {
+       .clo_fini       = echo_lock_fini,
 };
 
 /** @} echo_lock */
@@ -414,8 +419,14 @@ static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
 
        ENTRY;
        get_page(page->cp_vmpage);
-       mutex_init(&ep->ep_lock);
-       cl_page_slice_add(page, &ep->ep_cl, obj, index, &echo_page_ops);
+       /*
+        * ep_lock is similar to the lock_page() lock, and
+        * cannot usefully be monitored by lockdep.
+        * So just use a bit in an "unsigned long" and use the
+        * wait_on_bit() interface to wait for the bit to be clear.
+        */
+       ep->ep_lock = 0;
+       cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops);
        atomic_inc(&eco->eo_npages);
        RETURN(0);
 }
@@ -510,25 +521,48 @@ static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
        RETURN(0);
 }
 
-static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
+static void echo_object_delete(const struct lu_env *env, struct lu_object *obj)
 {
        struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
-       struct echo_client_obd *ec = eco->eo_dev->ed_ec;
+       struct echo_client_obd *ec;
 
        ENTRY;
+
+       /* object delete called unconditolally - layer init or not */
+       if (eco->eo_dev == NULL)
+               return;
+
+       ec = eco->eo_dev->ed_ec;
+
        LASSERT(atomic_read(&eco->eo_npages) == 0);
 
        spin_lock(&ec->ec_lock);
        list_del_init(&eco->eo_obj_chain);
        spin_unlock(&ec->ec_lock);
 
-       lu_object_fini(obj);
-       lu_object_header_fini(obj->lo_header);
-
        if (eco->eo_oinfo)
                OBD_FREE_PTR(eco->eo_oinfo);
+}
 
-       OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
+static void echo_object_free_rcu(struct rcu_head *head)
+{
+       struct echo_object *eco = container_of(head, struct echo_object,
+                                              eo_hdr.coh_lu.loh_rcu);
+
+       kmem_cache_free(echo_object_kmem, eco);
+}
+
+static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
+{
+       struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
+
+       ENTRY;
+
+       lu_object_fini(obj);
+       lu_object_header_fini(obj->lo_header);
+
+       OBD_FREE_PRE(eco, sizeof(*eco), "slab-freed");
+       call_rcu(&eco->eo_hdr.coh_lu.loh_rcu, echo_object_free_rcu);
        EXIT;
 }
 
@@ -542,7 +576,7 @@ static int echo_object_print(const struct lu_env *env, void *cookie,
 
 static const struct lu_object_operations echo_lu_obj_ops = {
        .loo_object_init      = echo_object_init,
-       .loo_object_delete    = NULL,
+       .loo_object_delete    = echo_object_delete,
        .loo_object_release   = NULL,
        .loo_object_free      = echo_object_free,
        .loo_object_print     = echo_object_print,
@@ -583,7 +617,7 @@ static struct lu_object *echo_object_alloc(const struct lu_env *env,
        RETURN(obj);
 }
 
-static struct lu_device_operations echo_device_lu_ops = {
+static const struct lu_device_operations echo_device_lu_ops = {
        .ldo_object_alloc   = echo_object_alloc,
 };
 
@@ -698,13 +732,11 @@ static int echo_fid_init(struct echo_device *ed, char *obd_name,
        snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", obd_name);
 
        /* Init client side sequence-manager */
-       rc = seq_client_init(ed->ed_cl_seq, NULL,
-                            LUSTRE_SEQ_METADATA,
-                            prefix, ss->ss_server_seq);
+       seq_client_init(ed->ed_cl_seq, NULL,
+                       LUSTRE_SEQ_METADATA,
+                       prefix, ss->ss_server_seq);
        ed->ed_cl_seq->lcs_width = ECHO_SEQ_WIDTH;
        OBD_FREE(prefix, MAX_OBD_NAME + 5);
-       if (rc)
-               GOTO(out_free_seq, rc);
 
        RETURN(0);
 
@@ -714,9 +746,9 @@ out_free_seq:
        RETURN(rc);
 }
 
-static int echo_fid_fini(struct obd_device *obddev)
+static int echo_fid_fini(struct obd_device *obd)
 {
-       struct echo_device *ed = obd2echo_dev(obddev);
+       struct echo_device *ed = obd2echo_dev(obd);
 
        ENTRY;
        if (ed->ed_cl_seq) {
@@ -972,13 +1004,17 @@ out:
                        CERROR("Cleanup obd device %s error(%d)\n",
                               obd->obd_name, rc2);
        }
+       /* fallthrough */
 
        case 3:
                echo_site_fini(env, ed);
+               /* fallthrough */
        case 2:
                cl_device_fini(&ed->ed_cl);
+               /* fallthrough */
        case 1:
                OBD_FREE_PTR(ed);
+               /* fallthrough */
        case 0:
        default:
                break;
@@ -1049,8 +1085,7 @@ static struct lu_device *echo_device_free(const struct lu_env *env,
                spin_unlock(&ec->ec_lock);
                CERROR(
                       "echo_client still has objects at cleanup time, wait for 1 second\n");
-               set_current_state(TASK_UNINTERRUPTIBLE);
-               schedule_timeout(cfs_time_seconds(1));
+               schedule_timeout_uninterruptible(cfs_time_seconds(1));
                lu_site_purge(env, ed->ed_site, -1);
                spin_lock(&ec->ec_lock);
        }
@@ -1278,16 +1313,23 @@ static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
 }
 
 static void echo_commit_callback(const struct lu_env *env, struct cl_io *io,
-                                struct cl_page *page)
+                                struct pagevec *pvec)
 {
        struct echo_thread_info *info;
        struct cl_2queue        *queue;
+       int i = 0;
 
        info = echo_env_info(env);
        LASSERT(io == &info->eti_io);
 
        queue = &info->eti_queue;
-       cl_page_list_add(&queue->c2_qout, page);
+
+       for (i = 0; i < pagevec_count(pvec); i++) {
+               struct page *vmpage = pvec->pages[i];
+               struct cl_page *page = (struct cl_page *)vmpage->private;
+
+               cl_page_list_add(&queue->c2_qout, page, true);
+       }
 }
 
 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
@@ -1349,7 +1391,7 @@ static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
                        break;
                }
 
-               cl_2queue_add(queue, clp);
+               cl_2queue_add(queue, clp, true);
 
                /*
                 * drop the reference count for cl_page_find, so that the page
@@ -1519,7 +1561,7 @@ static int echo_attr_get_complex(const struct lu_env *env,
                }
        }
 
-#ifdef CONFIG_FS_POSIX_ACL
+#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
        if ((ma->ma_need & MA_ACL_DEF) && S_ISDIR(mode)) {
                buf->lb_buf = ma->ma_acl;
                buf->lb_len = ma->ma_acl_size;
@@ -1648,7 +1690,7 @@ echo_md_dir_stripe_choose(const struct lu_env *env, struct echo_device *ed,
        }
 
        lmv = (struct lmv_mds_md_v1 *)ma->ma_lmm;
-       if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1) {
+       if (!lmv_is_sane(lmv)) {
                rc = -EINVAL;
                CERROR("Invalid mds md magic %x "DFID": rc = %d\n",
                       le32_to_cpu(lmv->lmv_magic), PFID(lu_object_fid(obj)),
@@ -1664,9 +1706,8 @@ echo_md_dir_stripe_choose(const struct lu_env *env, struct echo_device *ed,
                echo_md_build_name(&tmp_ln_name, info->eti_name, id);
        }
 
-       idx = lmv_name_to_stripe_index(LMV_HASH_TYPE_FNV_1A_64,
-                               le32_to_cpu(lmv->lmv_stripe_count),
-                               tmp_ln_name.ln_name, tmp_ln_name.ln_namelen);
+       idx = lmv_name_to_stripe_index(lmv, tmp_ln_name.ln_name,
+                                      tmp_ln_name.ln_namelen);
 
        LASSERT(idx < le32_to_cpu(lmv->lmv_stripe_count));
        fid_le_to_cpu(&stripe_fid, &lmv->lmv_stripe_fids[idx]);
@@ -2204,6 +2245,7 @@ static struct lu_object *echo_resolve_path(const struct lu_env *env,
 static void echo_ucred_init(struct lu_env *env)
 {
        struct lu_ucred *ucred = lu_ucred(env);
+       kernel_cap_t kcap = current_cap();
 
        ucred->uc_valid = UCRED_INVALID;
 
@@ -2221,8 +2263,11 @@ static void echo_ucred_init(struct lu_env *env)
        ucred->uc_cap = cfs_curproc_cap_pack();
 
        /* remove fs privilege for non-root user. */
-       if (ucred->uc_fsuid)
-               ucred->uc_cap &= ~CFS_CAP_FS_MASK;
+       if (ucred->uc_fsuid) {
+               kcap = cap_drop_nfsd_set(kcap);
+               kcap = cap_drop_fs_set(kcap);
+       }
+       ucred->uc_cap = kcap.cap[0];
        ucred->uc_valid = UCRED_NEW;
 }
 
@@ -2233,8 +2278,6 @@ static void echo_ucred_fini(struct lu_env *env)
        ucred->uc_valid = UCRED_INIT;
 }
 
-#define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD)
-#define ECHO_MD_SES_TAG (LCT_REMEMBER | LCT_SESSION | LCT_SERVER_SESSION)
 static int echo_md_handler(struct echo_device *ed, int command,
                           char *path, int path_len, __u64 id, int count,
                           struct obd_ioctl_data *data)
@@ -2263,7 +2306,7 @@ static int echo_md_handler(struct echo_device *ed, int command,
        if (IS_ERR(env))
                RETURN(PTR_ERR(env));
 
-       rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_MD_SES_TAG);
+       rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_SES_TAG);
        if (rc != 0)
                GOTO(out_env, rc);
 
@@ -2532,13 +2575,13 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
        if (rw == OBD_BRW_WRITE)
                brw_flags = OBD_BRW_ASYNC;
 
-       OBD_ALLOC(pga, npages * sizeof(*pga));
+       OBD_ALLOC_PTR_ARRAY_LARGE(pga, npages);
        if (!pga)
                RETURN(-ENOMEM);
 
-       OBD_ALLOC(pages, npages * sizeof(*pages));
+       OBD_ALLOC_PTR_ARRAY_LARGE(pages, npages);
        if (!pages) {
-               OBD_FREE(pga, npages * sizeof(*pga));
+               OBD_FREE_PTR_ARRAY_LARGE(pga, npages);
                RETURN(-ENOMEM);
        }
 
@@ -2553,6 +2596,8 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
                if (!pgp->pg)
                        goto out;
 
+               /* set mapping so page is not considered encrypted */
+               pgp->pg->mapping = ECHO_MAPPING_UNENCRYPTED;
                pages[i] = pgp->pg;
                pgp->count = PAGE_SIZE;
                pgp->off = off;
@@ -2588,8 +2633,8 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
                }
                __free_page(pgp->pg);
        }
-       OBD_FREE(pga, npages * sizeof(*pga));
-       OBD_FREE(pages, npages * sizeof(*pages));
+       OBD_FREE_PTR_ARRAY_LARGE(pga, npages);
+       OBD_FREE_PTR_ARRAY_LARGE(pages, npages);
        RETURN(rc);
 }
 
@@ -2613,7 +2658,7 @@ static int echo_client_prep_commit(const struct lu_env *env,
        apc = npages = batch >> PAGE_SHIFT;
        tot_pages = count >> PAGE_SHIFT;
 
-       OBD_ALLOC_LARGE(lnb, apc * sizeof(struct niobuf_local));
+       OBD_ALLOC_PTR_ARRAY_LARGE(lnb, apc);
        if (!lnb)
                RETURN(-ENOMEM);
 
@@ -2679,7 +2724,7 @@ static int echo_client_prep_commit(const struct lu_env *env,
        }
 
 out:
-       OBD_FREE_LARGE(lnb, apc * sizeof(struct niobuf_local));
+       OBD_FREE_PTR_ARRAY_LARGE(lnb, apc);
 
        RETURN(ret);
 }
@@ -2754,13 +2799,12 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
        struct echo_object     *eco;
        struct obd_ioctl_data  *data = karg;
        struct lu_env          *env;
+       unsigned long           env_tags = 0;
+       __u16                   refcheck;
        struct obdo            *oa;
        struct lu_fid           fid;
        int                     rw = OBD_BRW_READ;
        int                     rc = 0;
-#ifdef HAVE_SERVER_SUPPORT
-       struct lu_context        echo_session;
-#endif
 
        ENTRY;
        oa = &data->ioc_obdo1;
@@ -2774,31 +2818,33 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
        if (rc < 0)
                RETURN(rc);
 
-       OBD_ALLOC_PTR(env);
-       if (!env)
-               RETURN(-ENOMEM);
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env))
+               RETURN(PTR_ERR(env));
 
-       rc = lu_env_init(env, LCT_DT_THREAD);
-       if (rc)
-               GOTO(out_alloc, rc = -ENOMEM);
        lu_env_add(env);
-       if (rc)
-               GOTO(out_env_fini, rc = -ENOMEM);
 
 #ifdef HAVE_SERVER_SUPPORT
-       env->le_ses = &echo_session;
-       rc = lu_context_init(env->le_ses, LCT_SERVER_SESSION | LCT_NOREF);
-       if (unlikely(rc < 0))
-               GOTO(out_env, rc);
-       lu_context_enter(env->le_ses);
+       if (cmd == OBD_IOC_ECHO_MD || cmd == OBD_IOC_ECHO_ALLOC_SEQ)
+               env_tags = ECHO_MD_CTX_TAG;
+       else
+#endif
+               env_tags = ECHO_DT_CTX_TAG;
+
+       rc = lu_env_refill_by_tags(env, env_tags, ECHO_SES_TAG);
+       if (rc != 0)
+               GOTO(out, rc);
 
+#ifdef HAVE_SERVER_SUPPORT
        tsi = tgt_ses_info(env);
-       tsi->tsi_exp = ec->ec_exp;
+       /* treat as local operation */
+       tsi->tsi_exp = NULL;
        tsi->tsi_jobid = NULL;
 #endif
+
        switch (cmd) {
        case OBD_IOC_CREATE:                    /* may create echo object */
-               if (!cfs_capable(CFS_CAP_SYS_ADMIN))
+               if (!capable(CAP_SYS_ADMIN))
                        GOTO(out, rc = -EPERM);
 
                rc = echo_create_object(env, ed, oa);
@@ -2812,7 +2858,7 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                int dirlen;
                __u64 id;
 
-               if (!cfs_capable(CFS_CAP_SYS_ADMIN))
+               if (!capable(CAP_SYS_ADMIN))
                        GOTO(out, rc = -EPERM);
 
                count = data->ioc_count;
@@ -2834,27 +2880,13 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                GOTO(out, rc);
        }
        case OBD_IOC_ECHO_ALLOC_SEQ: {
-               struct lu_env   *cl_env;
-               __u16            refcheck;
                __u64            seq;
                int              max_count;
 
-               if (!cfs_capable(CFS_CAP_SYS_ADMIN))
+               if (!capable(CAP_SYS_ADMIN))
                        GOTO(out, rc = -EPERM);
 
-               cl_env = cl_env_get(&refcheck);
-               if (IS_ERR(cl_env))
-                       GOTO(out, rc = PTR_ERR(cl_env));
-
-               rc = lu_env_refill_by_tags(cl_env, ECHO_MD_CTX_TAG,
-                                          ECHO_MD_SES_TAG);
-               if (rc != 0) {
-                       cl_env_put(cl_env, &refcheck);
-                       GOTO(out, rc);
-               }
-
-               rc = seq_client_get_seq(cl_env, ed->ed_cl_seq, &seq);
-               cl_env_put(cl_env, &refcheck);
+               rc = seq_client_get_seq(env, ed->ed_cl_seq, &seq);
                if (rc < 0) {
                        CERROR("%s: Can not alloc seq: rc = %d\n",
                               obd->obd_name, rc);
@@ -2872,7 +2904,7 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
        }
 #endif /* HAVE_SERVER_SUPPORT */
        case OBD_IOC_DESTROY:
-               if (!cfs_capable(CFS_CAP_SYS_ADMIN))
+               if (!capable(CAP_SYS_ADMIN))
                        GOTO(out, rc = -EPERM);
 
                rc = echo_get_object(&eco, ed, oa);
@@ -2893,7 +2925,7 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                GOTO(out, rc);
 
        case OBD_IOC_SETATTR:
-               if (!cfs_capable(CFS_CAP_SYS_ADMIN))
+               if (!capable(CAP_SYS_ADMIN))
                        GOTO(out, rc = -EPERM);
 
                rc = echo_get_object(&eco, ed, oa);
@@ -2904,7 +2936,7 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                GOTO(out, rc);
 
        case OBD_IOC_BRW_WRITE:
-               if (!cfs_capable(CFS_CAP_SYS_ADMIN))
+               if (!capable(CAP_SYS_ADMIN))
                        GOTO(out, rc = -EPERM);
 
                rw = OBD_BRW_WRITE;
@@ -2920,24 +2952,16 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
 
        EXIT;
 out:
-#ifdef HAVE_SERVER_SUPPORT
-       lu_context_exit(env->le_ses);
-       lu_context_fini(env->le_ses);
-out_env:
-#endif
        lu_env_remove(env);
-out_env_fini:
-       lu_env_fini(env);
-out_alloc:
-       OBD_FREE_PTR(env);
+       cl_env_put(env, &refcheck);
 
        return rc;
 }
 
 static int echo_client_setup(const struct lu_env *env,
-                            struct obd_device *obddev, struct lustre_cfg *lcfg)
+                            struct obd_device *obd, struct lustre_cfg *lcfg)
 {
-       struct echo_client_obd *ec = &obddev->u.echo_client;
+       struct echo_client_obd *ec = &obd->u.echo_client;
        struct obd_device *tgt;
        struct obd_uuid echo_uuid = { "ECHO_UUID" };
        struct obd_connect_data *ocd = NULL;
@@ -2961,10 +2985,12 @@ static int echo_client_setup(const struct lu_env *env,
        INIT_LIST_HEAD(&ec->ec_locks);
        ec->ec_unique = 0;
 
+       lu_context_tags_update(ECHO_DT_CTX_TAG);
+       lu_session_tags_update(ECHO_SES_TAG);
+
        if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
 #ifdef HAVE_SERVER_SUPPORT
                lu_context_tags_update(ECHO_MD_CTX_TAG);
-               lu_session_tags_update(ECHO_MD_SES_TAG);
 #else
                CERROR(
                       "Local operations are NOT supported on client side. Only remote operations are supported. Metadata client must be run on server side.\n");
@@ -2983,7 +3009,9 @@ static int echo_client_setup(const struct lu_env *env,
                                 OBD_CONNECT_BRW_SIZE |
                                 OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
                                 OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
-                                OBD_CONNECT_FID;
+                                OBD_CONNECT_FID | OBD_CONNECT_FLAGS2;
+       ocd->ocd_connect_flags2 = OBD_CONNECT2_REP_MBITS;
+
        ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
        ocd->ocd_version = LUSTRE_VERSION_CODE;
        ocd->ocd_group = FID_SEQ_ECHO;
@@ -3007,10 +3035,10 @@ static int echo_client_setup(const struct lu_env *env,
        RETURN(rc);
 }
 
-static int echo_client_cleanup(struct obd_device *obddev)
+static int echo_client_cleanup(struct obd_device *obd)
 {
-       struct echo_device *ed = obd2echo_dev(obddev);
-       struct echo_client_obd *ec = &obddev->u.echo_client;
+       struct echo_device *ed = obd2echo_dev(obd);
+       struct echo_client_obd *ec = &obd->u.echo_client;
        int rc;
 
        ENTRY;
@@ -3018,10 +3046,11 @@ static int echo_client_cleanup(struct obd_device *obddev)
        if (!ed)
                RETURN(0);
 
+       lu_session_tags_clear(ECHO_SES_TAG & ~LCT_SESSION);
+       lu_context_tags_clear(ECHO_DT_CTX_TAG);
        if (ed->ed_next_ismd) {
 #ifdef HAVE_SERVER_SUPPORT
                lu_context_tags_clear(ECHO_MD_CTX_TAG);
-               lu_session_tags_clear(ECHO_MD_SES_TAG);
 #else
                CERROR(
                       "This is client-side only module, does not support metadata echo client.\n");
@@ -3029,12 +3058,12 @@ static int echo_client_cleanup(struct obd_device *obddev)
                RETURN(0);
        }
 
-       if (!list_empty(&obddev->obd_exports)) {
+       if (!list_empty(&obd->obd_exports)) {
                CERROR("still has clients!\n");
                RETURN(-EBUSY);
        }
 
-       LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
+       LASSERT(refcount_read(&ec->ec_exp->exp_handle.h_ref) > 0);
        rc = obd_disconnect(ec->ec_exp);
        if (rc != 0)
                CERROR("fail to disconnect device: %d\n", rc);
@@ -3072,7 +3101,7 @@ out:
        return rc;
 }
 
-static struct obd_ops echo_client_obd_ops = {
+static const struct obd_ops echo_client_obd_ops = {
        .o_owner       = THIS_MODULE,
        .o_iocontrol   = echo_client_iocontrol,
        .o_connect     = echo_client_connect,
@@ -3093,7 +3122,7 @@ static int __init obdecho_init(void)
        if (rc != 0)
                goto failed_0;
 
-       rc = class_register_type(&echo_obd_ops, NULL, true, NULL,
+       rc = class_register_type(&echo_obd_ops, NULL, true,
                                 LUSTRE_ECHO_NAME, &echo_srv_type);
        if (rc != 0)
                goto failed_1;
@@ -3102,7 +3131,7 @@ static int __init obdecho_init(void)
        rc = lu_kmem_init(echo_caches);
        if (rc == 0) {
                rc = class_register_type(&echo_client_obd_ops, NULL, false,
-                                        NULL, LUSTRE_ECHO_CLIENT_NAME,
+                                        LUSTRE_ECHO_CLIENT_NAME,
                                         &echo_device_type);
                if (rc)
                        lu_kmem_fini(echo_caches);