Whamcloud - gitweb
LU-593 obdclass: echo client for MDS stack
authorwangdi <di.wang@whamcloud.com>
Wed, 16 Nov 2011 22:55:23 +0000 (14:55 -0800)
committerOleg Drokin <green@whamcloud.com>
Sat, 7 Jan 2012 04:08:54 +0000 (23:08 -0500)
1. Add interfaces and tools for exercising a local MDT
   device for performance reasons, in a similar manner
   to obdfilter-survey.
2. add test_create, test_mkdir, test_lookup, test_destroy,
   test_rmdir, test_setxattr, test_md_getattr in lctl for
   md echo client test.

Signed-off-by: Wang di <di.wang@whamcloud.com>
Change-Id: Ibf774a567820ff36b3624e44371c63a9428d82a5
Reviewed-on: http://review.whamcloud.com/1287
Tested-by: Hudson
Reviewed-by: Fan Yong <yong.fan@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
16 files changed:
lustre/fid/fid_request.c
lustre/include/cl_object.h
lustre/include/lu_object.h
lustre/include/lustre_fid.h
lustre/include/lustre_lib.h
lustre/mdc/mdc_request.c
lustre/mdd/mdd_dir.c
lustre/mdt/mdt_handler.c
lustre/obdclass/cl_object.c
lustre/obdclass/lu_object.c
lustre/obdecho/echo_client.c
lustre/osd-ldiskfs/osd_handler.c
lustre/utils/Makefile.am
lustre/utils/lctl.c
lustre/utils/obd.c
lustre/utils/obdctl.h

index 214f542..ca33efa 100644 (file)
@@ -168,8 +168,8 @@ int seq_client_alloc_super(struct lu_client_seq *seq,
 }
 
 /* Request sequence-controller node to allocate new meta-sequence. */
-static int seq_client_alloc_meta(struct lu_client_seq *seq,
-                                 const struct lu_env *env)
+static int seq_client_alloc_meta(const struct lu_env *env,
+                                 struct lu_client_seq *seq)
 {
         int rc;
         ENTRY;
@@ -189,7 +189,8 @@ static int seq_client_alloc_meta(struct lu_client_seq *seq,
 }
 
 /* Allocate new sequence for client. */
-static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
+static int seq_client_alloc_seq(const struct lu_env *env,
+                                struct lu_client_seq *seq, seqno_t *seqnr)
 {
         int rc;
         ENTRY;
@@ -197,9 +198,9 @@ static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
         LASSERT(range_is_sane(&seq->lcs_space));
 
         if (range_is_exhausted(&seq->lcs_space)) {
-                rc = seq_client_alloc_meta(seq, NULL);
+                rc = seq_client_alloc_meta(env, seq);
                 if (rc) {
-                        CERROR("%s: Can't allocate new meta-sequence, "
+                        CERROR("%s: Can't allocate new meta-sequence,"
                                "rc %d\n", seq->lcs_name, rc);
                         RETURN(rc);
                 } else {
@@ -248,8 +249,55 @@ static void seq_fid_alloc_fini(struct lu_client_seq *seq)
         cfs_waitq_signal(&seq->lcs_waitq);
 }
 
+/* Allocate the whole seq to the caller*/
+int seq_client_get_seq(const struct lu_env *env,
+                       struct lu_client_seq *seq, seqno_t *seqnr)
+{
+        cfs_waitlink_t link;
+        int rc;
+
+        LASSERT(seqnr != NULL);
+        cfs_down(&seq->lcs_sem);
+        cfs_waitlink_init(&link);
+
+        while (1) {
+                rc = seq_fid_alloc_prep(seq, &link);
+                if (rc == 0)
+                        break;
+        }
+
+        rc = seq_client_alloc_seq(env, seq, seqnr);
+        if (rc) {
+                CERROR("%s: Can't allocate new sequence, "
+                       "rc %d\n", seq->lcs_name, rc);
+                seq_fid_alloc_fini(seq);
+                cfs_up(&seq->lcs_sem);
+                return rc;
+        }
+
+        CDEBUG(D_INFO, "%s: allocate sequence "
+               "[0x%16.16"LPF64"x]\n", seq->lcs_name, *seqnr);
+
+        /*Since the caller require the whole seq,
+         *so marked this seq to be used*/
+        seq->lcs_fid.f_oid = LUSTRE_SEQ_MAX_WIDTH;
+        seq->lcs_fid.f_seq = *seqnr;
+        seq->lcs_fid.f_ver = 0;
+
+        /*
+         * Inform caller that sequence switch is performed to allow it
+         * to setup FLD for it.
+         */
+        seq_fid_alloc_fini(seq);
+        cfs_up(&seq->lcs_sem);
+
+        return rc;
+}
+EXPORT_SYMBOL(seq_client_get_seq);
+
 /* Allocate new fid on passed client @seq and save it to @fid. */
-int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
+int seq_client_alloc_fid(const struct lu_env *env,
+                         struct lu_client_seq *seq, struct lu_fid *fid)
 {
         cfs_waitlink_t link;
         int rc;
@@ -276,7 +324,7 @@ int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
                 if (rc)
                         continue;
 
-                rc = seq_client_alloc_seq(seq, &seqnr);
+                rc = seq_client_alloc_seq(env, seq, &seqnr);
                 if (rc) {
                         CERROR("%s: Can't allocate new sequence, "
                                "rc %d\n", seq->lcs_name, rc);
index 3d44b1c..cae226b 100644 (file)
@@ -3123,6 +3123,8 @@ void           cl_env_implant    (struct lu_env *env, int *refcheck);
 void           cl_env_unplant    (struct lu_env *env, int *refcheck);
 unsigned       cl_env_cache_purge(unsigned nr);
 
+void           cl_set_ctx_tags(__u32 tags);
+void           cl_set_ses_tags(__u32 tags);
 /** @} cl_env */
 
 /*
index 4e035cd..f431e0b 100644 (file)
@@ -293,6 +293,10 @@ struct lu_device {
          * A list of references to this object, for debugging.
          */
         struct lu_ref                      ld_reference;
+        /**
+         * Link the device to the site.
+         **/
+        cfs_list_t                         ld_linkage;
 };
 
 struct lu_device_type_operations;
@@ -631,6 +635,13 @@ struct lu_site {
          */
         cfs_list_t                ls_linkage;
         /**
+         * List for lu device for this site, protected
+         * by ls_ld_lock.
+         **/
+        cfs_list_t                ls_ld_linkage;
+        cfs_spinlock_t            ls_ld_lock;
+
+        /**
          * lu_site stats
          */
         struct lprocfs_stats     *ls_stats;
index adb2671..4748466 100644 (file)
@@ -262,8 +262,10 @@ void seq_client_fini(struct lu_client_seq *seq);
 
 void seq_client_flush(struct lu_client_seq *seq);
 
-int seq_client_alloc_fid(struct lu_client_seq *seq,
+int seq_client_alloc_fid(const struct lu_env *env, struct lu_client_seq *seq,
                          struct lu_fid *fid);
+int seq_client_get_seq(const struct lu_env *env, struct lu_client_seq *seq,
+                       seqno_t *seqnr);
 
 /* Fids common stuff */
 int fid_is_local(const struct lu_env *env,
index 499385d..37dd25d 100644 (file)
@@ -139,6 +139,20 @@ void l_unlock(struct lustre_lock *);
 int l_has_lock(struct lustre_lock *);
 
 /*
+ * For md echo client
+ */
+enum md_echo_cmd {
+        ECHO_MD_CREATE       = 1, /* Open/Create file on MDT */
+        ECHO_MD_MKDIR        = 2, /* Mkdir on MDT */
+        ECHO_MD_DESTROY      = 3, /* Unlink file on MDT */
+        ECHO_MD_RMDIR        = 4, /* Rmdir on MDT */
+        ECHO_MD_LOOKUP       = 5, /* Lookup on MDT */
+        ECHO_MD_GETATTR      = 6, /* Getattr on MDT */
+        ECHO_MD_SETATTR      = 7, /* Setattr on MDT */
+        ECHO_MD_ALLOC_FID    = 8, /* Get FIDs from MDT */
+};
+
+/*
  *   OBD IOCTLS
  */
 #define OBD_IOCTL_VERSION 0x00010004
@@ -278,8 +292,11 @@ static inline int obd_ioctl_pack(struct obd_ioctl_data *data, char **pbuf,
         data->ioc_len = obd_ioctl_packlen(data);
         data->ioc_version = OBD_IOCTL_VERSION;
 
-        if (*pbuf && data->ioc_len > max)
+        if (*pbuf && data->ioc_len > max) {
+                fprintf(stderr, "pbuf %p ioc_len %u max %d\n", *pbuf,
+                        data->ioc_len, max);
                 return -EINVAL;
+        }
         if (*pbuf == NULL) {
                 *pbuf = malloc(data->ioc_len);
         }
@@ -297,8 +314,11 @@ static inline int obd_ioctl_pack(struct obd_ioctl_data *data, char **pbuf,
                 LOGL(data->ioc_inlbuf3, data->ioc_inllen3, ptr);
         if (data->ioc_inlbuf4)
                 LOGL(data->ioc_inlbuf4, data->ioc_inllen4, ptr);
-        if (obd_ioctl_is_invalid(overlay))
+        if (obd_ioctl_is_invalid(overlay)) {
+                fprintf(stderr, "ioc_len %u max %d\n",
+                        data->ioc_len, max);
                 return -EINVAL;
+        }
 
         return 0;
 }
@@ -539,6 +559,8 @@ static inline void obd_ioctl_freedata(char *buf, int len)
 
 #define OBD_IOC_GET_MNTOPT             _IOW('f', 220, mntopt_t)
 
+#define OBD_IOC_ECHO_MD                _IOR('f', 221, struct obd_ioctl_data)
+#define OBD_IOC_ECHO_ALLOC_SEQ         _IOWR('f', 222, struct obd_ioctl_data)
 /* XXX _IOWR('f', 250, long) has been defined in
  * libcfs/include/libcfs/libcfs_private.h for debug, don't use it
  */
index 834f013..1f2c6eb 100644 (file)
@@ -2036,7 +2036,7 @@ int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
         struct client_obd *cli = &exp->exp_obd->u.cli;
         struct lu_client_seq *seq = cli->cl_seq;
         ENTRY;
-        RETURN(seq_client_alloc_fid(seq, fid));
+        RETURN(seq_client_alloc_fid(NULL, seq, fid));
 }
 
 /* XXX This method is used only to clear current fid seq
index 38ec54a..9365542 100644 (file)
@@ -1014,8 +1014,8 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
         int rc;
         ENTRY;
 
-        LASSERTF(mdd_object_exists(mdd_cobj) > 0, "FID is "DFID"\n",
-                 PFID(mdd_object_fid(mdd_cobj)));
+        if (mdd_object_exists(mdd_cobj) <= 0)
+                RETURN(-ENOENT);
 
         handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
@@ -1647,7 +1647,7 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
         else if (unlikely(rc < 0)) {
                 CERROR("Object "DFID" locates on remote server\n",
                         PFID(mdo2fid(mdd_obj)));
-                LBUG();
+                RETURN(-EINVAL);
         }
 
         /* The common filename length check. */
index f6444c3..956a04b 100644 (file)
@@ -4185,8 +4185,11 @@ static struct lu_device *mdt_layer_setup(struct lu_env *env,
         lu_device_get(d);
         lu_ref_add(&d->ld_reference, "lu-stack", &lu_site_init);
 
-        RETURN(d);
+        cfs_spin_lock(&d->ld_site->ls_ld_lock);
+        cfs_list_add_tail(&d->ld_linkage, &d->ld_site->ls_ld_linkage);
+        cfs_spin_unlock(&d->ld_site->ls_ld_lock);
 
+        RETURN(d);
 out_alloc:
         ldt->ldt_ops->ldto_device_free(env, d);
         type->typ_refcnt--;
index 930384e..9e65b3a 100644 (file)
@@ -76,6 +76,8 @@ static cfs_lock_class_key_t cl_lock_guard_class;
 /** Lock class of cl_object_header::coh_attr_guard */
 static cfs_lock_class_key_t cl_attr_guard_class;
 
+static __u32 cl_ctx_tags;
+static __u32 cl_ses_tags;
 /**
  * Initialize cl_object_header.
  */
@@ -750,7 +752,7 @@ static inline struct cl_env *cl_env_detach(struct cl_env *cle)
         return cle;
 }
 
-static struct lu_env *cl_env_new(__u32 tags, void *debug)
+static struct lu_env *cl_env_new(__u32 ctx_tags, __u32 ses_tags, void *debug)
 {
         struct lu_env *env;
         struct cl_env *cle;
@@ -762,9 +764,10 @@ static struct lu_env *cl_env_new(__u32 tags, void *debug)
                 CFS_INIT_LIST_HEAD(&cle->ce_linkage);
                 cle->ce_magic = &cl_env_init0;
                 env = &cle->ce_lu;
-                rc = lu_env_init(env, LCT_CL_THREAD|tags);
+                rc = lu_env_init(env, LCT_CL_THREAD|ctx_tags);
                 if (rc == 0) {
-                        rc = lu_context_init(&cle->ce_ses, LCT_SESSION|tags);
+                        rc = lu_context_init(&cle->ce_ses,
+                                             LCT_SESSION | ses_tags);
                         if (rc == 0) {
                                 lu_context_enter(&cle->ce_ses);
                                 env->le_ses = &cle->ce_ses;
@@ -792,6 +795,18 @@ static void cl_env_fini(struct cl_env *cle)
         OBD_SLAB_FREE_PTR(cle, cl_env_kmem);
 }
 
+void cl_set_ctx_tags(__u32 tags)
+{
+        cl_ctx_tags = tags;
+}
+EXPORT_SYMBOL(cl_set_ctx_tags);
+
+void cl_set_ses_tags(__u32 tags)
+{
+        cl_ses_tags = tags;
+}
+EXPORT_SYMBOL(cl_set_ses_tags);
+
 static struct lu_env *cl_env_obtain(void *debug)
 {
         struct cl_env *cle;
@@ -820,7 +835,7 @@ static struct lu_env *cl_env_obtain(void *debug)
                 }
         } else {
                 cfs_spin_unlock(&cl_envs_guard);
-                env = cl_env_new(0, debug);
+                env = cl_env_new(cl_ctx_tags, cl_ses_tags, debug);
         }
         RETURN(env);
 }
@@ -895,7 +910,7 @@ struct lu_env *cl_env_alloc(int *refcheck, __u32 tags)
         struct lu_env *env;
 
         LASSERT(cl_env_peek(refcheck) == NULL);
-        env = cl_env_new(tags, __builtin_return_address(0));
+        env = cl_env_new(tags, tags, __builtin_return_address(0));
         if (!IS_ERR(env)) {
                 struct cl_env *cle;
 
index 7768ec6..2ad22f0 100644 (file)
@@ -969,6 +969,13 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
         lu_device_get(top);
         lu_ref_add(&top->ld_reference, "site-top", s);
 
+        CFS_INIT_LIST_HEAD(&s->ls_ld_linkage);
+        cfs_spin_lock_init(&s->ls_ld_lock);
+
+        cfs_spin_lock(&s->ls_ld_lock);
+        cfs_list_add(&top->ld_linkage, &s->ls_ld_linkage);
+        cfs_spin_unlock(&s->ls_ld_lock);
+
         RETURN(0);
 }
 EXPORT_SYMBOL(lu_site_init);
@@ -1044,6 +1051,7 @@ int lu_device_init(struct lu_device *d, struct lu_device_type *t)
         cfs_atomic_set(&d->ld_ref, 0);
         d->ld_type = t;
         lu_ref_init(&d->ld_reference);
+        CFS_INIT_LIST_HEAD(&d->ld_linkage);
         return 0;
 }
 EXPORT_SYMBOL(lu_device_init);
index cd56684..669761b 100644 (file)
@@ -50,6 +50,8 @@
 #include <lustre_debug.h>
 #include <lprocfs_status.h>
 #include <cl_object.h>
+#include <lustre_fid.h>
+#include <lustre_acl.h>
 
 #include "echo_internal.h"
 
@@ -65,6 +67,8 @@ struct echo_device {
         struct cl_site         *ed_site;
         struct lu_device       *ed_next;
         int                     ed_next_islov;
+        int                     ed_next_ismd;
+        struct lu_client_seq   *ed_cl_seq;
 };
 
 struct echo_object {
@@ -107,7 +111,8 @@ struct echo_req {
 };
 #endif
 
-static int echo_client_setup(struct obd_device *obddev,
+static int echo_client_setup(const struct lu_env *env,
+                             struct obd_device *obddev,
                              struct lustre_cfg *lcfg);
 static int echo_client_cleanup(struct obd_device *obddev);
 
@@ -199,6 +204,13 @@ struct echo_thread_info {
         struct cl_io            eti_io;
         struct cl_lock_descr    eti_descr;
         struct lu_fid           eti_fid;
+        struct md_op_spec       eti_spec;
+        struct lov_mds_md_v3    eti_lmm;
+        struct lov_user_md_v3   eti_lum;
+        struct md_attr          eti_ma;
+        struct lu_name          eti_lname;
+        char                    eti_name[20];
+        char                    eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
 };
 
 /* No session used right now */
@@ -471,8 +483,6 @@ static const struct cl_object_operations echo_cl_obj_ops = {
 static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
                             const struct lu_object_conf *conf)
 {
-        const struct cl_object_conf *cconf = lu2cl_conf(conf);
-        struct echo_object_conf *econf = cl2echo_conf(cconf);
         struct echo_device *ed         = cl2echo_dev(lu2cl_dev(obj->lo_dev));
         struct echo_client_obd *ec     = ed->ed_ec;
         struct echo_object *eco        = cl2echo_obj(lu2cl(obj));
@@ -490,14 +500,21 @@ static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
                 lu_object_add(obj, below);
         }
 
-        LASSERT(econf->eoc_md);
-        eco->eo_lsm = *econf->eoc_md;
+        if (!ed->ed_next_ismd) {
+                const struct cl_object_conf *cconf = lu2cl_conf(conf);
+                struct echo_object_conf *econf = cl2echo_conf(cconf);
+
+                LASSERT(econf->eoc_md);
+                eco->eo_lsm = *econf->eoc_md;
+                /* clear the lsm pointer so that it won't get freed. */
+                *econf->eoc_md = NULL;
+        } else {
+                eco->eo_lsm = NULL;
+        }
+
         eco->eo_dev = ed;
         cfs_atomic_set(&eco->eo_npages, 0);
 
-        /* clear the lsm pointer so that it won't get freed. */
-        *econf->eoc_md = NULL;
-
         cfs_spin_lock(&ec->ec_lock);
         cfs_list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
         cfs_spin_unlock(&ec->ec_lock);
@@ -535,7 +552,6 @@ static int echo_object_print(const struct lu_env *env, void *cookie,
         return (*p)(env, cookie, "echoclient-object@%p", obj);
 }
 
-
 static const struct lu_object_operations echo_lu_obj_ops = {
         .loo_object_init      = echo_object_init,
         .loo_object_delete    = NULL,
@@ -580,6 +596,7 @@ static struct lu_object *echo_object_alloc(const struct lu_env *env,
 static struct lu_device_operations echo_device_lu_ops = {
         .ldo_object_alloc   = echo_object_alloc,
 };
+
 /** @} echo_lu_dev_ops */
 
 static struct cl_device_operations echo_device_cl_ops = {
@@ -614,7 +631,8 @@ static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
 static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
 {
         if (ed->ed_site) {
-                cl_site_fini(ed->ed_site);
+                if (!ed->ed_next_ismd)
+                        cl_site_fini(ed->ed_site);
                 ed->ed_site = NULL;
         }
 }
@@ -681,6 +699,55 @@ static struct lu_context_key echo_session_key = {
 
 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
 
+#define ECHO_SEQ_WIDTH 0xffffffff
+static int echo_fid_init(struct echo_device *ed, char *obd_name,
+                         struct md_site *ms)
+{
+        char *prefix;
+        int rc;
+        ENTRY;
+
+        OBD_ALLOC_PTR(ed->ed_cl_seq);
+        if (ed->ed_cl_seq == NULL)
+                RETURN(-ENOMEM);
+
+        OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
+        if (prefix == NULL)
+                GOTO(out_free_seq, rc = -ENOMEM);
+
+        snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", obd_name);
+
+        /* Init client side sequence-manager */
+        rc = seq_client_init(ed->ed_cl_seq, NULL,
+                             LUSTRE_SEQ_METADATA,
+                             prefix, ms->ms_server_seq);
+        ed->ed_cl_seq->lcs_width = ECHO_SEQ_WIDTH;
+        OBD_FREE(prefix, MAX_OBD_NAME + 5);
+        if (rc)
+                GOTO(out_free_seq, rc);
+
+        RETURN(0);
+
+out_free_seq:
+        OBD_FREE_PTR(ed->ed_cl_seq);
+        ed->ed_cl_seq = NULL;
+        RETURN(rc);
+}
+
+static int echo_fid_fini(struct obd_device *obddev)
+{
+        struct echo_device *ed = obd2echo_dev(obddev);
+        ENTRY;
+
+        if (ed->ed_cl_seq != NULL) {
+                seq_client_fini(ed->ed_cl_seq);
+                OBD_FREE_PTR(ed->ed_cl_seq);
+                ed->ed_cl_seq = NULL;
+        }
+
+        RETURN(0);
+}
+
 static struct lu_device *echo_device_alloc(const struct lu_env *env,
                                            struct lu_device_type *t,
                                            struct lustre_cfg *cfg)
@@ -709,55 +776,115 @@ static struct lu_device *echo_device_alloc(const struct lu_env *env,
         cd->cd_ops = &echo_device_cl_ops;
 
         cleanup = 2;
-        rc = echo_site_init(env, ed);
-        if (rc)
+        obd = class_name2obd(lustre_cfg_string(cfg, 0));
+        LASSERT(obd != NULL);
+        LASSERT(env != NULL);
+
+        tgt = class_name2obd(lustre_cfg_string(cfg, 1));
+        if (tgt == NULL) {
+                CERROR("Can not find tgt device %s\n",
+                        lustre_cfg_string(cfg, 1));
                 GOTO(out, rc);
+        }
 
+        next = tgt->obd_lu_dev;
+        if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
+                ed->ed_next_ismd = 1;
+        } else {
+                ed->ed_next_ismd = 0;
+                rc = echo_site_init(env, ed);
+                if (rc)
+                        GOTO(out, rc);
+        }
         cleanup = 3;
-        obd = class_name2obd(lustre_cfg_string(cfg, 0));
-        LASSERT(obd != NULL);
-        rc = echo_client_setup(obd, cfg);
+
+        rc = echo_client_setup(env, obd, cfg);
         if (rc)
                 GOTO(out, rc);
-        ed->ed_ec = &obd->u.echo_client;
 
+        ed->ed_ec = &obd->u.echo_client;
         cleanup = 4;
-        tgt = class_name2obd(lustre_cfg_string(cfg, 1));
-        LASSERT(tgt != NULL);
-        next = tgt->obd_lu_dev;
-        if (next != NULL && !lu_device_is_cl(next))
-                next = NULL;
 
-        /*
-         * if echo client is to be stacked upon ost device, the next is NULL
-         * since ost is not a clio device so far
-         */
-        tgt_type_name = tgt->obd_type->typ_name;
-        if (next != NULL) {
-                LASSERT(next != NULL);
-                if (next->ld_site != NULL)
-                        GOTO(out, rc = -EBUSY);
-
-                next->ld_site = &ed->ed_site->cs_lu;
-                rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
-                                             next->ld_type->ldt_name, NULL);
-                if (rc)
-                        GOTO(out, rc);
+        if (ed->ed_next_ismd) {
+                /* Suppose to connect to some Metadata layer */
+                struct lu_site *ls;
+                struct lu_device *ld;
+                int    found = 0;
 
-                /* Trikcy case, I have to determine the obd type since clio
-                 * uses the different parameters to initialize objects for
-                 * lov & osc.
-                 */
-                if (strcmp(tgt_type_name, LUSTRE_LOV_NAME) == 0)
-                        ed->ed_next_islov = 1;
-                else
-                        LASSERT(strcmp(tgt_type_name, LUSTRE_OSC_NAME) == 0);
-        } else
-                LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
+                if (next == NULL) {
+                        CERROR("%s is not lu device type!\n",
+                               lustre_cfg_string(cfg, 1));
+                        GOTO(out, rc = -EINVAL);
+                }
+
+                tgt_type_name = lustre_cfg_string(cfg, 2);
+                if (!tgt_type_name) {
+                        CERROR("%s no type name for echo %s setup\n",
+                                lustre_cfg_string(cfg, 1),
+                                tgt->obd_type->typ_name);
+                        GOTO(out, rc = -EINVAL);
+                }
+
+                ls = next->ld_site;
+
+                cfs_spin_lock(&ls->ls_ld_lock);
+                cfs_list_for_each_entry(ld, &ls->ls_ld_linkage, ld_linkage) {
+                        if (strcmp(ld->ld_type->ldt_name, tgt_type_name) == 0) {
+                                found = 1;
+                                break;
+                        }
+                }
+                cfs_spin_unlock(&ls->ls_ld_lock);
+
+                if (found == 0) {
+                        CERROR("%s is not lu device type!\n",
+                               lustre_cfg_string(cfg, 1));
+                        GOTO(out, rc = -EINVAL);
+                }
+
+                next = ld;
+                /* For MD echo client, it will use the site in MDS stack */
+                ed->ed_site_myself.cs_lu = *ls;
+                ed->ed_site = &ed->ed_site_myself;
+                ed->ed_cl.cd_lu_dev.ld_site = &ed->ed_site_myself.cs_lu;
+                rc = echo_fid_init(ed, obd->obd_name, lu_site2md(ls));
+                if (rc) {
+                        CERROR("echo fid init error %d\n", rc);
+                        GOTO(out, rc);
+                }
+        } else {
+                 /* if echo client is to be stacked upon ost device, the next is
+                  * NULL since ost is not a clio device so far */
+                if (next != NULL && !lu_device_is_cl(next))
+                        next = NULL;
+
+                tgt_type_name = tgt->obd_type->typ_name;
+                if (next != NULL) {
+                        LASSERT(next != NULL);
+                        if (next->ld_site != NULL)
+                                GOTO(out, rc = -EBUSY);
+
+                        next->ld_site = &ed->ed_site->cs_lu;
+                        rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
+                                                     next->ld_type->ldt_name,
+                                                     NULL);
+                        if (rc)
+                                GOTO(out, rc);
+
+                        /* Tricky case, I have to determine the obd type since
+                         * CLIO uses the different parameters to initialize
+                         * objects for lov & osc. */
+                        if (strcmp(tgt_type_name, LUSTRE_LOV_NAME) == 0)
+                                ed->ed_next_islov = 1;
+                        else
+                                LASSERT(strcmp(tgt_type_name,
+                                               LUSTRE_OSC_NAME) == 0);
+                } else
+                        LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
+        }
 
         ed->ed_next = next;
         RETURN(&cd->cd_lu_dev);
-
 out:
         switch(cleanup) {
         case 4: {
@@ -794,7 +921,7 @@ static struct lu_device *echo_device_fini(const struct lu_env *env,
         struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
         struct lu_device *next = ed->ed_next;
 
-        while (next)
+        while (next && !ed->ed_next_ismd)
                 next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
         return NULL;
 }
@@ -828,7 +955,6 @@ static struct lu_device *echo_device_free(const struct lu_env *env,
         CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
                ed, next);
 
-        LASSERT(ed->ed_site);
         lu_site_purge(env, &ed->ed_site->cs_lu, -1);
 
         /* check if there are objects still alive.
@@ -855,6 +981,7 @@ static struct lu_device *echo_device_free(const struct lu_env *env,
                        "wait for 1 second\n");
                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
                                                    cfs_time_seconds(1));
+                lu_site_purge(env, &ed->ed_site->cs_lu, -1);
                 cfs_spin_lock(&ec->ec_lock);
         }
         cfs_spin_unlock(&ec->ec_lock);
@@ -864,8 +991,9 @@ static struct lu_device *echo_device_free(const struct lu_env *env,
         CDEBUG(D_INFO, "No object exists, exiting...\n");
 
         echo_client_cleanup(d->ld_obd);
+        echo_fid_fini(d->ld_obd);
 
-        while (next)
+        while (next && !ed->ed_next_ismd)
                 next = next->ld_type->ldt_ops->ldto_device_free(env, next);
 
         LASSERT(ed->ed_site == lu2cl_site(d->ld_site));
@@ -893,7 +1021,7 @@ static struct lu_device_type echo_device_type = {
         .ldt_tags     = LU_DEVICE_CL,
         .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
         .ldt_ops      = &echo_device_type_ops,
-        .ldt_ctx_tags = LCT_CL_THREAD
+        .ldt_ctx_tags = LCT_CL_THREAD | LCT_MD_THREAD | LCT_DT_THREAD,
 };
 /** @} echo_init */
 
@@ -1283,6 +1411,645 @@ echo_copyin_lsm (struct echo_device *ed, struct lov_stripe_md *lsm,
         return (0);
 }
 
+static inline void echo_md_build_name(struct lu_name *lname, char *name,
+                                      __u64 id)
+{
+        sprintf(name, "%llu", id);
+        lname->ln_name = name;
+        lname->ln_namelen = strlen(name);
+}
+
+static int echo_md_create_internal(const struct lu_env *env,
+                                   struct echo_device *ed,
+                                   struct md_object *parent,
+                                   struct lu_fid *fid,
+                                   struct lu_name *lname,
+                                   struct md_op_spec *spec,
+                                   struct md_attr *ma)
+{
+        struct lu_object        *ec_child, *child;
+        struct lu_device        *ld = ed->ed_next;
+        int                      rc;
+
+        ec_child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev,
+                                     fid, NULL);
+        if (IS_ERR(ec_child)) {
+                CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
+                        PTR_ERR(ec_child));
+                return PTR_ERR(ec_child);
+        }
+
+        child = lu_object_locate(ec_child->lo_header, ld->ld_type);
+        if (child == NULL) {
+                CERROR("Can not locate the child "DFID"\n", PFID(fid));
+                GOTO(out_put, rc = -EINVAL);
+        }
+
+        CDEBUG(D_RPCTRACE, "Start creating object "DFID" %s %p\n",
+               PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
+
+        rc = mdo_create(env, parent, lname, lu2md(child), spec, ma);
+        if (rc) {
+                CERROR("Can not create child "DFID": rc = %d\n", PFID(fid), rc);
+                GOTO(out_put, rc);
+        }
+        CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc  = %d\n",
+               PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc);
+out_put:
+        lu_object_put(env, ec_child);
+        return rc;
+}
+
+static int echo_set_lmm_size(const struct lu_env *env,
+                             struct lu_device *ld,
+                             struct md_attr *ma,
+                             int *max_lmm_size)
+{
+        struct echo_thread_info *info = echo_env_info(env);
+        struct md_device *md = lu2md_dev(ld);
+        int tmp, rc;
+        ENTRY;
+
+        LASSERT(max_lmm_size != NULL);
+        if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
+                ma->ma_lmm = (void *)&info->eti_lmm;
+                ma->ma_lmm_size = sizeof(info->eti_lmm);
+                *max_lmm_size = 0;
+                RETURN(0);
+        }
+
+        md = lu2md_dev(ld);
+        rc = md->md_ops->mdo_maxsize_get(env, md,
+                                         max_lmm_size, &tmp);
+        if (rc)
+                RETURN(rc);
+
+        if (*max_lmm_size == 0)
+                /* In case xattr is set in echo_setattr_object */
+                *max_lmm_size = sizeof(struct lov_user_md_v3);
+
+        ma->ma_lmm_size = *max_lmm_size;
+        OBD_ALLOC(ma->ma_lmm, ma->ma_lmm_size);
+        if (ma->ma_lmm == NULL)
+                RETURN(-ENOMEM);
+
+        RETURN(0);
+}
+
+static int echo_create_md_object(const struct lu_env *env,
+                                 struct echo_device *ed,
+                                 struct lu_object *ec_parent,
+                                 struct lu_fid *fid,
+                                 char *name, int namelen,
+                                 __u64 id, __u32 mode, int count,
+                                 int stripe_count, int stripe_offset)
+{
+        struct lu_object        *parent;
+        struct echo_thread_info *info = echo_env_info(env);
+        struct lu_name          *lname = &info->eti_lname;
+        struct md_op_spec       *spec = &info->eti_spec;
+        struct md_attr          *ma = &info->eti_ma;
+        struct lu_device        *ld = ed->ed_next;
+        int                      rc = 0;
+        int                      max_lmm_size = 0;
+        int                      i;
+
+        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
+        if (ec_parent == NULL) {
+                lu_object_put(env, ec_parent);
+                RETURN(PTR_ERR(parent));
+        }
+
+        memset(ma, 0, sizeof(*ma));
+        memset(spec, 0, sizeof(*spec));
+        if (stripe_count != 0) {
+                spec->sp_cr_flags |= FMODE_WRITE;
+                rc = echo_set_lmm_size(env, ld, ma, &max_lmm_size);
+                if (rc)
+                        GOTO(out_free, rc);
+                if (stripe_count != -1) {
+                        struct lov_user_md_v3 *lum = &info->eti_lum;
+                        lum->lmm_magic = LOV_USER_MAGIC_V3;
+                        lum->lmm_stripe_count = stripe_count;
+                        lum->lmm_stripe_offset = stripe_offset;
+                        lum->lmm_pattern = 0;
+                        spec->u.sp_ea.eadata = lum;
+                        spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
+                }
+        }
+
+        ma->ma_attr.la_mode = mode;
+        ma->ma_attr.la_valid = LA_CTIME;
+        ma->ma_attr.la_ctime = cfs_time_current_64();
+
+        if (name != NULL) {
+                lname->ln_name = name;
+                lname->ln_namelen = namelen;
+                /* If name is specified, only create one object by name */
+                rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
+                                             spec, ma);
+                GOTO(out_free, rc);
+        }
+
+        /* Create multiple object sequenced by id */
+        for (i = 0; i < count; i++) {
+                char *tmp_name = info->eti_name;
+
+                echo_md_build_name(lname, tmp_name, id);
+
+                rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
+                                             spec, ma);
+                if (rc) {
+                        CERROR("Can not create child %s: rc = %d\n", tmp_name,
+                                rc);
+                        break;
+                }
+                id++;
+                fid->f_oid++;
+        }
+
+out_free:
+        if (!strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME) &&
+             max_lmm_size > 0  && ma->ma_lmm != NULL)
+                OBD_FREE(ma->ma_lmm, max_lmm_size);
+
+        return rc;
+}
+
+static struct lu_object *echo_md_lookup(const struct lu_env *env,
+                                        struct echo_device *ed,
+                                        struct md_object *parent,
+                                        struct lu_name *lname)
+{
+        struct echo_thread_info *info = echo_env_info(env);
+        struct lu_fid           *fid = &info->eti_fid;
+        struct lu_object        *child;
+        int    rc;
+        ENTRY;
+
+        CDEBUG(D_INFO, "lookup %s in parent "DFID" %p\n", lname->ln_name,
+               PFID(fid), parent);
+        rc = mdo_lookup(env, parent, lname, fid, NULL);
+        if (rc) {
+                CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
+                RETURN(ERR_PTR(rc));
+        }
+
+        child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
+
+        RETURN(child);
+}
+
+static int echo_setattr_object(const struct lu_env *env,
+                               struct echo_device *ed,
+                               struct lu_object *ec_parent,
+                               __u64 id, int count)
+{
+        struct lu_object        *parent;
+        struct echo_thread_info *info = echo_env_info(env);
+        struct lu_name          *lname = &info->eti_lname;
+        char                    *name = info->eti_name;
+        struct md_attr          *ma = &info->eti_ma;
+        struct lu_device        *ld = ed->ed_next;
+        struct lov_user_md_v3   *lum = &info->eti_lum;
+        int                      rc = 0;
+        int                      i;
+
+        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
+        if (ec_parent == NULL) {
+                lu_object_put(env, ec_parent);
+                return PTR_ERR(parent);
+        }
+
+        memset(ma, 0, sizeof(*ma));
+        lum->lmm_magic = LOV_USER_MAGIC_V3;
+        lum->lmm_stripe_count = 1;
+        lum->lmm_stripe_offset = -1;
+        lum->lmm_pattern = 0;
+
+        ma->ma_lmm = (struct lov_mds_md *)lum;
+        ma->ma_lmm_size = sizeof(*lum);
+        ma->ma_valid = MA_LOV | MA_HSM;
+        for (i = 0; i < count; i++) {
+                struct lu_object *ec_child, *child;
+
+                echo_md_build_name(lname, name, id);
+
+                ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
+                if (IS_ERR(ec_child)) {
+                        CERROR("Can't find child %s: rc = %ld\n",
+                                lname->ln_name, PTR_ERR(ec_child));
+                        RETURN(PTR_ERR(ec_child));
+                }
+
+                child = lu_object_locate(ec_child->lo_header, ld->ld_type);
+                if (child == NULL) {
+                        CERROR("Can not locate the child %s\n", lname->ln_name);
+                        lu_object_put(env, ec_child);
+                        rc = -EINVAL;
+                        break;
+                }
+
+                CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
+                       PFID(lu_object_fid(child)));
+                rc = mo_attr_set(env, lu2md(child), ma);
+                if (rc) {
+                        CERROR("Can not getattr child "DFID": rc = %d\n",
+                                PFID(lu_object_fid(child)), rc);
+                        lu_object_put(env, ec_child);
+                        break;
+                }
+                CDEBUG(D_RPCTRACE, "End getattr object "DFID"\n",
+                       PFID(lu_object_fid(child)));
+                id++;
+                lu_object_put(env, ec_child);
+        }
+        return rc;
+}
+
+static int echo_getattr_object(const struct lu_env *env,
+                               struct echo_device *ed,
+                               struct lu_object *ec_parent,
+                               __u64 id, int count)
+{
+        struct lu_object        *parent;
+        struct echo_thread_info *info = echo_env_info(env);
+        struct lu_name          *lname = &info->eti_lname;
+        char                    *name = info->eti_name;
+        struct md_attr          *ma = &info->eti_ma;
+        struct lu_device        *ld = ed->ed_next;
+        int                      max_lmm_size;
+        int                      rc = 0;
+        int                      i;
+
+        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
+        if (ec_parent == NULL) {
+                lu_object_put(env, ec_parent);
+                return PTR_ERR(parent);
+        }
+
+        memset(ma, 0, sizeof(*ma));
+        rc = echo_set_lmm_size(env, ld, ma, &max_lmm_size);
+        if (rc)
+                GOTO(out_free, rc);
+
+        ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
+        ma->ma_acl = info->eti_xattr_buf;
+        ma->ma_acl_size = sizeof(info->eti_xattr_buf);
+
+        for (i = 0; i < count; i++) {
+                struct lu_object *ec_child, *child;
+
+                echo_md_build_name(lname, name, id);
+
+                ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
+                if (IS_ERR(ec_child)) {
+                        CERROR("Can't find child %s: rc = %ld\n",
+                               lname->ln_name, PTR_ERR(ec_child));
+                        RETURN(PTR_ERR(ec_child));
+                }
+
+                child = lu_object_locate(ec_child->lo_header, ld->ld_type);
+                if (child == NULL) {
+                        CERROR("Can not locate the child %s\n", lname->ln_name);
+                        lu_object_put(env, ec_child);
+                        GOTO(out_free, rc = -EINVAL);
+                }
+
+                CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
+                       PFID(lu_object_fid(child)));
+                rc = mo_attr_get(env, lu2md(child), ma);
+                if (rc) {
+                        CERROR("Can not getattr child "DFID": rc = %d\n",
+                                PFID(lu_object_fid(child)), rc);
+                        lu_object_put(env, ec_child);
+                        break;
+                }
+                CDEBUG(D_RPCTRACE, "End getattr object "DFID"\n",
+                       PFID(lu_object_fid(child)));
+                id++;
+                lu_object_put(env, ec_child);
+        }
+
+out_free:
+        if (!strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME) &&
+             max_lmm_size > 0 && ma->ma_lmm)
+                OBD_FREE(ma->ma_lmm, max_lmm_size);
+
+        return rc;
+}
+
+static int echo_lookup_object(const struct lu_env *env,
+                              struct echo_device *ed,
+                              struct lu_object *ec_parent,
+                              __u64 id, int count)
+{
+        struct lu_object        *parent;
+        struct echo_thread_info *info = echo_env_info(env);
+        struct lu_name          *lname = &info->eti_lname;
+        char                    *name = info->eti_name;
+        struct lu_fid           *fid = &info->eti_fid;
+        struct lu_device        *ld = ed->ed_next;
+        int                      rc = 0;
+        int                      i;
+
+        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
+        if (ec_parent == NULL) {
+                lu_object_put(env, ec_parent);
+                return PTR_ERR(parent);
+        }
+
+        /*prepare the requests*/
+        for (i = 0; i < count; i++) {
+                echo_md_build_name(lname, name, id);
+
+                CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
+                       PFID(lu_object_fid(parent)), lname->ln_name, parent);
+                rc = mdo_lookup(env, lu2md(parent), lname, fid, NULL);
+                if (rc) {
+                        CERROR("Can not lookup child %s: rc = %d\n", name, rc);
+                        break;
+                }
+                CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
+                       PFID(lu_object_fid(parent)), lname->ln_name, parent);
+
+                id++;
+        }
+        return rc;
+}
+
+static int echo_md_destroy_internal(const struct lu_env *env,
+                                    struct echo_device *ed,
+                                    struct md_object *parent,
+                                    struct lu_name *lname,
+                                    struct md_attr *ma)
+{
+        struct lu_device   *ld = ed->ed_next;
+        struct lu_object   *ec_child;
+        struct lu_object   *child;
+        int                 rc;
+
+        ec_child = echo_md_lookup(env, ed, parent, lname);
+        if (IS_ERR(ec_child)) {
+                CERROR("Can't find child %s: rc = %ld\n", lname->ln_name,
+                        PTR_ERR(ec_child));
+                RETURN(PTR_ERR(ec_child));
+        }
+
+        child = lu_object_locate(ec_child->lo_header, ld->ld_type);
+        if (child == NULL) {
+                CERROR("Can not locate the child %s\n", lname->ln_name);
+                GOTO(out_put, rc = -EINVAL);
+        }
+
+        CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
+               PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
+
+        rc = mdo_unlink(env, parent, lu2md(child), lname, ma);
+        if (rc) {
+                CERROR("Can not unlink child %s: rc = %d\n",
+                        lname->ln_name, rc);
+                GOTO(out_put, rc);
+        }
+        CDEBUG(D_RPCTRACE, "End destroy object "DFID" %s %p\n",
+               PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
+out_put:
+        lu_object_put(env, ec_child);
+        return rc;
+}
+
+static int echo_destroy_object(const struct lu_env *env,
+                               struct echo_device *ed,
+                               struct lu_object *ec_parent,
+                               char *name, int namelen,
+                               __u64 id, __u32 mode,
+                               int count)
+{
+        struct echo_thread_info *info = echo_env_info(env);
+        struct lu_name          *lname = &info->eti_lname;
+        struct md_attr          *ma = &info->eti_ma;
+        struct lu_device        *ld = ed->ed_next;
+        struct lu_object        *parent;
+        int                      rc = 0;
+        int                      max_lmm_size = 0;
+        int                      i;
+        ENTRY;
+
+        parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
+        if (parent == NULL)
+                RETURN(-EINVAL);
+
+        memset(ma, 0, sizeof(*ma));
+        ma->ma_attr.la_mode = mode;
+        ma->ma_attr.la_valid = LA_CTIME;
+        ma->ma_attr.la_ctime = cfs_time_current_64();
+        ma->ma_need = MA_INODE;
+        ma->ma_valid = 0;
+
+        rc = echo_set_lmm_size(env, ld, ma, &max_lmm_size);
+        if (rc)
+                GOTO(out_free, rc);
+
+        /*FIXME: Do not need logcookie for now, and check stripes*/
+        ma->ma_cookie = NULL;
+        ma->ma_cookie_size = 0;
+        ma->ma_need = MA_INODE | MA_LOV;
+        ma->ma_valid = 0;
+
+        if (name != NULL) {
+                lname->ln_name = name;
+                lname->ln_namelen = namelen;
+                rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
+                                              ma);
+                GOTO(out_free, rc);
+        }
+
+        /*prepare the requests*/
+        for (i = 0; i < count; i++) {
+                char *tmp_name = info->eti_name;
+
+                echo_md_build_name(lname, tmp_name, id);
+
+                rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
+                                              ma);
+                if (rc) {
+                        CERROR("Can not unlink child %s: rc = %d\n", name, rc);
+                        break;
+                }
+                id++;
+        }
+
+out_free:
+        if (!strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME) &&
+             max_lmm_size > 0 && ma->ma_lmm)
+                OBD_FREE(ma->ma_lmm, max_lmm_size);
+
+        RETURN(rc);
+}
+
+struct lu_object *echo_resolve_path(const struct lu_env *env,
+                                    struct echo_device *ed, char *path,
+                                    int path_len)
+{
+        struct lu_device        *ld = ed->ed_next;
+        struct md_device        *md = lu2md_dev(ld);
+        struct echo_thread_info *info = echo_env_info(env);
+        struct lu_fid           *fid = &info->eti_fid;
+        struct lu_name          *lname = &info->eti_lname;
+        struct lu_object        *parent = NULL;
+        struct lu_object        *child = NULL;
+        int rc = 0;
+        ENTRY;
+
+        /*Only support MDD layer right now*/
+        LASSERT(!strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME));
+
+        rc = md->md_ops->mdo_root_get(env, md, fid);
+        if (rc) {
+                CERROR("get root error: rc = %d\n", rc);
+                RETURN(ERR_PTR(rc));
+        }
+
+        parent = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
+        if (IS_ERR(parent)) {
+                CERROR("Can not find the parent "DFID": rc = %ld\n",
+                        PFID(fid), PTR_ERR(parent));
+                RETURN(parent);
+        }
+
+        while (1) {
+                struct lu_object *ld_parent;
+                char *e;
+
+                e = strsep(&path, "/");
+                if (e == NULL)
+                        break;
+
+                if (e[0] == 0) {
+                        if (!path || path[0] == '\0')
+                                break;
+                        continue;
+                }
+
+                lname->ln_name = e;
+                lname->ln_namelen = strlen(e);
+
+                ld_parent = lu_object_locate(parent->lo_header, ld->ld_type);
+                if (ld_parent == NULL) {
+                        lu_object_put(env, parent);
+                        rc = -EINVAL;
+                        break;
+                }
+
+                child = echo_md_lookup(env, ed, lu2md(ld_parent), lname);
+                lu_object_put(env, parent);
+                if (IS_ERR(child)) {
+                        rc = (int)PTR_ERR(child);
+                        CERROR("lookup %s under parent "DFID": rc = %d\n",
+                                lname->ln_name, PFID(lu_object_fid(ld_parent)),
+                                rc);
+                        break;
+                }
+                parent = child;
+        }
+        if (rc)
+                RETURN(ERR_PTR(rc));
+
+        RETURN(parent);
+}
+
+static int echo_md_handler(struct echo_device *ed, int command,
+                           char *path, int path_len, int id, int count,
+                           struct obd_ioctl_data *data)
+{
+        struct lu_device      *ld = ed->ed_next;
+        struct lu_env         *env;
+        int                    refcheck;
+        struct lu_object      *parent;
+        char                  *name = NULL;
+        int                    namelen = data->ioc_plen2;
+        int rc = 0;
+        ENTRY;
+
+        if (ld == NULL) {
+                CERROR("MD echo client is not being initialized properly\n");
+                RETURN(-EINVAL);
+        }
+
+        if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
+                CERROR("Only support MDD layer right now!\n");
+                RETURN(-EINVAL);
+        }
+
+        env = cl_env_get(&refcheck);
+        if (IS_ERR(env))
+                RETURN(PTR_ERR(env));
+        lu_env_refill(env);
+
+        parent = echo_resolve_path(env, ed, path, path_len);
+        if (IS_ERR(parent)) {
+                CERROR("Can not resolve the path %s: rc = %ld\n", path,
+                        PTR_ERR(parent));
+                cl_env_put(env, &refcheck);
+                RETURN(PTR_ERR(parent));
+        }
+
+        if (namelen > 0) {
+                OBD_ALLOC(name, namelen + 1);
+                if (name == NULL)
+                        RETURN(-ENOMEM);
+                if (cfs_copy_from_user(name, data->ioc_pbuf2, namelen)) {
+                        OBD_FREE(name, namelen + 1);
+                        RETURN(-EFAULT);
+                }
+        }
+
+        switch (command) {
+        case ECHO_MD_CREATE:
+        case ECHO_MD_MKDIR: {
+                struct echo_thread_info *info = echo_env_info(env);
+                __u32 mode = data->ioc_obdo2.o_mode;
+                struct lu_fid *fid = &info->eti_fid;
+                int stripe_count = (int)data->ioc_obdo2.o_misc;
+                int stripe_index = (int)data->ioc_obdo2.o_stripe_idx;
+
+                fid->f_seq = data->ioc_obdo1.o_seq;
+                fid->f_oid = (__u32)data->ioc_obdo1.o_id;
+                fid->f_ver = 0;
+                rc = echo_create_md_object(env, ed, parent, fid, name, namelen,
+                                           id, mode, count, stripe_count,
+                                           stripe_index);
+                break;
+        }
+        case ECHO_MD_DESTROY:
+        case ECHO_MD_RMDIR: {
+                __u32 mode = data->ioc_obdo2.o_mode;
+
+                rc = echo_destroy_object(env, ed, parent, name, namelen,
+                                         id, mode, count);
+                break;
+        }
+        case ECHO_MD_LOOKUP:
+                rc = echo_lookup_object(env, ed, parent, id, count);
+                break;
+        case ECHO_MD_GETATTR:
+                rc = echo_getattr_object(env, ed, parent, id, count);
+                break;
+        case ECHO_MD_SETATTR:
+                rc = echo_setattr_object(env, ed, parent, id, count);
+                break;
+        default:
+                CERROR("unknown command %d\n", command);
+                rc = -EINVAL;
+                break;
+        }
+        if (name != NULL)
+                OBD_FREE(name, namelen + 1);
+        lu_object_put(env, parent);
+        cl_env_put(env, &refcheck);
+        return rc;
+}
+
 static int echo_create_object(struct echo_device *ed, int on_target,
                               struct obdo *oa, void *ulsm, int ulsm_nob,
                               struct obd_trans_info *oti)
@@ -1303,7 +2070,7 @@ static int echo_create_object(struct echo_device *ed, int on_target,
 
         rc = obd_alloc_memmd(ec->ec_exp, &lsm);
         if (rc < 0) {
-                CERROR("Cannot allocate md, rc = %d\n", rc);
+                CERROR("Cannot allocate md: rc = %d\n", rc);
                 GOTO(failed, rc);
         }
 
@@ -1346,7 +2113,7 @@ static int echo_create_object(struct echo_device *ed, int on_target,
                         (oa->o_seq == FID_SEQ_ECHO));
                 rc = obd_create(ec->ec_exp, oa, &lsm, oti);
                 if (rc != 0) {
-                        CERROR("Cannot create objects, rc = %d\n", rc);
+                        CERROR("Cannot create objects: rc = %d\n", rc);
                         GOTO(failed, rc);
                 }
                 created = 1;
@@ -1370,7 +2137,7 @@ static int echo_create_object(struct echo_device *ed, int on_target,
         if (lsm)
                 obd_free_memmd(ec->ec_exp, &lsm);
         if (rc)
-                CERROR("create object failed with rc = %d\n", rc);
+                CERROR("create object failed with: rc = %d\n", rc);
         return (rc);
 }
 
@@ -1839,6 +2606,65 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp,
                                          &dummy_oti);
                 GOTO(out, rc);
 
+        case OBD_IOC_ECHO_MD: {
+                int count;
+                int cmd;
+                char *dir = NULL;
+                int dirlen;
+                __u64 id;
+
+                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
+                        GOTO(out, rc = -EPERM);
+
+                count = data->ioc_count;
+                cmd = data->ioc_command;
+
+                id = data->ioc_obdo2.o_id;
+
+                dirlen = data->ioc_plen1;
+                OBD_ALLOC(dir, dirlen + 1);
+                if (dir == NULL)
+                        GOTO(out, rc = -ENOMEM);
+
+                if (cfs_copy_from_user(dir, data->ioc_pbuf1, dirlen)) {
+                        OBD_FREE(dir, data->ioc_plen1 + 1);
+                        GOTO(out, rc = -EFAULT);
+                }
+
+                rc = echo_md_handler(ed, cmd, dir, dirlen, id, count, data);
+                OBD_FREE(dir, dirlen + 1);
+                GOTO(out, rc);
+        }
+        case OBD_IOC_ECHO_ALLOC_SEQ: {
+                struct lu_env   *env;
+                int              refcheck;
+                __u64            seq;
+                int              max_count;
+
+                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
+                        GOTO(out, rc = -EPERM);
+
+                env = cl_env_get(&refcheck);
+                if (IS_ERR(env))
+                        GOTO(out, rc = PTR_ERR(env));
+                lu_env_refill(env);
+                rc = seq_client_get_seq(env, ed->ed_cl_seq, &seq);
+                cl_env_put(env, &refcheck);
+                if (rc < 0) {
+                        CERROR("%s: Can not alloc seq: rc = %d\n",
+                               obd->obd_name, rc);
+                        GOTO(out, rc);
+                }
+
+                if (cfs_copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1))
+                        return -EFAULT;
+
+                max_count = LUSTRE_SEQ_MAX_WIDTH;
+                if (cfs_copy_to_user(data->ioc_pbuf2, &max_count,
+                                     data->ioc_plen2))
+                        return -EFAULT;
+                GOTO(out, rc);
+        }
         case OBD_IOC_DESTROY:
                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
                         GOTO (out, rc = -EPERM);
@@ -1952,7 +2778,8 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp,
         return rc;
 }
 
-static int echo_client_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
+static int echo_client_setup(const struct lu_env *env,
+                             struct obd_device *obddev, struct lustre_cfg *lcfg)
 {
         struct echo_client_obd *ec = &obddev->u.echo_client;
         struct obd_device *tgt;
@@ -1979,6 +2806,12 @@ static int echo_client_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
         ec->ec_unique = 0;
         ec->ec_nstripes = 0;
 
+        if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
+                cl_set_ctx_tags(LCT_REMEMBER | LCT_NOREF | LCT_MD_THREAD);
+                cl_set_ses_tags(LCT_SESSION | LCT_REMEMBER | LCT_NOREF);
+                RETURN(0);
+        }
+
         OBD_ALLOC(ocd, sizeof(*ocd));
         if (ocd == NULL) {
                 CERROR("Can't alloc ocd connecting to %s\n",
@@ -1992,7 +2825,7 @@ static int echo_client_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
         ocd->ocd_version = LUSTRE_VERSION_CODE;
         ocd->ocd_group = FID_SEQ_ECHO;
 
-        rc = obd_connect(NULL, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
+        rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
         if (rc == 0) {
                 /* Turn off pinger because it connects to tgt obd directly. */
                 cfs_spin_lock(&tgt->obd_dev_lock);
@@ -2013,10 +2846,15 @@ static int echo_client_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
 
 static int echo_client_cleanup(struct obd_device *obddev)
 {
+        struct echo_device *ed = obd2echo_dev(obddev);
         struct echo_client_obd *ec = &obddev->u.echo_client;
         int rc;
         ENTRY;
 
+        /*Do nothing for Metadata echo client*/
+        if (!ed || ed->ed_next_ismd)
+                RETURN(0);
+
         if (!cfs_list_empty(&obddev->obd_exports)) {
                 CERROR("still has clients!\n");
                 RETURN(-EBUSY);
index 45f92aa..a8111eb 100644 (file)
@@ -438,7 +438,7 @@ static int osd_fid_lookup(const struct lu_env *env,
 
         LINVRNT(osd_invariant(obj));
         LASSERT(obj->oo_inode == NULL);
-        LASSERT(fid_is_sane(fid) || osd_fid_is_root(fid));
+        LASSERTF(fid_is_sane(fid) || osd_fid_is_root(fid), DFID, PFID(fid));
         /*
          * This assertion checks that osd layer sees only local
          * fids. Unfortunately it is somewhat expensive (does a
index f6c234b..7e13022 100644 (file)
@@ -34,19 +34,19 @@ endif # UTILS
 lib_LIBRARIES = liblustreapi.a libiam.a
 
 lctl_SOURCES = obd.c lustre_cfg.c lctl.c obdctl.h
-lctl_LDADD := $(LIBREADLINE) liblustreapi.a $(LIBPTLCTL)
+lctl_LDADD := $(LIBREADLINE) liblustreapi.a $(LIBPTLCTL) $(PTHREAD_LIBS)
 lctl_DEPENDENCIES := $(LIBPTLCTL) liblustreapi.a
 
 lfs_SOURCES = lfs.c obd.c lustre_cfg.c
-lfs_LDADD := $(LIBREADLINE) liblustreapi.a $(LIBPTLCTL)
-lfs_DEPENDENCIES := $(LIBPTLCTL) liblustreapi.a 
+lfs_LDADD := $(LIBREADLINE) liblustreapi.a $(LIBPTLCTL) $(PTHREAD_LIBS)
+lfs_DEPENDENCIES := $(LIBPTLCTL) liblustreapi.a
 
 loadgen_SOURCES = loadgen.c lustre_cfg.c obd.c
 loadgen_LDADD := $(LIBREADLINE) liblustreapi.a $(LIBPTLCTL) $(PTHREAD_LIBS)
 loadgen_DEPENDENCIES := $(LIBPTLCTL) liblustreapi.a
 
 lustre_rsync_SOURCES = lustre_rsync.c obd.c lustre_cfg.c lustre_rsync.h
-lustre_rsync_LDADD :=  $(LIBREADLINE) liblustreapi.a $(LIBPTLCTL)
+lustre_rsync_LDADD := $(LIBREADLINE) liblustreapi.a $(LIBPTLCTL) $(PTHREAD_LIBS)
 lustre_rsync_DEPENDENCIES := $(LIBPTLCTL) liblustreapi.a
 
 ll_recover_lost_found_objs_SOURCES = ll_recover_lost_found_objs.c
@@ -59,7 +59,7 @@ if EXT2FS_DEVEL
 EXT2FSLIB = -lext2fs
 E2PLIB = -le2p
 else
-E2PLIB = 
+E2PLIB =
 EXT2FSLIB =
 endif
 
index 8e08071..4278985 100644 (file)
@@ -280,13 +280,43 @@ command_t cmdlist[] = {
          "Omitting the count means indefinitely, 0 means restore, "
          "otherwise fail 'count' messages.\n"
          "usage: fail nid|_all_ [count]"},
+
+        /*Test commands for echo client*/
+        {"test_create", jt_obd_test_create, 0,
+         "create files on MDT by echo client\n"
+         "usage: test_create [-d parent_basedir] <-D parent_count> "
+         "[-b child_base_id] <-c stripe_count> <-n count> <-t time>\n"},
+        {"test_mkdir", jt_obd_test_mkdir, 0,
+         "mkdir on MDT by echo client\n"
+         "usage: test_mkdir [-d parent_basedir] <-D parent_count>"
+         "[-b child_base_id] [-n count] <-t time>\n"},
+        {"test_destroy", jt_obd_test_destroy, 0,
+         "Destroy files on MDT by echo client\n"
+         "usage: test_destroy [-d parent_basedir] <-D parent_count>"
+         "[-b child_base_id] [-n count] <-t time>\n"},
+        {"test_rmdir", jt_obd_test_rmdir, 0,
+         "rmdir on MDT by echo client\n"
+         "usage: test_rmdir [-d parent_basedir] <-D parent_count>"
+         "[-b child_base_id] [-n count] <-t time>\n"},
+        {"test_lookup", jt_obd_test_lookup, 0,
+         "lookup files on MDT by echo client\n"
+         "usage: test_lookup [-d parent_basedir] <-D parent_count>"
+         "[-b child_base_id] [-n count] <-t time>\n"},
+        {"test_setxattr", jt_obd_test_setxattr, 0,
+         "Set EA for files/directory on MDT by echo client\n"
+         "usage: test_setxattr [-d parent_baseid] <-D parent_count>"
+         "[-b child_base_id] [-x size] [-n count] <-t time>\n"},
+        {"test_md_getattr", jt_obd_test_md_getattr, 0,
+         "getattr files on MDT by echo client\n"
+         "usage: test_md_getattr [-d parent_basedir] <-D parent_count>"
+         "[-b child_base_id] [-n count] <-t time>\n"},
         {"getattr", jt_obd_getattr, 0,
          "get attribute for OST object <objid>\n"
          "usage: getattr <objid>"},
         {"setattr", jt_obd_setattr, 0,
          "set mode attribute for OST object <objid>\n"
          "usage: setattr <objid> <mode>"},
-         {"create", jt_obd_create, 0,
+        {"create", jt_obd_create, 0,
          "create <num> OST objects (with <mode>)\n"
          "usage: create [num [mode [verbose [lsm data]]]]"},
         {"destroy", jt_obd_destroy, 0,
index 4f10404..178d1e3 100644 (file)
 #include <sys/shm.h>
 #include <pthread.h>
 
-#define MAX_THREADS 1024
-
+#define MAX_THREADS 4096
+#define MAX_BASE_ID 0xffffffff
 struct shared_data {
-        __u64 counters[MAX_THREADS];
-        __u64 offsets[MAX_THREADS];
-        int   running;
-        int   barrier;
-        int   stop;
         l_mutex_t mutex;
         l_cond_t  cond;
+        int       stopping;
+        struct {
+                __u64 counters[MAX_THREADS];
+                __u64 offsets[MAX_THREADS];
+                int   thr_running;
+                int   start_barrier;
+                int   stop_barrier;
+                struct timeval start_time;
+                struct timeval end_time;
+        } body;
 };
 
 static struct shared_data *shared_data;
@@ -126,7 +131,7 @@ int lcfg_ioctl(char * func, int dev_id, struct lustre_cfg *lcfg)
         char rawbuf[MAX_IOC_BUFLEN], *buf = rawbuf;
         int rc;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
         data.ioc_type = LUSTRE_CFG_TYPE;
         data.ioc_plen1 = lustre_cfg_len(lcfg->lcfg_bufcount,
@@ -174,7 +179,7 @@ int lcfg_mgs_ioctl(char *func, int dev_id, struct lustre_cfg *lcfg)
         char rawbuf[MAX_IOC_BUFLEN], *buf = rawbuf;
         int rc;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         rc = data.ioc_dev = get_mgs_device();
         if (rc < 0)
                 goto out;
@@ -225,7 +230,7 @@ static int do_name2dev(char *func, char *name)
         char rawbuf[MAX_IOC_BUFLEN], *buf = rawbuf;
         int rc;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
         data.ioc_inllen1 = strlen(name) + 1;
         data.ioc_inlbuf1 = name;
@@ -431,15 +436,19 @@ int do_disconnect(char *func, int verbose)
 }
 
 #ifdef MAX_THREADS
-static void shmem_setup(void)
+static int shmem_setup(void)
 {
-        /* Create new segment */
-        int shmid = shmget(IPC_PRIVATE, sizeof(*shared_data), 0600);
+        pthread_mutexattr_t mattr;
+        pthread_condattr_t  cattr;
+        int                 rc;
+        int                 shmid;
 
+        /* Create new segment */
+        shmid = shmget(IPC_PRIVATE, sizeof(*shared_data), 0600);
         if (shmid == -1) {
                 fprintf(stderr, "Can't create shared data: %s\n",
                         strerror(errno));
-                return;
+                return errno;
         }
 
         /* Attatch to new segment */
@@ -449,7 +458,7 @@ static void shmem_setup(void)
                 fprintf(stderr, "Can't attach shared data: %s\n",
                         strerror(errno));
                 shared_data = NULL;
-                return;
+                return errno;
         }
 
         /* Mark segment as destroyed, so it will disappear when we exit.
@@ -458,7 +467,31 @@ static void shmem_setup(void)
         if (shmctl(shmid, IPC_RMID, NULL) == -1) {
                 fprintf(stderr, "Can't destroy shared data: %s\n",
                         strerror(errno));
+                return errno;
         }
+
+        pthread_mutexattr_init(&mattr);
+        pthread_condattr_init(&cattr);
+
+        rc = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
+        if (rc != 0) {
+                fprintf(stderr, "Can't set shared mutex attr\n");
+                return rc;
+        }
+
+        rc = pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
+        if (rc != 0) {
+                fprintf(stderr, "Can't set shared cond attr\n");
+                return rc;
+        }
+
+        pthread_mutex_init(&shared_data->mutex, &mattr);
+        pthread_cond_init(&shared_data->cond, &cattr);
+
+        pthread_mutexattr_destroy(&mattr);
+        pthread_condattr_destroy(&cattr);
+
+        return 0;
 }
 
 static inline void shmem_lock(void)
@@ -471,20 +504,30 @@ static inline void shmem_unlock(void)
         l_mutex_unlock(&shared_data->mutex);
 }
 
+static inline void shmem_wait(void)
+{
+        l_cond_wait(&shared_data->cond, &shared_data->mutex);
+}
+
+static inline void shmem_wakeup_all(void)
+{
+        l_cond_broadcast(&shared_data->cond);
+}
+
 static inline void shmem_reset(int total_threads)
 {
         if (shared_data == NULL)
                 return;
 
-        memset(shared_data, 0, sizeof(*shared_data));
-        l_mutex_init(&shared_data->mutex);
-        l_cond_init(&shared_data->cond);
+        memset(&shared_data->body, 0, sizeof(shared_data->body));
         memset(counter_snapshot, 0, sizeof(counter_snapshot));
         prev_valid = 0;
-        shared_data->barrier = total_threads;
+        shared_data->stopping = 0;
+        shared_data->body.start_barrier = total_threads;
+        shared_data->body.stop_barrier = total_threads;
 }
 
-static inline void shmem_bump(void)
+static inline void shmem_bump(__u32 counter)
 {
         static bool running_not_bumped = true;
 
@@ -492,14 +535,37 @@ static inline void shmem_bump(void)
                 return;
 
         shmem_lock();
-        shared_data->counters[thread - 1]++;
+        shared_data->body.counters[thread - 1] += counter;
         if (running_not_bumped) {
-                shared_data->running++;
+                shared_data->body.thr_running++;
                 running_not_bumped = false;
         }
         shmem_unlock();
 }
 
+static void shmem_total(int total_threads)
+{
+        __u64 total = 0;
+        double secs;
+        int i;
+
+        if (shared_data == NULL || total_threads > MAX_THREADS)
+                return;
+
+        shmem_lock();
+        for (i = 0; i < total_threads; i++)
+                total += shared_data->body.counters[i];
+
+        secs = difftime(&shared_data->body.end_time,
+                        &shared_data->body.start_time);
+        shmem_unlock();
+
+        printf("Total: total %llu threads %d sec %f %f/second\n", total,
+                total_threads, secs, total / secs);
+
+        return;
+}
+
 static void shmem_snap(int total_threads, int live_threads)
 {
         struct timeval this_time;
@@ -513,9 +579,9 @@ static void shmem_snap(int total_threads, int live_threads)
                 return;
 
         shmem_lock();
-        memcpy(counter_snapshot[0], shared_data->counters,
+        memcpy(counter_snapshot[0], shared_data->body.counters,
                total_threads * sizeof(counter_snapshot[0][0]));
-        running = shared_data->running;
+        running = shared_data->body.thr_running;
         shmem_unlock();
 
         gettimeofday(&this_time, NULL);
@@ -530,12 +596,10 @@ static void shmem_snap(int total_threads, int live_threads)
                 }
         }
 
-        secs = (this_time.tv_sec + this_time.tv_usec / 1000000.0) -
-               (prev_time.tv_sec + prev_time.tv_usec / 1000000.0);
-
-        if (prev_valid &&
-            secs > 1.0)                    /* someone screwed with the time? */
-                printf("%d/%d Total: %f/second\n", non_zero, total_threads, total / secs);
+        secs = difftime(&this_time, &prev_time);
+        if (prev_valid && secs > 1.0)    /* someone screwed with the time? */
+                printf("%d/%d Total: %f/second\n", non_zero, total_threads,
+                       total / secs);
 
         memcpy(counter_snapshot[1], counter_snapshot[0],
                total_threads * sizeof(counter_snapshot[0][0]));
@@ -550,24 +614,35 @@ static void shmem_stop(void)
         if (shared_data == NULL)
                 return;
 
-        shared_data->stop = 1;
+        shared_data->stopping = 1;
+}
+
+static void shmem_cleanup(void)
+{
+        if (shared_data == NULL)
+                return;
+
+        shmem_stop();
+
+        pthread_mutex_destroy(&shared_data->mutex);
+        pthread_cond_destroy(&shared_data->cond);
 }
 
 static int shmem_running(void)
 {
-        return (shared_data == NULL ||
-                !shared_data->stop);
+        return (shared_data == NULL || !shared_data->stopping);
 }
 #else
-static void shmem_setup(void)
+static int shmem_setup(void)
 {
+        return 0;
 }
 
 static inline void shmem_reset(int total_threads)
 {
 }
 
-static inline void shmem_bump(void)
+static inline void shmem_bump(__u32 counters)
 {
 }
 
@@ -579,7 +654,7 @@ static void shmem_unlock()
 {
 }
 
-static void shmem_stop(void)
+static void shmem_cleanup(void)
 {
 }
 
@@ -785,6 +860,7 @@ int jt_opt_threads(int argc, char **argv)
                 sigaction(SIGALRM, &saveact1, NULL);
         }
 
+        shmem_total(threads);
         sigprocmask(SIG_SETMASK, &saveset, NULL);
 
         return rc;
@@ -823,7 +899,7 @@ int jt_obd_no_transno(int argc, char **argv)
         char rawbuf[MAX_IOC_BUFLEN], *buf = rawbuf;
         int rc;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
 
         if (argc != 1)
@@ -850,7 +926,7 @@ int jt_obd_set_readonly(int argc, char **argv)
         char rawbuf[MAX_IOC_BUFLEN], *buf = rawbuf;
         int rc;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
 
         if (argc != 1)
@@ -877,7 +953,7 @@ int jt_obd_abort_recovery(int argc, char **argv)
         char rawbuf[MAX_IOC_BUFLEN], *buf = rawbuf;
         int rc;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
 
         if (argc != 1)
@@ -1039,6 +1115,414 @@ int jt_obd_list(int argc, char **argv)
         return 0;
 }
 
+struct jt_fid_space {
+        obd_seq jt_seq;
+        obd_id  jt_id;
+        int     jt_width;
+};
+
+int jt_obd_alloc_fids(struct jt_fid_space *space, struct lu_fid *fid,
+                      __u64 *count)
+{
+        int rc;
+
+        if (space->jt_seq == 0 || space->jt_id == space->jt_width) {
+                struct obd_ioctl_data  data;
+                char rawbuf[MAX_IOC_BUFLEN];
+                char *buf = rawbuf;
+                __u64 seqnr;
+                int max_count;
+
+                memset(&data, 0, sizeof(data));
+                data.ioc_dev = cur_device;
+
+                data.ioc_pbuf1 = (char *)&seqnr;
+                data.ioc_plen1 = sizeof(seqnr);
+
+                data.ioc_pbuf2 = (char *)&max_count;
+                data.ioc_plen2 = sizeof(max_count);
+
+                memset(buf, 0, sizeof(rawbuf));
+                rc = obd_ioctl_pack(&data, &buf, sizeof(rawbuf));
+                if (rc) {
+                        fprintf(stderr, "error: invalid ioctl rc = %d\n", rc);
+                        return rc;
+                }
+
+                rc = l2_ioctl(OBD_DEV_ID, OBD_IOC_ECHO_ALLOC_SEQ, buf);
+                if (rc) {
+                        fprintf(stderr, "ioctl error: rc = %d\n", rc);
+                        return rc;
+                }
+
+                space->jt_seq = *(__u64 *)data.ioc_pbuf1;
+                space->jt_width = *(int *)data.ioc_pbuf2;
+                space->jt_id = 1;
+        }
+        fid->f_seq = space->jt_seq;
+        fid->f_oid = space->jt_id;
+        fid->f_ver = 0;
+
+        space->jt_id = min(space->jt_id + *count, space->jt_width);
+
+        *count = space->jt_id - fid->f_oid;
+        return 0;
+}
+
+#define MD_STEP_COUNT 1000
+int jt_obd_md_common(int argc, char **argv, int cmd)
+{
+        struct obd_ioctl_data  data;
+        struct timeval         start;
+        struct timeval         next_time;
+        struct timeval         end_time;
+        char                   rawbuf[MAX_IOC_BUFLEN];
+        char                  *buf = rawbuf;
+        int                    verbose = 1;
+        int                    mode = 0000644;
+        int                    create_mode;
+        int                    rc = 0;
+        char                  *parent_basedir = NULL;
+        char                   dirname[4096];
+        int                    parent_base_id = 0;
+        int                    parent_count = 1;
+        __u64                  child_base_id = -1;
+        int                    stripe_count = 0;
+        int                    stripe_index = -1;
+        int                    count = 0;
+        char                  *end;
+        __u64                  seconds = 0;
+        double                 diff;
+        int                    c;
+        int                    xattr_size = 0;
+        __u64                  total_count = 0;
+        char                  *name = NULL;
+        struct jt_fid_space    fid_space = {0};
+        int                    version = 0;
+        struct option          long_opts[] = {
+                {"child_base_id",     required_argument, 0, 'b'},
+                {"stripe_count",      required_argument, 0, 'c'},
+                {"parent_basedir",    required_argument, 0, 'd'},
+                {"parent_dircount",   required_argument, 0, 'D'},
+                {"stripe_index",      required_argument, 0, 'i'},
+                {"mode",              required_argument, 0, 'm'},
+                {"count",             required_argument, 0, 'n'},
+                {"time",              required_argument, 0, 't'},
+                {"version",           no_argument,       0, 'v'},
+                {"xattr_size",        required_argument, 0, 'x'},
+                {0, 0, 0, 0}
+        };
+
+        optind = 0;
+        while ((c = getopt_long(argc, argv, "b:c:d:D:m:n:t:vx:",
+                                long_opts, NULL)) >= 0) {
+                switch (c) {
+                case 'b':
+                        child_base_id = strtoull(optarg, &end, 0);
+                        if (*end) {
+                                fprintf(stderr, "error: %s: bad child_base_id"
+                                        " '%s'\n", jt_cmdname(argv[0]), optarg);
+                                return CMD_HELP;
+                        }
+                        break;
+                case 'c':
+                        stripe_count = strtoul(optarg, &end, 0);
+                        if (*end) {
+                                fprintf(stderr, "error: %s: bad stripe count"
+                                        " '%s'\n", jt_cmdname(argv[0]), optarg);
+                                return CMD_HELP;
+                        }
+                        break;
+                case 'd':
+                        parent_basedir = optarg;
+                        break;
+                case 'D':
+                        parent_count = strtoul(optarg, &end, 0);
+                        if (*end) {
+                                fprintf(stderr, "error: %s: bad parent count"
+                                        " '%s'\n", jt_cmdname(argv[0]), optarg);
+                                return CMD_HELP;
+                        }
+                        break;
+                case 'i':
+                        stripe_index = strtoul(optarg, &end, 0);
+                        if (*end) {
+                                fprintf(stderr, "error: %s: bad stripe index"
+                                        " '%s'\n", jt_cmdname(argv[0]), optarg);
+                                return CMD_HELP;
+                        }
+                        break;
+                case 'm':
+                        mode = strtoul(optarg, &end, 0);
+                        if (*end) {
+                                fprintf(stderr, "error: %s: bad mode '%s'\n",
+                                        jt_cmdname(argv[0]), optarg);
+                                return CMD_HELP;
+                        }
+                        break;
+                case 'n':
+                        total_count = strtoul(optarg, &end, 0);
+                        if (*end || total_count == 0) {
+                                fprintf(stderr, "%s: bad child count '%s'\n",
+                                        jt_cmdname(argv[0]), optarg);
+                                return CMD_HELP;
+                        }
+                        break;
+                case 't':
+                        seconds = strtoull(optarg, &end, 0);
+                        if (*end) {
+                                fprintf(stderr, "error: %s: senconds '%s'\n",
+                                        jt_cmdname(argv[0]), optarg);
+                                return CMD_HELP;
+                        }
+                        break;
+                case 'v':
+                        version = 1;
+                        break;
+                case 'x':
+                        xattr_size = strtoul(optarg, &end, 0);
+                        if (*end) {
+                                fprintf(stderr, "error: %s: senconds '%s'\n",
+                                        jt_cmdname(argv[0]), optarg);
+                                return CMD_HELP;
+                        }
+                        break;
+                default:
+                        fprintf(stderr, "error: %s: option '%s' "
+                                "unrecognized\n", argv[0], argv[optind - 1]);
+                        return CMD_HELP;
+                }
+        }
+
+        memset(&data, 0, sizeof(data));
+        data.ioc_dev = cur_device;
+        if (child_base_id == -1) {
+                if (optind >= argc)
+                        return CMD_HELP;
+                name = argv[optind];
+                total_count = 1;
+        } else {
+                if (optind < argc) {
+                        fprintf(stderr, "child_base_id and name can not"
+                                        " specified at the same time\n");
+                        return CMD_HELP;
+                }
+        }
+
+        if (stripe_count == 0 && stripe_index != -1) {
+                fprintf(stderr, "If stripe_count is 0, stripe_index can not"
+                                "be specified\n");
+                return CMD_HELP;
+        }
+
+        if (total_count == 0 && seconds == 0) {
+                fprintf(stderr, "count or seconds needs to be indicated\n");
+                return CMD_HELP;
+        }
+
+        if (parent_count <= 0) {
+                fprintf(stderr, "parent count must < 0\n");
+                return CMD_HELP;
+        }
+
+#ifdef MAX_THREADS
+        if (thread) {
+                shmem_lock();
+                /* threads interleave */
+                if (parent_base_id != -1)
+                        parent_base_id += (thread - 1) % parent_count;
+
+                if (child_base_id != -1)
+                        child_base_id +=  (thread - 1) * \
+                                          (MAX_BASE_ID / nthreads);
+
+                shared_data->body.start_barrier--;
+                if (shared_data->body.start_barrier == 0) {
+                        shmem_wakeup_all();
+
+                        gettimeofday(&shared_data->body.start_time, NULL);
+                        printf("%s: start at %s", jt_cmdname(argv[0]),
+                               ctime(&shared_data->body.start_time.tv_sec));
+                } else {
+                        shmem_wait();
+                }
+                shmem_unlock();
+        }
+#endif
+        /* If parent directory is not specified, try to get the directory
+         * from name */
+        if (parent_basedir == NULL) {
+                char *last_lash;
+                if (name == NULL) {
+                        fprintf(stderr, "parent_basedir or name must be"
+                                        "indicated!\n");
+                        return CMD_HELP;
+                }
+                /*Get directory and name from name*/
+                last_lash = strrchr(name, '/');
+                if (last_lash == NULL || name[0] != '/') {
+                        fprintf(stderr, "Can not locate %s\n", name);
+                        return CMD_HELP;
+                }
+
+                if (last_lash == name) {
+                        sprintf(dirname, "%s", "/");
+                        name++;
+                } else {
+                        int namelen = (unsigned long)last_lash -
+                                      (unsigned long)name;
+                        snprintf(dirname, namelen, "%s", name);
+                }
+
+                data.ioc_pbuf1 = dirname;
+                data.ioc_plen1 = strlen(dirname);
+
+                data.ioc_pbuf2 = name;
+                data.ioc_plen2 = strlen(name);
+        } else {
+                if (name != NULL) {
+                        data.ioc_pbuf2 = name;
+                        data.ioc_plen2 = strlen(name);
+                } else {
+                        if (parent_base_id > 0)
+                                sprintf(dirname, "%s%d", parent_basedir,
+                                        parent_base_id);
+                        else
+                                sprintf(dirname, "%s", parent_basedir);
+                }
+                data.ioc_pbuf1 = dirname;
+                data.ioc_plen1 = strlen(dirname);
+        }
+
+        if (cmd == ECHO_MD_MKDIR || cmd == ECHO_MD_RMDIR)
+                create_mode = S_IFDIR;
+        else
+                create_mode = S_IFREG;
+
+        data.ioc_obdo1.o_mode = mode | S_IFDIR;
+        data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE |
+                                 OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
+        data.ioc_command = cmd;
+
+        gettimeofday(&start, NULL);
+        next_time.tv_sec = start.tv_sec - verbose;
+        next_time.tv_usec = start.tv_usec;
+        while (shmem_running()) {
+                struct lu_fid fid;
+
+                data.ioc_obdo2.o_id = child_base_id;
+                data.ioc_obdo2.o_mode = mode | create_mode;
+                data.ioc_obdo2.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
+                                         OBD_MD_FLMODE | OBD_MD_FLFLAGS |
+                                         OBD_MD_FLGROUP;
+                data.ioc_obdo2.o_misc = stripe_count;
+                data.ioc_obdo2.o_stripe_idx = stripe_index;
+
+                if (total_count > 0) {
+                        if ((total_count - count) > MD_STEP_COUNT)
+                                data.ioc_count = MD_STEP_COUNT;
+                        else
+                                data.ioc_count = total_count - count;
+                } else {
+                        data.ioc_count = MD_STEP_COUNT;
+                }
+
+                child_base_id += data.ioc_count;
+                count += data.ioc_count;
+                if (cmd == ECHO_MD_CREATE || cmd == ECHO_MD_MKDIR) {
+                        /*Allocate fids for the create */
+                        rc = jt_obd_alloc_fids(&fid_space, &fid,
+                                               &data.ioc_count);
+                        if (rc) {
+                                fprintf(stderr, "Allocate fids error %d.\n",rc);
+                                return rc;
+                        }
+                        data.ioc_obdo1.o_seq = fid.f_seq;
+                        data.ioc_obdo1.o_id = fid.f_oid;
+                }
+                memset(buf, 0, sizeof(rawbuf));
+                rc = obd_ioctl_pack(&data, &buf, sizeof(rawbuf));
+                if (rc) {
+                        fprintf(stderr, "error: %s: invalid ioctl %d\n",
+                                jt_cmdname(argv[0]), rc);
+                        return rc;
+                }
+
+                rc = l2_ioctl(OBD_DEV_ID, OBD_IOC_ECHO_MD, buf);
+                if (rc) {
+                        fprintf(stderr, "error: %s: %s\n",
+                                jt_cmdname(argv[0]), strerror(rc = errno));
+                        return rc;
+                }
+                shmem_bump(data.ioc_count);
+
+                gettimeofday(&end_time, NULL);
+                diff = difftime(&end_time, &start);
+                if (seconds > 0 && (__u64)diff > seconds)
+                        break;
+
+                if (count >= total_count && total_count > 0)
+                        break;
+        }
+
+        if (count > 0 && version) {
+                gettimeofday(&end_time, NULL);
+                diff = difftime(&end_time, &start);
+                printf("%s: %d in %.3fs (%.3f /s): %s",
+                        jt_cmdname(argv[0]), count, diff,
+                        (double)count/diff, ctime(&end_time.tv_sec));
+        }
+
+#ifdef MAX_THREADS
+        if (thread) {
+                shmem_lock();
+                shared_data->body.stop_barrier--;
+                if (shared_data->body.stop_barrier == 0) {
+                        gettimeofday(&shared_data->body.end_time, NULL);
+                        printf("%s: end at %s", jt_cmdname(argv[0]),
+                                ctime(&shared_data->body.end_time.tv_sec));
+                }
+                shmem_unlock();
+        }
+#endif
+        return rc;
+}
+
+int jt_obd_test_create(int argc, char **argv)
+{
+        return jt_obd_md_common(argc, argv, ECHO_MD_CREATE);
+}
+
+int jt_obd_test_mkdir(int argc, char **argv)
+{
+        return jt_obd_md_common(argc, argv, ECHO_MD_MKDIR);
+}
+
+int jt_obd_test_destroy(int argc, char **argv)
+{
+        return jt_obd_md_common(argc, argv, ECHO_MD_DESTROY);
+}
+
+int jt_obd_test_rmdir(int argc, char **argv)
+{
+        return jt_obd_md_common(argc, argv, ECHO_MD_RMDIR);
+}
+
+int jt_obd_test_lookup(int argc, char **argv)
+{
+        return jt_obd_md_common(argc, argv, ECHO_MD_LOOKUP);
+}
+
+int jt_obd_test_setxattr(int argc, char **argv)
+{
+        return jt_obd_md_common(argc, argv, ECHO_MD_SETATTR);
+}
+
+int jt_obd_test_md_getattr(int argc, char **argv)
+{
+        return jt_obd_md_common(argc, argv, ECHO_MD_GETATTR);
+}
+
 /* Create one or more objects, arg[4] may describe stripe meta-data.  If
  * not, defaults assumed.  This echo-client instance stashes the stripe
  * object ids.  Use get_stripe on this node to print full lsm and
@@ -1054,7 +1538,7 @@ int jt_obd_create(int argc, char **argv)
         int verbose = 1, mode = 0100644, rc = 0, i, valid_lsm = 0;
         char *end;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
         if (argc < 2 || argc > 5)
                 return CMD_HELP;
@@ -1122,7 +1606,7 @@ int jt_obd_create(int argc, char **argv)
                 }
                 rc = l2_ioctl(OBD_DEV_ID, OBD_IOC_CREATE, buf);
                 obd_ioctl_unpack(&data, buf, sizeof(rawbuf));
-                shmem_bump();
+                shmem_bump(1);
                 if (rc < 0) {
                         fprintf(stderr, "error: %s: #%d - %s\n",
                                 jt_cmdname(argv[0]), i, strerror(rc = errno));
@@ -1149,7 +1633,7 @@ int jt_obd_setattr(int argc, char **argv)
         char *end;
         int rc;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
         if (argc != 2)
                 return CMD_HELP;
@@ -1197,7 +1681,7 @@ int jt_obd_test_setattr(int argc, char **argv)
         if (argc < 2 || argc > 4)
                 return CMD_HELP;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
         count = strtoull(argv[1], &end, 0);
         if (*end) {
@@ -1237,7 +1721,7 @@ int jt_obd_test_setattr(int argc, char **argv)
                 data.ioc_obdo1.o_id = objid;
                 data.ioc_obdo1.o_mode = S_IFREG;
                 data.ioc_obdo1.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE;
-                memset(buf, 0x00, sizeof(rawbuf));
+                memset(buf, 0, sizeof(rawbuf));
                 rc = obd_ioctl_pack(&data, &buf, sizeof(rawbuf));
                 if (rc) {
                         fprintf(stderr, "error: %s: invalid ioctl\n",
@@ -1245,7 +1729,7 @@ int jt_obd_test_setattr(int argc, char **argv)
                         return rc;
                 }
                 rc = l2_ioctl(OBD_DEV_ID, OBD_IOC_SETATTR, &data);
-                shmem_bump();
+                shmem_bump(1);
                 if (rc < 0) {
                         fprintf(stderr, "error: %s: #"LPD64" - %d:%s\n",
                                 jt_cmdname(argv[0]), i, errno, strerror(rc = errno));
@@ -1286,7 +1770,7 @@ int jt_obd_destroy(int argc, char **argv)
         char *end;
         int rc = 0, i;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
         if (argc < 2 || argc > 4)
                 return CMD_HELP;
@@ -1331,7 +1815,7 @@ int jt_obd_destroy(int argc, char **argv)
                 }
                 rc = l2_ioctl(OBD_DEV_ID, OBD_IOC_DESTROY, buf);
                 obd_ioctl_unpack(&data, buf, sizeof(rawbuf));
-                shmem_bump();
+                shmem_bump(1);
                 if (rc < 0) {
                         fprintf(stderr, "error: %s: objid "LPX64": %s\n",
                                 jt_cmdname(argv[0]), id, strerror(rc = errno));
@@ -1356,7 +1840,7 @@ int jt_obd_getattr(int argc, char **argv)
         if (argc != 2)
                 return CMD_HELP;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
         data.ioc_obdo1.o_id = strtoull(argv[1], &end, 0);
         if (*end) {
@@ -1402,7 +1886,7 @@ int jt_obd_test_getattr(int argc, char **argv)
         if (argc < 2 || argc > 4)
                 return CMD_HELP;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
         count = strtoull(argv[1], &end, 0);
         if (*end) {
@@ -1442,7 +1926,7 @@ int jt_obd_test_getattr(int argc, char **argv)
                 data.ioc_obdo1.o_id = objid;
                 data.ioc_obdo1.o_mode = S_IFREG;
                 data.ioc_obdo1.o_valid = 0xffffffff;
-                memset(buf, 0x00, sizeof(rawbuf));
+                memset(buf, 0, sizeof(rawbuf));
                 rc = obd_ioctl_pack(&data, &buf, sizeof(rawbuf));
                 if (rc) {
                         fprintf(stderr, "error: %s: invalid ioctl\n",
@@ -1450,7 +1934,7 @@ int jt_obd_test_getattr(int argc, char **argv)
                         return rc;
                 }
                 rc = l2_ioctl(OBD_DEV_ID, OBD_IOC_GETATTR, &data);
-                shmem_bump();
+                shmem_bump(1);
                 if (rc < 0) {
                         fprintf(stderr, "error: %s: #"LPD64" - %d:%s\n",
                                 jt_cmdname(argv[0]), i, errno, strerror(rc = errno));
@@ -1581,7 +2065,7 @@ int jt_obd_test_brw(int argc, char **argv)
                 }
         }
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
 
         /* communicate the 'type' of brw test and batching to echo_client.
@@ -1629,20 +2113,21 @@ int jt_obd_test_brw(int argc, char **argv)
                         obj_idx = (thread - 1)/nthr_per_obj;
                         objid += obj_idx;
                         stride *= nthr_per_obj;
-                        if ((thread - 1) % nthr_per_obj == 0)
-                                shared_data->offsets[obj_idx] = stride + thr_offset;
+                        if ((thread - 1) % nthr_per_obj == 0) {
+                                shared_data->body.offsets[obj_idx] =
+                                        stride + thr_offset;
+                        }
                         thr_offset += ((thread - 1) % nthr_per_obj) * len;
                 } else {
                         /* threads disjoint */
                         thr_offset += (thread - 1) * len;
                 }
 
-                shared_data->barrier--;
-                if (shared_data->barrier == 0)
-                        l_cond_broadcast(&shared_data->cond);
+                shared_data->body.start_barrier--;
+                if (shared_data->body.start_barrier == 0)
+                        shmem_wakeup_all();
                 else
-                        l_cond_wait(&shared_data->cond,
-                                    &shared_data->mutex);
+                        shmem_wait();
 
                 shmem_unlock ();
         }
@@ -1667,7 +2152,7 @@ int jt_obd_test_brw(int argc, char **argv)
         cmd = write ? OBD_IOC_BRW_WRITE : OBD_IOC_BRW_READ;
         for (i = 1, next_count = verbose; i <= count && shmem_running(); i++) {
                 data.ioc_obdo1.o_valid &= ~(OBD_MD_FLBLOCKS|OBD_MD_FLGRANT);
-                memset(buf, 0x00, sizeof(rawbuf));
+                memset(buf, 0, sizeof(rawbuf));
                 rc = obd_ioctl_pack(&data, &buf, sizeof(rawbuf));
                 if (rc) {
                         fprintf(stderr, "error: %s: invalid ioctl\n",
@@ -1675,7 +2160,7 @@ int jt_obd_test_brw(int argc, char **argv)
                         return rc;
                 }
                 rc = l2_ioctl(OBD_DEV_ID, cmd, buf);
-                shmem_bump();
+                shmem_bump(1);
                 if (rc) {
                         fprintf(stderr, "error: %s: #%d - %s on %s\n",
                                 jt_cmdname(argv[0]), i, strerror(rc = errno),
@@ -1696,8 +2181,9 @@ int jt_obd_test_brw(int argc, char **argv)
                                 data.ioc_offset += stride;
                         } else if (i < count) {
                                 shmem_lock ();
-                                data.ioc_offset = shared_data->offsets[obj_idx];
-                                shared_data->offsets[obj_idx] += len;
+                                data.ioc_offset =
+                                        shared_data->body.offsets[obj_idx];
+                                shared_data->body.offsets[obj_idx] += len;
                                 shmem_unlock ();
                         }
 #else
@@ -1738,7 +2224,7 @@ int jt_obd_lov_getconfig(int argc, char **argv)
         char *path;
         int rc, fd;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
 
         if (argc != 2)
@@ -1773,7 +2259,7 @@ repeat:
                 goto out_uuidarray;
         }
 
-        memset(buf, 0x00, sizeof(rawbuf));
+        memset(buf, 0, sizeof(rawbuf));
         data.ioc_inllen1 = sizeof(desc);
         data.ioc_inlbuf1 = (char *)&desc;
         data.ioc_inllen2 = desc.ld_tgt_count * sizeof(*uuidarray);
@@ -1840,7 +2326,7 @@ int jt_obd_ldlm_regress_start(int argc, char **argv)
         char argstring[200];
         int i, count = sizeof(argstring) - 1;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
         if (argc > 5)
                 return CMD_HELP;
@@ -1879,7 +2365,7 @@ int jt_obd_ldlm_regress_stop(int argc, char **argv)
         char rawbuf[MAX_IOC_BUFLEN], *buf = rawbuf;
         struct obd_ioctl_data data;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
 
         if (argc != 1)
@@ -1906,7 +2392,7 @@ static int do_activate(int argc, char **argv, int flag)
         char rawbuf[MAX_IOC_BUFLEN], *buf = rawbuf;
         int rc;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
         if (argc != 1)
                 return CMD_HELP;
@@ -1945,7 +2431,7 @@ int jt_obd_recover(int argc, char **argv)
         char rawbuf[MAX_IOC_BUFLEN], *buf = rawbuf;
         struct obd_ioctl_data data;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
         if (argc > 2)
                 return CMD_HELP;
@@ -1986,7 +2472,7 @@ int jt_obd_mdc_lookup(int argc, char **argv)
         if (argc == 4)
                 verbose = get_verbose(argv[0], argv[3]);
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
 
         data.ioc_inllen1 = strlen(child) + 1;
@@ -2038,7 +2524,7 @@ int jt_cfg_dump_log(int argc, char **argv)
         if (argc != 2)
                 return CMD_HELP;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
         data.ioc_inllen1 = strlen(argv[1]) + 1;
         data.ioc_inlbuf1 = argv[1];
@@ -2067,7 +2553,7 @@ int jt_llog_catlist(int argc, char **argv)
         if (argc != 1)
                 return CMD_HELP;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
         data.ioc_inllen1 = sizeof(rawbuf) - cfs_size_round(sizeof(data));
         memset(buf, 0, sizeof(rawbuf));
@@ -2096,7 +2582,7 @@ int jt_llog_info(int argc, char **argv)
         if (argc != 2)
                 return CMD_HELP;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
         data.ioc_inllen1 = strlen(argv[1]) + 1;
         data.ioc_inlbuf1 = argv[1];
@@ -2129,7 +2615,7 @@ int jt_llog_print(int argc, char **argv)
         if (argc != 2 && argc != 4)
                 return CMD_HELP;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
         data.ioc_inllen1 = strlen(argv[1]) + 1;
         data.ioc_inlbuf1 = argv[1];
@@ -2176,7 +2662,7 @@ int jt_llog_cancel(int argc, char **argv)
         if (argc != 4)
                 return CMD_HELP;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
         data.ioc_inllen1 = strlen(argv[1]) + 1;
         data.ioc_inlbuf1 = argv[1];
@@ -2211,7 +2697,7 @@ int jt_llog_check(int argc, char **argv)
         if (argc != 2 && argc != 4)
                 return CMD_HELP;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
         data.ioc_inllen1 = strlen(argv[1]) + 1;
         data.ioc_inlbuf1 = argv[1];
@@ -2257,7 +2743,7 @@ int jt_llog_remove(int argc, char **argv)
         if (argc != 3 && argc != 2)
                 return CMD_HELP;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
         data.ioc_inllen1 = strlen(argv[1]) + 1;
         data.ioc_inlbuf1 = argv[1];
@@ -2501,7 +2987,9 @@ int obd_initialize(int argc, char **argv)
         for (i = 0; i < MAX_STRIPES; i++)
                 lsm_buffer.lsm.lsm_oinfo[i] = lov_oinfos + i;
 
-        shmem_setup();
+        if (shmem_setup() != 0)
+                return -1;
+
         register_ioc_dev(OBD_DEV_ID, OBD_DEV_PATH,
                          OBD_DEV_MAJOR, OBD_DEV_MINOR);
 
@@ -2517,7 +3005,7 @@ void obd_finalize(int argc, char **argv)
         sigact.sa_flags = SA_RESTART;
         sigaction(SIGINT, &sigact, NULL);
 
-        shmem_stop();
+        shmem_cleanup();
         do_disconnect(argv[0], 1);
 }
 
@@ -2757,7 +3245,7 @@ static int pool_cmd(enum lcfg_command_type cmd,
                 return rc;
         }
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         rc = data.ioc_dev = get_mgs_device();
         if (rc < 0)
                 goto out;
@@ -3131,7 +3619,7 @@ int jt_changelog_register(int argc, char **argv)
         if (cur_device < 0)
                 return CMD_HELP;
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
         memset(buf, 0, sizeof(rawbuf));
         rc = obd_ioctl_pack(&data, &buf, sizeof(rawbuf));
@@ -3186,7 +3674,7 @@ int jt_changelog_deregister(int argc, char **argv)
                 return CMD_HELP;
         }
 
-        memset(&data, 0x00, sizeof(data));
+        memset(&data, 0, sizeof(data));
         data.ioc_dev = cur_device;
         data.ioc_u32_1 = id;
         memset(buf, 0, sizeof(rawbuf));
index 8ec992e..436c855 100644 (file)
@@ -65,6 +65,14 @@ int jt_obd_set_readonly(int argc, char **argv);
 int jt_obd_abort_recovery(int argc, char **argv);
 int jt_obd_list(int argc, char **argv);
 int jt_obd_create(int argc, char **argv);
+int jt_obd_test_create(int argc, char **argv);
+int jt_obd_test_mkdir(int argc, char **argv);
+int jt_obd_test_destroy(int argc, char **argv);
+int jt_obd_test_rmdir(int argc, char **argv);
+int jt_obd_test_lookup(int argc, char **argv);
+int jt_obd_test_setxattr(int argc, char **argv);
+int jt_obd_test_md_getattr(int argc, char **argv);
+
 int jt_obd_setattr(int argc, char **argv);
 int jt_obd_test_setattr(int argc, char **argv);
 int jt_obd_destroy(int argc, char **argv);