Whamcloud - gitweb
- generate capability in open resend case
[fs/lustre-release.git] / lustre / cobd / cache_obd.c
index 15e2b88..a2b1d17 100644 (file)
 #include <linux/lustre_mds.h>
 #include <linux/obd_class.h>
 #include <linux/obd_cache.h>
+#include <linux/obd_lmv.h>
+#include <linux/obd_lov.h>
 
-static int cobd_attach(struct obd_device *obd, obd_count len, void *buf)
+static int cobd_attach(struct obd_device *obd,
+                       obd_count len, void *buf)
 {
         struct lprocfs_static_vars lvars;
+        int rc = 0;
+        ENTRY;
         
         lprocfs_init_vars(cobd, &lvars);
-        return lprocfs_obd_attach(obd, lvars.obd_vars);
+        rc = lprocfs_obd_attach(obd, lvars.obd_vars);
+        
+        RETURN(rc);
 }
 
 static int cobd_detach(struct obd_device *obd)
 {
-        return lprocfs_obd_detach(obd);
-}
-
-static int connect_to_obd(char *name, struct lustre_handle *conn)
-{ 
-        struct obd_uuid   obd_uuid;
-        struct obd_device *obd;
-        int    rc = 0;
         ENTRY;
-        obd = class_name2obd(name);
-        if (obd == NULL) {
-                CERROR("%s: unable to find a client for obd: %s\n",
-                       obd->obd_name, name);
-                RETURN(-EINVAL);
-        }
-        rc = obd_connect(conn, obd, &obd_uuid, 0);
-        RETURN(rc);
+        RETURN(lprocfs_obd_detach(obd));
 }
 
 static int cobd_setup(struct obd_device *obd, obd_count len, void *buf)
 {
         struct lustre_cfg *lcfg = (struct lustre_cfg *)buf;
+        int inst_len = 0, mname_len = 0, cname_len = 0;
+        struct obd_device *master_obd, *cache_obd;
         struct cache_obd  *cobd = &obd->u.cobd;
-#if 0
-        struct lustre_handle master_conn = {0,};
-#endif
-        struct lustre_handle cache_conn = {0,};
-        struct obd_device *master;
-        struct obd_device *cache;
-        int rc;
+        struct lustre_handle conn = { 0 };
+        int rc = 0;
         ENTRY;
 
-        if (lcfg->lcfg_inllen1 == 0 || lcfg->lcfg_inlbuf1 == NULL) {
+        sema_init(&cobd->sem, 1);
+        
+        if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1 ||
+            lustre_cfg_buf(lcfg, 1) == NULL) {
                 CERROR("%s: setup requires master device name\n", 
                        obd->obd_name);
                 RETURN(-EINVAL);
         }
 
-        master = class_name2obd(lcfg->lcfg_inlbuf1);
-        if (master == NULL) {
-                CERROR("%s: unable to find a client for master: %s\n",
-                       obd->obd_name, lcfg->lcfg_inlbuf1);
+        if (LUSTRE_CFG_BUFLEN(lcfg, 2) < 1 ||
+            lustre_cfg_buf(lcfg, 2) == NULL) {
+                CERROR("%s: setup requires cache device name\n",
+                       obd->obd_name);
                 RETURN(-EINVAL);
         }
-
-        if (lcfg->lcfg_inllen2 == 0 || lcfg->lcfg_inlbuf2 == NULL) {
-                CERROR("%s: setup requires cache device name\n", obd->obd_name);
-                RETURN(-EINVAL);
+        inst_len = LUSTRE_CFG_BUFLEN(lcfg, 3);
+        
+        if (inst_len) {
+                LASSERT(lustre_cfg_buf(lcfg, 3) != NULL);
+                mname_len = LUSTRE_CFG_BUFLEN(lcfg, 1) + inst_len;  
+                cname_len = LUSTRE_CFG_BUFLEN(lcfg, 2) + inst_len;  
+        } else {
+                mname_len = LUSTRE_CFG_BUFLEN(lcfg, 1);
+                cname_len = LUSTRE_CFG_BUFLEN(lcfg, 2);
+        } 
+       
+        /* get the cache obd name and master name */
+        OBD_ALLOC(cobd->master_name, mname_len);
+        if (!cobd->master_name) 
+                RETURN(-ENOMEM);
+        if(inst_len)
+                sprintf(cobd->master_name, "%s-%s", lustre_cfg_string(lcfg, 1), 
+                        lustre_cfg_string(lcfg, 3));
+        else 
+                sprintf(cobd->master_name, "%s", lustre_cfg_string(lcfg, 1));
+        
+        OBD_ALLOC(cobd->cache_name, cname_len);
+        if (!cobd->cache_name) {
+                OBD_FREE(cobd->master_name, mname_len);
+                RETURN(-ENOMEM);
+        } 
+        if (inst_len)
+                sprintf(cobd->cache_name, "%s-%s", lustre_cfg_string(lcfg, 2), 
+                        lustre_cfg_string(lcfg, 3));
+        else 
+                sprintf(cobd->cache_name, "%s", lustre_cfg_string(lcfg, 2));
+
+        CDEBUG(D_INFO, "master name %s cache name %s\n", cobd->master_name,
+               cobd->cache_name);
+        
+        /* getting master obd */
+        master_obd = class_name2obd(cobd->master_name);
+        if (!master_obd) {
+                CERROR("can't find master obd by name %s\n",
+                       cobd->master_name);
+                GOTO(put_names, rc = -EINVAL);
         }
 
-        cache = class_name2obd(lcfg->lcfg_inlbuf2);
-        if (cache == NULL) {
-                CERROR("%s: unable to find a client for cache: %s\n",
-                       obd->obd_name, lcfg->lcfg_inlbuf2);
-                RETURN(-EINVAL);
+        /* connecting master */
+        memset(&conn, 0, sizeof(conn));
+        rc = class_connect(&conn, master_obd, &obd->obd_uuid);
+        if (rc)
+               GOTO(put_names, rc);
+        
+        cobd->master_exp = class_conn2export(&conn);
+
+        /* getting cache obd */
+        cache_obd = class_name2obd(cobd->cache_name);
+        if (!cache_obd) {
+                class_disconnect(cobd->master_exp, 0);
+                CERROR("can't find cache obd by name %s\n",
+                       cobd->cache_name);
+                GOTO(put_names, rc = -EINVAL);
         }
 
-        OBD_ALLOC(cobd->master_name, strlen(lcfg->lcfg_inlbuf1) + 1);
-        if (!cobd->master_name) 
-                GOTO(exit, rc = -ENOMEM);
-        memcpy(cobd->master_name, lcfg->lcfg_inlbuf1, 
-               strlen(lcfg->lcfg_inlbuf1));
+        /* connecting cache */
+        memset(&conn, 0, sizeof(conn));
+        rc = class_connect(&conn, cache_obd, &obd->obd_uuid);
+        if (rc) {
+                class_disconnect(cobd->master_exp, 0);
+                GOTO(put_names, rc);
+        }
+        cobd->cache_exp = class_conn2export(&conn);
         
-        OBD_ALLOC(cobd->cache_name, strlen(lcfg->lcfg_inlbuf2) + 1);
-        if (!cobd->cache_name) 
-                GOTO(exit, rc = -ENOMEM);
-        memcpy(cobd->cache_name, lcfg->lcfg_inlbuf2, 
-               strlen(lcfg->lcfg_inlbuf2));
-
-#if 0        
-        /* don't bother checking attached/setup; obd_connect() should, and it
-         * can change underneath us */
-        rc = connect_to_obd(cobd->master_name, &master_conn);
-        if (rc != 0)
-                GOTO(exit, rc);
-        cobd->master_exp = class_conn2export(&master_conn);
-#endif        
-        rc = connect_to_obd(cobd->cache_name, &cache_conn);
-        if (rc != 0) {
-                obd_disconnect(cobd->cache_exp, 0);
-                GOTO(exit, rc);
-        }
-        cobd->cache_exp = class_conn2export(&cache_conn);
+        /* default set cache on, but nothing is realy connected yet, will be
+         * done in cobd_connect() time. */
         cobd->cache_on = 1;
 
-        if (!strcmp(master->obd_type->typ_name, LUSTRE_MDC_NAME)) {
-                int mds_type;
-                
-                mds_type = MDS_MASTER_OBD;
-                obd_set_info(cobd->master_exp, strlen("mds_type"),
-                             "mds_type", sizeof(mds_type), &mds_type);
-                
-                mds_type = MDS_CACHE_OBD;
-                obd_set_info(cobd->cache_exp, strlen("mds_type"),
-                             "mds_type", sizeof(mds_type), &mds_type);
-        }
-exit:
+        /* nothing is connected, make exports reflect this state to not confuse
+         * cobd_switch() later. */
+        cobd->cache_real_exp = NULL;
+        cobd->master_real_exp = NULL;
+        
+        EXIT;
+put_names:
         if (rc) {
-                if (cobd->cache_name)
-                        OBD_FREE(cobd->cache_name, 
-                                 strlen(cobd->cache_name) + 1);
-                if (cobd->master_name)
-                        OBD_FREE(cobd->master_name, 
-                                 strlen(cobd->master_name) + 1);
+                if (cobd->master_name) {
+                        OBD_FREE(cobd->master_name, LUSTRE_CFG_BUFLEN(lcfg, 1));
+                        cobd->master_name = NULL;
+                } 
+                if (cobd->cache_name) {
+                        OBD_FREE(cobd->cache_name, LUSTRE_CFG_BUFLEN(lcfg, 2));
+                        cobd->cache_name = NULL;
+                }
         }
-        RETURN(rc);
+        return rc;
 }
 
 static int cobd_cleanup(struct obd_device *obd, int flags)
 {
         struct cache_obd  *cobd = &obd->u.cobd;
-        int                rc;
+        int rc = 0;
+        ENTRY;
 
         if (!list_empty(&obd->obd_exports))
-                return (-EBUSY);
-        
+                RETURN(-EBUSY);
+
         if (cobd->cache_name)
                 OBD_FREE(cobd->cache_name, 
                          strlen(cobd->cache_name) + 1);
         if (cobd->master_name)
                 OBD_FREE(cobd->master_name, 
                          strlen(cobd->master_name) + 1);
-        if (cobd->cache_on) { 
-                rc = obd_disconnect(cobd->cache_exp, flags);
-                if (rc != 0)
-                        CERROR("error %d disconnecting cache\n", rc);
-        }
-        rc = obd_disconnect(cobd->master_exp, flags);
-        if (rc != 0)
-                CERROR("error %d disconnecting master\n", rc);
         
-        return (rc);
+        rc = class_disconnect(cobd->master_exp, flags);
+        if (rc) {
+                CERROR("error disconnecting master, err %d\n",
+                       rc);
+        }
+        rc = class_disconnect(cobd->cache_exp, flags);
+        if (rc) {
+                CERROR("error disconnecting master, err %d\n",
+                       rc);
+        }
+
+        RETURN(0);
 }
 
-struct obd_export *cobd_get_exp(struct obd_device *obd)
+static inline struct obd_export *
+cobd_get_exp(struct obd_device *obd)
 {
-        struct cache_obd  *cobd = &obd->u.cobd;
+        struct cache_obd *cobd = &obd->u.cobd;
+        ENTRY;
         
-        if (cobd->cache_on)  
-                return cobd->cache_exp;
-        else
-                return cobd->master_exp;
+        if (cobd->cache_on) {
+                CDEBUG(D_TRACE, "get cache exp %p \n", cobd->cache_exp); 
+                if (cobd->cache_real_exp)
+                        RETURN(cobd->cache_real_exp);
+                RETURN(cobd->cache_exp);
+        }
+        
+        CDEBUG(D_TRACE, "get master exp %p \n", cobd->master_exp);
+        if (cobd->master_real_exp)
+                RETURN(cobd->master_real_exp); 
+        RETURN(cobd->master_exp);
+}
+
+static int cobd_init_dt_desc(struct obd_device *obd)
+{
+        struct cache_obd *cobd = &obd->u.cobd;
+        struct obd_export *cobd_exp;
+        __u32 valsize;
+        int rc = 0;
+        ENTRY;
+        
+        valsize = sizeof(cobd->dt_desc);
+        memset(&cobd->dt_desc, 0, sizeof(cobd->dt_desc));
+
+        cobd_exp = cobd_get_exp(obd);
+        rc = obd_get_info(cobd_exp, strlen("lovdesc") + 1,
+                          "lovdesc", &valsize, &cobd->dt_desc);
+        RETURN(rc);
+}
+        
+static int cobd_init_ea_size(struct obd_device *obd)
+{
+        int rc = 0, tgt_count, easize, cookiesize;
+        struct cache_obd *cobd = &obd->u.cobd;
+        struct obd_export *cobd_exp;
+        ENTRY;
+
+        tgt_count = cobd->dt_desc.ld_tgt_count;
+
+        /* no EA setup is needed as there is single OST with no LOV */
+        if (tgt_count == 0)
+                RETURN(0);
+
+        cobd_exp = cobd_get_exp(obd);
+        easize = lov_mds_md_size(tgt_count);
+        cookiesize = tgt_count * sizeof(struct llog_cookie);
+        rc = obd_init_ea_size(cobd_exp, easize, cookiesize);
+        RETURN(rc);
+}
+
+static int
+cobd_connect_client(struct obd_device *obd,
+                    struct obd_export *exp,
+                    struct lustre_handle *conn,
+                    struct obd_connect_data *data,
+                    unsigned long flags)
+{ 
+        struct obd_device *cli_obd;
+        int rc = 0;
+        ENTRY;
+        LASSERT(obd);
+        LASSERT(conn);
+        
+        cli_obd = class_exp2obd(exp);
+        if (cli_obd == NULL) 
+                RETURN(-EINVAL);
+
+        rc = obd_connect(conn, cli_obd, &obd->obd_uuid,
+                         data, flags);
+        if (rc) 
+                CERROR("error connecting err %d\n", rc);
+
+        RETURN(rc);
+}
+
+static int
+cobd_disconnect_client(struct obd_device *obd,
+                       struct obd_export *exp,
+                       unsigned long flags)
+{
+        struct obd_device *cli_obd;
+        int rc = 0;
+        ENTRY;
+
+        cli_obd = class_exp2obd(exp);
+        cli_obd->obd_no_recov = obd->obd_no_recov;
+        
+        rc = obd_disconnect(exp, flags);
+        if (rc) {
+                CERROR("error disconnecting from %s, err %d\n",
+                       cli_obd->obd_name, rc);
+                class_export_put(exp);
+        }
+        RETURN(rc);
+}
+
+#define COBD_CONNECT (1 << 0)
+#define COBD_DISCON  (1 << 1)
+#define COBD_SWITCH  (1 << 2)
+
+/* magic function for switching cobd between two exports cache and master in
+ * strong correspondence with passed @cache_on. It also may perform partial
+ * actions like only turn off old export or only turn on new one.
+ *
+ * bias == COBD_CONNECT only connect new export (used in cobd_connect())
+ * bias == COBD_DISCON only disconnect old export (used in cobd_disconnect())
+ *
+ * bias == COBD_SWITCH do both (disconnect old and connect new). This will also
+ * set ->cache_on  to passed @cache_on value.
+ */
+static int cobd_switch(struct obd_device *obd,
+                       int cache_on, int bias)
+{
+        struct cache_obd *cobd = &obd->u.cobd;
+        struct obd_device *cli_obd = NULL;
+        struct lustre_handle conn = {0,};
+        struct obd_export *discon_exp;
+        struct obd_export *conn_exp;
+        int rc = 0;
+        ENTRY;
+
+        if (cache_on) {
+                discon_exp = cobd->master_real_exp;
+                conn_exp = cobd->cache_exp;
+        } else {
+                discon_exp = cobd->cache_real_exp;
+                conn_exp = cobd->master_exp;
+        }
+
+        /* disconnect old export */
+        if (bias == COBD_SWITCH || bias == COBD_DISCON) {
+                if (discon_exp) {
+                        rc = cobd_disconnect_client(obd, discon_exp, 0);
+                        if (rc) {
+                                CWARN("can't disconnect export %p, err %d\n",
+                                      discon_exp, rc);
+                        }
+                }
+                
+                if (cache_on)
+                        cobd->master_real_exp = NULL;
+                else
+                        cobd->cache_real_exp = NULL; 
+        }
+
+        /* connect new export */
+        if (bias == COBD_SWITCH || bias == COBD_CONNECT) {
+                int connected;
+
+                connected = cache_on ? (cobd->cache_real_exp != NULL) :
+                        (cobd->master_real_exp != NULL);
+
+                /* correct export already may be connected */
+                if (!connected) {
+                        rc = cobd_connect_client(obd, conn_exp, &conn,
+                                                 NULL, OBD_OPT_REAL_CLIENT);
+                        if (rc) {
+                                CERROR("can't connect export %p, err %d\n",
+                                       conn_exp, rc);
+                                RETURN(rc);
+                        }
+
+                        if (cache_on) {
+                                cobd->cache_real_exp = class_conn2export(&conn);
+                                cli_obd = class_exp2obd(cobd->cache_exp);
+                        } else {
+                                cobd->master_real_exp = class_conn2export(&conn);
+                                cli_obd = class_exp2obd(cobd->master_exp);
+                        }
+
+                        /* change flag only if connect is allowed to keep
+                         * ->cache_on coherent with real export connected. */
+                        cobd->cache_on = cache_on;
+                
+                        /* re-init EA size for new selected export. This should
+                         * be done after assigining new state to @cobd->cache_on
+                         * to not call disconnected old export. */
+                        if (obd_md_type(cli_obd)) {
+                                rc = cobd_init_dt_desc(obd);
+                                if (rc == 0) {
+                                        rc = cobd_init_ea_size(obd);
+                                        if (rc) {
+                                                CERROR("can't initialize EA size, "
+                                                       "err %d\n", rc);
+                                        }
+                                } else {
+                                        CERROR("can't initialize data lovdesc, "
+                                               "err %d\n", rc);
+                                        /* ignore cases when we did not manage
+                                         * to init lovdesc. This is because some
+                                         * devices may not know "lovdesc" info
+                                         * command. */
+                                        rc = 0;
+                                }
+                        }
+                }
+        }
+
+        RETURN(rc);
 }
 
 static int
 cobd_connect(struct lustre_handle *conn, struct obd_device *obd,
-             struct obd_uuid *cluuid, unsigned long connect_flags)
+             struct obd_uuid *cluuid, struct obd_connect_data *data,
+             unsigned long flags)
 {
-        return class_connect(conn, obd, cluuid);
+        struct cache_obd *cobd = &obd->u.cobd;
+        struct obd_export *exp;
+        int rc = 0;
+        ENTRY;
+
+        rc = class_connect(conn, obd, cluuid);
+        if (rc)
+                RETURN(rc);
+
+        exp = class_conn2export(conn);
+        rc = cobd_switch(obd, cobd->cache_on,
+                         COBD_CONNECT);
+        if (rc)
+                class_disconnect(exp, 0);
+        else
+                class_export_put(exp);
+        RETURN(rc);
 }
 
-static int cobd_disconnect(struct obd_export *exp, unsigned long flags)
+static int
+cobd_disconnect(struct obd_export *exp, unsigned long flags)
 {
-        return class_disconnect(exp, flags);
+        struct cache_obd *cobd;
+        struct obd_device *obd;
+        int rc = 0;
+        ENTRY;
+        
+        LASSERT(exp != NULL);
+        obd = class_exp2obd(exp);
+        if (obd == NULL) {
+                CDEBUG(D_IOCTL, "invalid client cookie "
+                       LPX64"\n", exp->exp_handle.h_cookie);
+                RETURN(-EINVAL);
+        }
+
+        /* here would be nice also to check that disconnect goes to the same
+         * export as connect did. But as now we are accepting the notion that
+         * cache should be switched after client umount this is not needed.
+         * --umka. */
+        cobd = &obd->u.cobd;
+        rc = cobd_switch(obd, !cobd->cache_on, COBD_DISCON);
+        class_disconnect(exp, flags);
+
+        RETURN(rc);
 }
 
-static int cobd_get_info(struct obd_export *exp, obd_count keylen,
+static int cobd_get_info(struct obd_export *exp, __u32 keylen,
                          void *key, __u32 *vallen, void *val)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
         
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
 
         /* intercept cache utilisation info? */
-        return obd_get_info(cobd_exp, keylen, key, vallen, val);
+        rc = obd_get_info(cobd_exp, keylen, key, vallen, val);
+        RETURN(rc);
 }
 
 static int cobd_set_info(struct obd_export *exp, obd_count keylen,
@@ -221,647 +485,756 @@ static int cobd_set_info(struct obd_export *exp, obd_count keylen,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
+       
+        LASSERT(cobd_exp);
         
         /* intercept cache utilisation info? */
-        return obd_set_info(cobd_exp, keylen, key, vallen, val);
+        rc = obd_set_info(cobd_exp, keylen, key, vallen, val);
+        RETURN(rc);
 }
 
-static int cobd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
+static int cobd_statfs(struct obd_device *obd,
+                       struct obd_statfs *osfs,
                        unsigned long max_age)
 {
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         cobd_exp = cobd_get_exp(obd);
+        rc = obd_statfs(class_exp2obd(cobd_exp), osfs, max_age);
+        RETURN(rc);
+}
+
+static int cobd_iocontrol(unsigned int cmd, struct obd_export *exp,
+                          int len, void *karg, void *uarg)
+{
+        struct obd_device *obd = class_exp2obd(exp);
+        struct cache_obd  *cobd = &obd->u.cobd;
+        struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
-        return obd_statfs(class_exp2obd(cobd_exp), osfs, max_age);
+        down(&cobd->sem);
+        
+        /* here would be nice also to make sure somehow that there are no
+         * out-standing requests which go to wrong MDS after cache switch (close
+         * RPCs). But how to check that from COBD? I do not know. --umka */
+        switch (cmd) {
+        case OBD_IOC_COBD_CON:
+                if (!cobd->cache_on)
+                        rc = cobd_switch(obd, 1, COBD_SWITCH);
+                break;
+        case OBD_IOC_COBD_COFF: 
+                if (cobd->cache_on)
+                        rc = cobd_switch(obd, 0, COBD_SWITCH);
+                break;
+        default:
+                cobd_exp = cobd_get_exp(obd);
+                rc = obd_iocontrol(cmd, cobd_exp, len, karg, uarg);
+        }
+
+        up(&cobd->sem);
+        RETURN(rc);
 }
 
-static int cobd_packmd(struct obd_export *exp,
-                       struct lov_mds_md **disk_tgt,
-                       struct lov_stripe_md *mem_src)
+static int cobd_notify(struct obd_device *obd, struct obd_device *watched,
+                       int active, void *data)
+{
+        struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
+
+        cobd_exp = cobd_get_exp(obd);
+        rc = obd_notify(class_exp2obd(cobd_exp), watched, active, data);
+        RETURN(rc);
+}
+
+static int cobd_pin(struct obd_export *exp, obd_id ino, __u32 gen,
+                    int type, struct obd_client_handle *handle,
+                    int flag)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_packmd(cobd_exp, disk_tgt, mem_src);
+        rc = obd_pin(cobd_exp, ino, gen, type, handle, flag);
+        RETURN(rc);
 }
 
-static int cobd_unpackmd(struct obd_export *exp,
-                         struct lov_stripe_md **mem_tgt,
-                         struct lov_mds_md *disk_src,
-                         int disk_len)
+static int cobd_unpin(struct obd_export *exp,
+                      struct obd_client_handle *handle,
+                      int flag)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_unpackmd(cobd_exp, mem_tgt, disk_src, disk_len);
+        rc = obd_unpin(cobd_exp, handle, flag);
+        RETURN(rc);
 }
 
-static int cobd_create(struct obd_export *exp, struct obdo *obdo,
-                       struct lov_stripe_md **ea,
-                       struct obd_trans_info *oti)
+/* data related stuff */
+static int cobd_dt_packmd(struct obd_export *exp,
+                          struct lov_mds_md **disk_tgt,
+                          struct lov_stripe_md *mem_src)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_create(cobd_exp, obdo, ea, oti); 
+        rc = obd_packmd(cobd_exp, disk_tgt, mem_src);
+        RETURN(rc);
 }
 
-static int cobd_destroy(struct obd_export *exp, struct obdo *obdo,
-                        struct lov_stripe_md *ea,
-                        struct obd_trans_info *oti)
+static int cobd_dt_unpackmd(struct obd_export *exp,
+                            struct lov_stripe_md **mem_tgt,
+                            struct lov_mds_md *disk_src,
+                            int disk_len)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_destroy(cobd_exp, obdo, ea, oti); 
+        rc = obd_unpackmd(cobd_exp, mem_tgt, disk_src, disk_len);
+        RETURN(rc);
 }
 
-static int cobd_precleanup(struct obd_device *obd, int flags)
+static int cobd_dt_create(struct obd_export *exp,
+                          struct obdo *obdo,
+                          void *acl, int acl_size,
+                          struct lov_stripe_md **ea,
+                          struct obd_trans_info *oti)
 {
-        /* FIXME-WANGDI: do we need some cleanup here? */
-        return 0;
+        struct obd_device *obd = class_exp2obd(exp);
+        struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
+
+        if (obd == NULL) {
+                CERROR("invalid client cookie "LPX64"\n", 
+                       exp->exp_handle.h_cookie);
+                RETURN(-EINVAL);
+        }
+        cobd_exp = cobd_get_exp(obd);
+        rc = obd_create(cobd_exp, obdo, acl, acl_size, ea, oti);
+        RETURN(rc);
 }
 
-static int cobd_getattr(struct obd_export *exp, struct obdo *oa,
-                        struct lov_stripe_md *lsm)
+static int cobd_dt_destroy(struct obd_export *exp,
+                           struct obdo *obdo,
+                           struct lov_stripe_md *ea,
+                           struct obd_trans_info *oti)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_getattr(cobd_exp, oa, lsm);
+        rc = obd_destroy(cobd_exp, obdo, ea, oti);
+        RETURN(rc);
+}
+
+static int cobd_dt_precleanup(struct obd_device *obd, int flags)
+{
+        /* FIXME: do we need some cleanup here? */
+        ENTRY;
+        RETURN(0);
 }
 
-static int cobd_getattr_async(struct obd_export *exp,
-                              struct obdo *obdo, struct lov_stripe_md *ea,
-                              struct ptlrpc_request_set *set)
+static int cobd_dt_getattr(struct obd_export *exp, struct obdo *oa,
+                           struct lov_stripe_md *ea)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_getattr_async(cobd_exp, obdo, ea, set);
+        rc = obd_getattr(cobd_exp, oa, ea);
+        RETURN(rc);
 }
 
-static int cobd_setattr(struct obd_export *exp, struct obdo *obdo,
-                        struct lov_stripe_md *ea,
-                        struct obd_trans_info *oti)
+static int cobd_dt_getattr_async(struct obd_export *exp,
+                                 struct obdo *obdo, struct lov_stripe_md *ea,
+                                 struct ptlrpc_request_set *set)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_setattr(cobd_exp, obdo, ea, oti);
+        rc = obd_getattr_async(cobd_exp, obdo, ea, set);
+        RETURN(rc);
 }
 
-static int cobd_md_getstatus(struct obd_export *exp,
-                             struct lustre_id *rootid)
+static int cobd_dt_setattr(struct obd_export *exp, struct obdo *obdo,
+                           struct lov_stripe_md *ea,
+                           struct obd_trans_info *oti,
+                           struct lustre_capa *capa)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_getstatus(cobd_exp, rootid);
+        rc = obd_setattr(cobd_exp, obdo, ea, oti, capa);
+        RETURN(rc);
 }
 
-static int cobd_brw(int cmd, struct obd_export *exp, struct obdo *oa,
-                    struct lov_stripe_md *ea, obd_count oa_bufs,
-                    struct brw_page *pg, struct obd_trans_info *oti)
+static int cobd_dt_brw(int cmd, struct obd_export *exp, struct obdo *oa,
+                       struct lov_stripe_md *ea, obd_count oa_bufs,
+                       struct brw_page *pg, struct obd_trans_info *oti)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_brw(cmd, cobd_exp, oa, ea, oa_bufs, pg, oti);
+        rc = obd_brw(cmd, cobd_exp, oa, ea, oa_bufs, pg, oti);
+        RETURN(rc);
 }
 
-static int cobd_brw_async(int cmd, struct obd_export *exp,
-                          struct obdo *oa, struct lov_stripe_md *ea,
-                          obd_count oa_bufs, struct brw_page *pg,
-                          struct ptlrpc_request_set *set,
-                          struct obd_trans_info *oti)
+static int cobd_dt_brw_async(int cmd, struct obd_export *exp,
+                             struct obdo *oa, struct lov_stripe_md *ea,
+                             obd_count oa_bufs, struct brw_page *pg,
+                             struct ptlrpc_request_set *set,
+                             struct obd_trans_info *oti)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_brw_async(cmd, cobd_exp, oa, ea, oa_bufs, 
-                             pg, set, oti);
+        rc = obd_brw_async(cmd, cobd_exp, oa, ea, oa_bufs, 
+                           pg, set, oti);
+        RETURN(rc);
 }
 
-static int cobd_prep_async_page(struct obd_export *exp, 
-                                struct lov_stripe_md *lsm,
-                                struct lov_oinfo *loi, 
-                                struct page *page, obd_off offset, 
-                                struct obd_async_page_ops *ops, 
-                                void *data, void **res)
+static int cobd_dt_prep_async_page(struct obd_export *exp, 
+                                   struct lov_stripe_md *lsm,
+                                   struct lov_oinfo *loi, 
+                                   struct page *page, obd_off offset, 
+                                   struct obd_async_page_ops *ops, 
+                                   void *data, void **res)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_prep_async_page(cobd_exp, lsm, loi, page, offset,
-                                   ops, data, res);
+        rc = obd_prep_async_page(cobd_exp, lsm, loi, page,
+                                 offset, ops, data, res);
+        RETURN(rc);
 }
 
-static int cobd_queue_async_io(struct obd_export *exp,
-                               struct lov_stripe_md *lsm,
-                               struct lov_oinfo *loi, void *cookie,
-                               int cmd, obd_off off, int count,
-                               obd_flags brw_flags, obd_flags async_flags)
+static int cobd_dt_queue_async_io(struct obd_export *exp,
+                                  struct lov_stripe_md *lsm,
+                                  struct lov_oinfo *loi, void *cookie,
+                                  int cmd, obd_off off, int count,
+                                  obd_flags brw_flags, obd_flags async_flags)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_queue_async_io(cobd_exp, lsm, loi, cookie, cmd, off, count,
-                                  brw_flags, async_flags);
+        rc = obd_queue_async_io(cobd_exp, lsm, loi, cookie, cmd, off,
+                                count, brw_flags, async_flags);
+        RETURN(rc);
 }
 
-static int cobd_set_async_flags(struct obd_export *exp,
-                               struct lov_stripe_md *lsm,
-                               struct lov_oinfo *loi, void *cookie,
-                               obd_flags async_flags)
+static int cobd_dt_set_async_flags(struct obd_export *exp,
+                                   struct lov_stripe_md *lsm,
+                                   struct lov_oinfo *loi, void *cookie,
+                                   obd_flags async_flags)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_set_async_flags(cobd_exp, lsm, loi, cookie, async_flags);
+        rc = obd_set_async_flags(cobd_exp, lsm, loi, cookie,
+                                 async_flags);
+        RETURN(rc);
 }
 
-static int cobd_queue_group_io(struct obd_export *exp, 
-                               struct lov_stripe_md *lsm, 
-                               struct lov_oinfo *loi, 
-                               struct obd_io_group *oig, 
-                               void *cookie, int cmd, obd_off off, 
-                               int count, obd_flags brw_flags,
-                               obd_flags async_flags)
+static int cobd_dt_queue_group_io(struct obd_export *exp, 
+                                  struct lov_stripe_md *lsm, 
+                                  struct lov_oinfo *loi, 
+                                  struct obd_io_group *oig, 
+                                  void *cookie, int cmd, obd_off off, 
+                                  int count, obd_flags brw_flags,
+                                  obd_flags async_flags)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_queue_group_io(cobd_exp, lsm, loi, oig, cookie,
-                                  cmd, off, count, brw_flags, async_flags);
+        rc = obd_queue_group_io(cobd_exp, lsm, loi, oig, cookie,
+                                cmd, off, count, brw_flags,
+                                async_flags);
+        RETURN(rc);
 }
 
-static int cobd_trigger_group_io(struct obd_export *exp, 
-                                 struct lov_stripe_md *lsm, 
-                                 struct lov_oinfo *loi,
-                                 struct obd_io_group *oig)
+static int cobd_dt_trigger_group_io(struct obd_export *exp, 
+                                    struct lov_stripe_md *lsm, 
+                                    struct lov_oinfo *loi,
+                                    struct obd_io_group *oig)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_trigger_group_io(cobd_exp, lsm, loi, oig); 
+        rc = obd_trigger_group_io(cobd_exp, lsm, loi, oig);
+        RETURN(rc);
 }
 
-static int cobd_teardown_async_page(struct obd_export *exp,
-                                    struct lov_stripe_md *lsm,
-                                    struct lov_oinfo *loi, void *cookie)
+static int cobd_dt_teardown_async_page(struct obd_export *exp,
+                                       struct lov_stripe_md *lsm,
+                                       struct lov_oinfo *loi,
+                                       void *cookie)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_teardown_async_page(cobd_exp, lsm, loi, cookie);
+        rc = obd_teardown_async_page(cobd_exp, lsm, loi, cookie);
+        RETURN(rc);
 }
 
-static int cobd_punch(struct obd_export *exp, struct obdo *oa,
-                      struct lov_stripe_md *ea, obd_size start,
-                      obd_size end, struct obd_trans_info *oti)
+static int cobd_dt_punch(struct obd_export *exp, struct obdo *oa,
+                         struct lov_stripe_md *ea, obd_size start,
+                         obd_size end, struct obd_trans_info *oti,
+                         struct lustre_capa *capa)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_punch(cobd_exp, oa, ea, start, end, oti);
+        rc = obd_punch(cobd_exp, oa, ea, start, end, oti, capa);
+        RETURN(rc);
 }
 
-static int cobd_sync(struct obd_export *exp, struct obdo *oa,
-                     struct lov_stripe_md *ea, obd_size start, 
-                     obd_size end)
+static int cobd_dt_sync(struct obd_export *exp, struct obdo *oa,
+                        struct lov_stripe_md *ea, obd_size start, 
+                        obd_size end)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_sync(cobd_exp, oa, ea, start, end);
+        rc = obd_sync(cobd_exp, oa, ea, start, end);
+        RETURN(rc);
 }
 
-static int cobd_enqueue(struct obd_export *exp, struct lov_stripe_md *ea,
-                        __u32 type, ldlm_policy_data_t *policy,
-                        __u32 mode, int *flags, void *bl_cb, void *cp_cb,
-                        void *gl_cb, void *data, __u32 lvb_len,
-                        void *lvb_swabber, struct lustre_handle *lockh)
+static int cobd_dt_enqueue(struct obd_export *exp, struct lov_stripe_md *ea,
+                           __u32 type, ldlm_policy_data_t *policy,
+                           __u32 mode, int *flags, void *bl_cb, void *cp_cb,
+                           void *gl_cb, void *data, __u32 lvb_len,
+                           void *lvb_swabber, struct lustre_handle *lockh)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_enqueue(cobd_exp, ea, type, policy, mode, flags, 
-                           bl_cb, cp_cb, gl_cb, data, lvb_len,
-                           lvb_swabber, lockh);
+        rc = obd_enqueue(cobd_exp, ea, type, policy, mode, flags, 
+                         bl_cb, cp_cb, gl_cb, data, lvb_len,
+                         lvb_swabber, lockh);
+        RETURN(rc);
 }
 
-static int cobd_match(struct obd_export *exp, struct lov_stripe_md *ea,
-                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
-                      int *flags, void *data, struct lustre_handle *lockh)
+static int cobd_dt_match(struct obd_export *exp, struct lov_stripe_md *ea,
+                         __u32 type, ldlm_policy_data_t *policy, __u32 mode,
+                         int *flags, void *data, struct lustre_handle *lockh)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_match(cobd_exp, ea, type, policy, mode, flags, data,
-                         lockh); 
+        rc = obd_match(cobd_exp, ea, type, policy, mode, flags,
+                       data, lockh);
+        RETURN(rc);
 }
-static int cobd_change_cbdata(struct obd_export *exp,
-                              struct lov_stripe_md *lsm, 
-                              ldlm_iterator_t it, void *data)
+static int cobd_dt_change_cbdata(struct obd_export *exp,
+                                 struct lov_stripe_md *lsm, 
+                                 ldlm_iterator_t it, void *data)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_change_cbdata(cobd_exp, lsm, it, data);
+        rc = obd_change_cbdata(cobd_exp, lsm, it, data);
+        RETURN(rc);
 }
 
-static int cobd_cancel(struct obd_export *exp,
-                       struct lov_stripe_md *ea, __u32 mode,
-                       struct lustre_handle *lockh)
+static int cobd_dt_cancel(struct obd_export *exp,
+                          struct lov_stripe_md *ea, __u32 mode,
+                          struct lustre_handle *lockh)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_cancel(cobd_exp, ea, mode, lockh);
+        rc = obd_cancel(cobd_exp, ea, mode, lockh);
+        RETURN(rc);
 }
 
-static int cobd_cancel_unused(struct obd_export *exp,
-                              struct lov_stripe_md *ea, int flags,
-                              void *opaque)
+static int cobd_dt_cancel_unused(struct obd_export *exp,
+                                 struct lov_stripe_md *ea,
+                                 int flags, void *opaque)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
-
+        int rc = 0;
+        ENTRY;
+        
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_cancel_unused(cobd_exp, ea, flags, opaque);
+        rc = obd_cancel_unused(cobd_exp, ea, flags, opaque);
+        RETURN(rc);
 }
 
-static int cobd_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
-                       int objcount, struct obd_ioobj *obj,
-                       int niocount, struct niobuf_remote *nb,
-                       struct niobuf_local *res, struct obd_trans_info *oti)
+static int cobd_dt_preprw(int cmd, struct obd_export *exp,
+                          struct obdo *oa, int objcount,
+                          struct obd_ioobj *obj, int niocount,
+                          struct niobuf_remote *nb,
+                          struct niobuf_local *res,
+                          struct obd_trans_info *oti,
+                          struct lustre_capa *capa)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_preprw(cmd, cobd_exp, oa, objcount, obj, niocount, nb, 
-                          res, oti);
+        rc = obd_preprw(cmd, cobd_exp, oa, objcount, obj,
+                          niocount, nb, res, oti, capa);
+        RETURN(rc);
 }
 
-static int cobd_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
-                         int objcount, struct obd_ioobj *obj,
-                         int niocount, struct niobuf_local *local,
-                         struct obd_trans_info *oti, int rc)
+static int cobd_dt_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
+                            int objcount, struct obd_ioobj *obj,
+                            int niocount, struct niobuf_local *local,
+                            struct obd_trans_info *oti, int rc)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int err = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return obd_commitrw(cmd, cobd_exp, oa, objcount, obj, niocount, 
-                            local, oti, rc);
+        err = obd_commitrw(cmd, cobd_exp, oa, objcount, obj,
+                           niocount, local, oti, rc);
+        RETURN(err);
 }
 
-static int cobd_flush(struct obd_device *obd)
-{
-        /* flush the filesystem from the cache to the real device. */
-        return 0; 
-}
-
-static int cobd_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
-                          void *karg, void *uarg)
+static int cobd_dt_adjust_kms(struct obd_export *exp,
+                              struct lov_stripe_md *lsm,
+                              obd_off size, int shrink)
 {
         struct obd_device *obd = class_exp2obd(exp);
-        struct cache_obd  *cobd = &obd->u.cobd;
-        struct obd_device *master_dev = NULL;
         struct obd_export *cobd_exp;
         int rc = 0;
-        switch (cmd) {
-        case OBD_IOC_COBD_CON:
-                if (!cobd->cache_on) {
-                        struct lustre_handle cache_conn = {0,};
-                        
-                        rc = obd_disconnect(cobd->master_exp, 0);
-                        if (rc != 0)
-                                CERROR("error %d disconnecting master\n", rc);
-                        rc = connect_to_obd(cobd->cache_name, &cache_conn);
-                        if (rc != 0)
-                                RETURN(rc); 
-                        cobd->cache_exp = class_conn2export(&cache_conn);
-                        
-                        cobd->cache_on = 1;
-                }
-                break;
-        case OBD_IOC_COBD_COFF: 
-                if (cobd->cache_on) {
-                        struct lustre_handle master_conn = {0,};
-                        struct obd_device *cache_dev = NULL;
-                        int m_easize, m_cooksize;
-
-                        cache_dev = class_exp2obd(cobd->cache_exp); 
-                        m_easize = cache_dev->u.cli.cl_max_mds_easize; 
-                        m_cooksize = cache_dev->u.cli.cl_max_mds_cookiesize; 
-                        rc = obd_disconnect(cobd->cache_exp, 0);
-                        if (rc != 0)
-                                CERROR("error %d disconnecting master\n", rc);
-
-                        /* FIXME-WANGDI: should we read from master_dev? */
-                        
-                        rc = connect_to_obd(cobd->master_name, &master_conn);
-                        if (rc != 0)
-                                RETURN(rc); 
-                        cobd->master_exp = class_conn2export(&master_conn);
-                        master_dev = class_exp2obd(cobd->master_exp);
-                        master_dev->u.cli.cl_max_mds_easize = m_easize;
-                        master_dev->u.cli.cl_max_mds_cookiesize = m_cooksize;
-                        cobd->cache_on = 0;
-                }
-                break;
-        case OBD_IOC_COBD_CFLUSH:
-                if (cobd->cache_on) {
-                        cobd->cache_on = 0;
-                        cobd_flush(obd);
-                }
-                break;
-        default:
-                cobd_exp = cobd_get_exp(obd);
-                rc = obd_iocontrol(cmd, cobd_exp, len, karg, uarg);
+        ENTRY;
+
+        if (obd == NULL) {
+                CERROR("invalid client cookie "LPX64"\n", 
+                       exp->exp_handle.h_cookie);
+                RETURN(-EINVAL);
         }
+        cobd_exp = cobd_get_exp(obd);
+        rc = obd_adjust_kms(cobd_exp, lsm, size, shrink);
         
-        return rc;
+        RETURN(rc);
 }
 
-static int cobd_llog_init(struct obd_device *obd, struct obd_llogs *llogs, 
-                          struct obd_device *disk_obd, int count, 
-                          struct llog_catid *logid)
+static int cobd_dt_llog_init(struct obd_device *obd,
+                             struct obd_llogs *llogs, 
+                             struct obd_device *disk_obd,
+                             int count, struct llog_catid *logid)
 {
         struct obd_export *cobd_exp;
         struct obd_device *cobd_obd;
+        int rc = 0;
+        ENTRY;
 
         cobd_exp = cobd_get_exp(obd);
         cobd_obd = class_exp2obd(cobd_exp);
         
-        return obd_llog_init(cobd_obd, &cobd_obd->obd_llogs, 
-                             disk_obd, count, logid);
+        rc = obd_llog_init(cobd_obd, &cobd_obd->obd_llogs, 
+                           disk_obd, count, logid);
+        RETURN(rc);
 }
 
-static int cobd_llog_finish(struct obd_device *obd, struct obd_llogs *llogs, 
-                            int count)
+static int cobd_dt_llog_finish(struct obd_device *obd,
+                               struct obd_llogs *llogs, 
+                               int count)
 {
         struct obd_export *cobd_exp;
         struct obd_device *cobd_obd;
+        int rc = 0;
+        ENTRY;
 
         cobd_exp = cobd_get_exp(obd);
         cobd_obd = class_exp2obd(cobd_exp);
 
-        return obd_llog_finish(cobd_obd, &cobd_obd->obd_llogs, count);
+        rc = obd_llog_finish(cobd_obd, &cobd_obd->obd_llogs, count);
+        RETURN(rc);
 }
 
-static int cobd_notify(struct obd_device *obd, struct obd_device *watched,
-                       int active, void *data)
+static int cobd_dt_init_ea_size(struct obd_export *exp, int easize,
+                                int cookiesize)
 {
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
-        cobd_exp = cobd_get_exp(obd);
-
-        return obd_notify(class_exp2obd(cobd_exp), watched, active, data);
+        cobd_exp = cobd_get_exp(exp->exp_obd);
+        rc = obd_init_ea_size(cobd_exp, easize, cookiesize);
+        RETURN(rc);
 }
 
-static int cobd_pin(struct obd_export *exp, obd_id ino, __u32 gen,
-                    int type, struct obd_client_handle *handle, int flag)
+static int cobd_dt_import_event(struct obd_device *obd,
+                                struct obd_import *imp,
+                                enum obd_import_event event)
 {
-        struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        ENTRY;
 
-        if (obd == NULL) {
-                CERROR("invalid client cookie "LPX64"\n", 
-                       exp->exp_handle.h_cookie);
-                return -EINVAL;
-        }
         cobd_exp = cobd_get_exp(obd);
-
-        return obd_pin(cobd_exp, ino, gen, type, handle, flag);
+        obd_import_event(class_exp2obd(cobd_exp), imp, event);
+        RETURN(0); 
 }
 
-static int cobd_unpin(struct obd_export *exp,
-                      struct obd_client_handle *handle, int flag)
+/* metadata related stuff */
+static int cobd_md_getstatus(struct obd_export *exp,
+                             struct lustre_id *rootid)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-
-        return obd_unpin(cobd_exp, handle, flag);
-}
-
-static int cobd_init_ea_size(struct obd_export *exp, int easize, int cookiesize)
-{
-        struct obd_export *cobd_exp;
-
-        cobd_exp = cobd_get_exp(exp->exp_obd);
-        return obd_init_ea_size(cobd_exp, easize, cookiesize);
-}
-
-static int  cobd_import_event(struct obd_device *obd,
-                              struct obd_import *imp,
-                              enum obd_import_event event)
-{
-        struct obd_export *cobd_exp;
-
-        cobd_exp = cobd_get_exp(obd);
-
-        obd_import_event(class_exp2obd(cobd_exp), imp, event);
-        
-        return 0; 
+        rc = md_getstatus(cobd_exp, rootid);
+        RETURN(rc);
 }
 
 static int cobd_md_getattr(struct obd_export *exp, struct lustre_id *id,
-                           __u64 valid, unsigned int ea_size,
+                           __u64 valid, const char *xattr_name,
+                           const void *xattr_data, unsigned int xattr_datalen,
+                           unsigned int ea_size, struct obd_capa *ocapa,
                            struct ptlrpc_request **request)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_getattr(cobd_exp, id, valid, ea_size, request);
+        rc = md_getattr(cobd_exp, id, valid, xattr_name, xattr_data,
+                          xattr_datalen, ea_size, ocapa, request);
+        RETURN(rc);
 }
 
-static int cobd_md_req2lustre_md (struct obd_export *mdc_exp, 
-                                  struct ptlrpc_request *req, unsigned int offset,
-                                  struct obd_export *osc_exp, struct lustre_md *md)
+static int cobd_md_req2lustre_md(struct obd_export *mdc_exp, 
+                                 struct ptlrpc_request *req,
+                                 unsigned int offset,
+                                 struct obd_export *osc_exp,
+                                 struct lustre_md *md)
 {
         struct obd_device *obd = class_exp2obd(mdc_exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        mdc_exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_req2lustre_md(cobd_exp, req, offset, osc_exp, md);
+        rc = md_req2lustre_md(cobd_exp, req, offset, osc_exp, md);
+        RETURN(rc);
 }
 
 static int cobd_md_change_cbdata(struct obd_export *exp, struct lustre_id *id, 
@@ -869,14 +1242,17 @@ static int cobd_md_change_cbdata(struct obd_export *exp, struct lustre_id *id,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_change_cbdata(cobd_exp, id, it, data);
+        rc = md_change_cbdata(cobd_exp, id, it, data);
+        RETURN(rc);
 }
 
 static int cobd_md_getattr_lock(struct obd_export *exp, struct lustre_id *id,
@@ -885,15 +1261,18 @@ static int cobd_md_getattr_lock(struct obd_export *exp, struct lustre_id *id,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_getattr_lock(cobd_exp, id, filename, namelen, valid,
-                               ea_size, request);
+        rc = md_getattr_lock(cobd_exp, id, filename, namelen,
+                             valid, ea_size, request);
+        RETURN(rc);
 }
 
 static int cobd_md_create(struct obd_export *exp, struct mdc_op_data *op_data,
@@ -903,30 +1282,37 @@ static int cobd_md_create(struct obd_export *exp, struct mdc_op_data *op_data,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_create(cobd_exp, op_data, data, datalen, mode,
-                         uid, gid, rdev, request);
+        rc = md_create(cobd_exp, op_data, data, datalen, mode,
+                       uid, gid, rdev, request);
+        RETURN(rc);
 }
 
-static int cobd_md_unlink(struct obd_export *exp, struct mdc_op_data *data,
+static int cobd_md_unlink(struct obd_export *exp,
+                          struct mdc_op_data *data,
                           struct ptlrpc_request **request)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_unlink(cobd_exp, data, request);
+        rc = md_unlink(cobd_exp, data, request);
+        RETURN(rc);
 }
 
 static int cobd_md_valid_attrs(struct obd_export *exp,
@@ -934,14 +1320,17 @@ static int cobd_md_valid_attrs(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_valid_attrs(cobd_exp, id);
+        rc = md_valid_attrs(cobd_exp, id);
+        RETURN(rc);
 }
 
 static int cobd_md_rename(struct obd_export *exp, struct mdc_op_data *data,
@@ -950,14 +1339,17 @@ static int cobd_md_rename(struct obd_export *exp, struct mdc_op_data *data,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_rename(cobd_exp, data, old, oldlen, new, newlen, request);
+        rc = md_rename(cobd_exp, data, old, oldlen, new, newlen, request);
+        RETURN(rc);
 }
 
 static int cobd_md_link(struct obd_export *exp, struct mdc_op_data *data,
@@ -965,30 +1357,38 @@ static int cobd_md_link(struct obd_export *exp, struct mdc_op_data *data,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_link(cobd_exp, data, request);
+        rc = md_link(cobd_exp, data, request);
+        RETURN(rc);
 }
 
 static int cobd_md_setattr(struct obd_export *exp, struct mdc_op_data *data,
                            struct iattr *iattr, void *ea, int ealen, void *ea2, 
-                           int ea2len, struct ptlrpc_request **request)
+                           int ea2len, void *ea3, int ea3len, 
+                           struct ptlrpc_request **request)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_setattr(cobd_exp, data, iattr, ea, ealen, ea2, ea2len, request);
+        rc = md_setattr(cobd_exp, data, iattr, ea,
+                          ealen, ea2, ea2len, ea3, ea3len, request);
+        RETURN(rc);
 }
 
 static int cobd_md_readpage(struct obd_export *exp,
@@ -998,44 +1398,54 @@ static int cobd_md_readpage(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_readpage(cobd_exp, mdc_id, offset, page, request);
+        rc = md_readpage(cobd_exp, mdc_id, offset, page, request);
+        RETURN(rc);
 }
 
-static int cobd_md_close(struct obd_export *exp, struct obdo *obdo,
+static int cobd_md_close(struct obd_export *exp, struct mdc_op_data *op_data,
                          struct obd_client_handle *och, 
                          struct ptlrpc_request **request)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_close(cobd_exp, obdo, och, request);
+        rc = md_close(cobd_exp, op_data, och, request);
+        RETURN(rc);
 }
 
-static int cobd_md_done_writing(struct obd_export *exp, struct obdo *obdo)
+static int cobd_md_done_writing(struct obd_export *exp,
+                                struct obdo *obdo)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_done_writing(cobd_exp, obdo);
+        rc = md_done_writing(cobd_exp, obdo);
+        RETURN(rc);
 }
 
 static int cobd_md_sync(struct obd_export *exp, struct lustre_id *id,
@@ -1043,15 +1453,17 @@ static int cobd_md_sync(struct obd_export *exp, struct lustre_id *id,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        
-        return md_sync(cobd_exp, id, request);
+        rc = md_sync(cobd_exp, id, request);
+        RETURN(rc);
 }
 
 static int cobd_md_set_open_replay_data(struct obd_export *exp,
@@ -1060,15 +1472,17 @@ static int cobd_md_set_open_replay_data(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        
-        return md_set_open_replay_data(cobd_exp, och, open_req);
+        rc = md_set_open_replay_data(cobd_exp, och, open_req);
+        RETURN(rc);
 }
 
 static int cobd_md_clear_open_replay_data(struct obd_export *exp,
@@ -1076,15 +1490,17 @@ static int cobd_md_clear_open_replay_data(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_clear_open_replay_data(cobd_exp, och);
+        rc = md_clear_open_replay_data(cobd_exp, och);
+        RETURN(rc);
 }
 
 static int cobd_md_store_inode_generation(struct obd_export *exp,
@@ -1093,30 +1509,35 @@ static int cobd_md_store_inode_generation(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-
-        return md_store_inode_generation(cobd_exp, req, reqoff, repoff);
+        rc = md_store_inode_generation(cobd_exp, req, reqoff, repoff);
+        RETURN(rc);
 }
 
-static int cobd_md_set_lock_data(struct obd_export *exp, __u64 *l, void *data)
+static int cobd_md_set_lock_data(struct obd_export *exp,
+                                 __u64 *l, void *data)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-
-        return md_set_lock_data(cobd_exp, l, data);
+        rc = md_set_lock_data(cobd_exp, l, data);
+        RETURN(rc);
 }
 
 static int cobd_md_enqueue(struct obd_export *exp, int lock_type,
@@ -1128,16 +1549,19 @@ static int cobd_md_enqueue(struct obd_export *exp, int lock_type,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_enqueue(cobd_exp, lock_type, it, lock_mode, data,
-                          lockh, lmm, lmmsize, cb_completion, cb_blocking,
-                          cb_data);
+        rc = md_enqueue(cobd_exp, lock_type, it, lock_mode, data,
+                        lockh, lmm, lmmsize, cb_completion, cb_blocking,
+                        cb_data);
+        RETURN(rc);
 }
 
 static int cobd_md_intent_lock(struct obd_export *exp, struct lustre_id *pid, 
@@ -1148,30 +1572,36 @@ static int cobd_md_intent_lock(struct obd_export *exp, struct lustre_id *pid,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
+        lookup_flags |= LOOKUP_COBD;
         cobd_exp = cobd_get_exp(obd);
-        return md_intent_lock(cobd_exp, pid, name, len, lmm, lmmsize,
-                              cid, it, lookup_flags, reqp, cb_blocking);
+        
+        rc = md_intent_lock(cobd_exp, pid, name, len, lmm, lmmsize,
+                            cid, it, lookup_flags, reqp, cb_blocking);
+        RETURN(rc);
 }
 
 static struct obd_device *cobd_md_get_real_obd(struct obd_export *exp,
-                                               char *name, int len)
+                                               struct lustre_id *id)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        ENTRY;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return NULL;
+                RETURN(NULL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_get_real_obd(cobd_exp, name, len);
+        RETURN(md_get_real_obd(cobd_exp, id));
 }
 
 static int cobd_md_change_cbdata_name(struct obd_export *exp,
@@ -1181,88 +1611,91 @@ static int cobd_md_change_cbdata_name(struct obd_export *exp,
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct obd_export *cobd_exp;
+        int rc = 0;
 
         if (obd == NULL) {
                 CERROR("invalid client cookie "LPX64"\n", 
                        exp->exp_handle.h_cookie);
-                return -EINVAL;
+                RETURN(-EINVAL);
         }
         cobd_exp = cobd_get_exp(obd);
-        return md_change_cbdata_name(cobd_exp, id, name, namelen,
-                                     id2, it, data);
+        rc = md_change_cbdata_name(cobd_exp, id, name, namelen,
+                                   id2, it, data);
+        RETURN(rc);
 }
+
 static struct obd_ops cobd_obd_ops = {
-        .o_owner                = THIS_MODULE,
-        .o_attach               = cobd_attach,
-        .o_detach               = cobd_detach,
-        .o_setup                = cobd_setup,
-        .o_cleanup              = cobd_cleanup,
-        .o_connect              = cobd_connect,
-        .o_disconnect           = cobd_disconnect,
-        .o_set_info             = cobd_set_info,
-        .o_get_info             = cobd_get_info,
-        .o_statfs               = cobd_statfs,
-
-        .o_packmd               = cobd_packmd,
-        .o_unpackmd             = cobd_unpackmd,
-        .o_create               = cobd_create,
-        .o_destroy              = cobd_destroy,
-        .o_precleanup           = cobd_precleanup,
-        .o_getattr              = cobd_getattr,
-        .o_getattr_async        = cobd_getattr_async,
-        .o_setattr              = cobd_setattr,
-
-        .o_brw                  = cobd_brw,
-        .o_brw_async            = cobd_brw_async,
-        .o_prep_async_page      = cobd_prep_async_page,
-        .o_queue_async_io       = cobd_queue_async_io,
-        .o_set_async_flags      = cobd_set_async_flags,
-        .o_queue_group_io       = cobd_queue_group_io,
-        .o_trigger_group_io     = cobd_trigger_group_io,
-        .o_teardown_async_page  = cobd_teardown_async_page,
-        .o_preprw               = cobd_preprw,
-        .o_punch                = cobd_punch,
-        .o_sync                 = cobd_sync,
-        .o_enqueue              = cobd_enqueue,
-        .o_match                = cobd_match,
-        .o_change_cbdata        = cobd_change_cbdata,
-        .o_cancel               = cobd_cancel,
-        .o_cancel_unused        = cobd_cancel_unused,
-        .o_iocontrol            = cobd_iocontrol,
-        .o_commitrw             = cobd_commitrw,
-        .o_llog_init            = cobd_llog_init,
-        .o_llog_finish          = cobd_llog_finish,
-        .o_notify               = cobd_notify,
-        .o_pin                  = cobd_pin,
-        .o_unpin                = cobd_unpin,
-        .o_import_event         = cobd_import_event,
-        .o_init_ea_size         = cobd_init_ea_size,
+        .o_owner                  = THIS_MODULE,
+        .o_attach                 = cobd_attach,
+        .o_detach                 = cobd_detach,
+        .o_setup                  = cobd_setup,
+        .o_cleanup                = cobd_cleanup,
+        .o_connect                = cobd_connect,
+        .o_disconnect             = cobd_disconnect,
+        .o_set_info               = cobd_set_info,
+        .o_get_info               = cobd_get_info,
+        .o_statfs                 = cobd_statfs,
+        .o_iocontrol              = cobd_iocontrol,
+        .o_notify                 = cobd_notify,
+        .o_pin                    = cobd_pin,
+        .o_unpin                  = cobd_unpin,
+
+        .o_packmd                 = cobd_dt_packmd,
+        .o_unpackmd               = cobd_dt_unpackmd,
+        .o_create                 = cobd_dt_create,
+        .o_destroy                = cobd_dt_destroy,
+        .o_precleanup             = cobd_dt_precleanup,
+        .o_getattr                = cobd_dt_getattr,
+        .o_getattr_async          = cobd_dt_getattr_async,
+        .o_setattr                = cobd_dt_setattr,
+        .o_brw                    = cobd_dt_brw,
+        .o_brw_async              = cobd_dt_brw_async,
+        .o_prep_async_page        = cobd_dt_prep_async_page,
+        .o_queue_async_io         = cobd_dt_queue_async_io,
+        .o_set_async_flags        = cobd_dt_set_async_flags,
+        .o_queue_group_io         = cobd_dt_queue_group_io,
+        .o_trigger_group_io       = cobd_dt_trigger_group_io,
+        .o_teardown_async_page    = cobd_dt_teardown_async_page,
+        .o_preprw                 = cobd_dt_preprw,
+        .o_punch                  = cobd_dt_punch,
+        .o_sync                   = cobd_dt_sync,
+        .o_enqueue                = cobd_dt_enqueue,
+        .o_match                  = cobd_dt_match,
+        .o_change_cbdata          = cobd_dt_change_cbdata,
+        .o_cancel                 = cobd_dt_cancel,
+        .o_cancel_unused          = cobd_dt_cancel_unused,
+        .o_commitrw               = cobd_dt_commitrw,
+        .o_llog_init              = cobd_dt_llog_init,
+        .o_llog_finish            = cobd_dt_llog_finish,
+        .o_import_event           = cobd_dt_import_event,
+        .o_init_ea_size           = cobd_dt_init_ea_size,
+        .o_adjust_kms             = cobd_dt_adjust_kms,
 };
 
 struct md_ops cobd_md_ops = {
-        .m_getstatus            = cobd_md_getstatus,
-        .m_getattr              = cobd_md_getattr,
-        .m_req2lustre_md        = cobd_md_req2lustre_md,
-        .m_change_cbdata        = cobd_md_change_cbdata,
-        .m_getattr_lock         = cobd_md_getattr_lock,
-        .m_create               = cobd_md_create,
-        .m_unlink               = cobd_md_unlink,
-        .m_valid_attrs          = cobd_md_valid_attrs,
-        .m_rename               = cobd_md_rename,
-        .m_link                 = cobd_md_link,
-        .m_setattr              = cobd_md_setattr,
-        .m_readpage             = cobd_md_readpage,
-        .m_close                = cobd_md_close,
-        .m_done_writing         = cobd_md_done_writing,
-        .m_sync                 = cobd_md_sync,
-        .m_set_open_replay_data = cobd_md_set_open_replay_data,
+        .m_getstatus              = cobd_md_getstatus,
+        .m_getattr                = cobd_md_getattr,
+        .m_req2lustre_md          = cobd_md_req2lustre_md,
+        .m_change_cbdata          = cobd_md_change_cbdata,
+        .m_getattr_lock           = cobd_md_getattr_lock,
+        .m_create                 = cobd_md_create,
+        .m_unlink                 = cobd_md_unlink,
+        .m_valid_attrs            = cobd_md_valid_attrs,
+        .m_rename                 = cobd_md_rename,
+        .m_link                   = cobd_md_link,
+        .m_setattr                = cobd_md_setattr,
+        .m_readpage               = cobd_md_readpage,
+        .m_close                  = cobd_md_close,
+        .m_done_writing           = cobd_md_done_writing,
+        .m_sync                   = cobd_md_sync,
+        .m_set_open_replay_data   = cobd_md_set_open_replay_data,
         .m_clear_open_replay_data = cobd_md_clear_open_replay_data,
         .m_store_inode_generation = cobd_md_store_inode_generation,
-        .m_set_lock_data        = cobd_md_set_lock_data,
-        .m_enqueue              = cobd_md_enqueue,
-        .m_get_real_obd         = cobd_md_get_real_obd,
-        .m_intent_lock          = cobd_md_intent_lock,
-        .m_change_cbdata_name   = cobd_md_change_cbdata_name,
+        .m_set_lock_data          = cobd_md_set_lock_data,
+        .m_enqueue                = cobd_md_enqueue,
+        .m_get_real_obd           = cobd_md_get_real_obd,
+        .m_intent_lock            = cobd_md_intent_lock,
+        .m_change_cbdata_name     = cobd_md_change_cbdata_name,
 };
 
 static int __init cobd_init(void)