Whamcloud - gitweb
add process_config, fix issue with stack_init/fini (thx Huang Hua) and remove
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index 7c62010..b4df1e2 100644 (file)
 /* struct obd_device */
 #include <linux/obd.h>
 
-#include <linux/lu_object.h>
-
 /* struct mds_client_data */
 #include "../mds/mds_internal.h"
-#include "mdt.h"
+#include "mdt_internal.h"
 
 /*
  * Initialized in mdt_mod_init().
  */
 unsigned long mdt_num_threads;
 
-static int mdt_handle(struct ptlrpc_request *req);
+static int                mdt_handle    (struct ptlrpc_request *req);
+static struct mdt_device *mdt_dev       (struct lu_device *d);
+static struct lu_fid     *mdt_object_fid(struct mdt_object *o);
+
+static struct lu_context_key mdt_thread_key;
+
+/* object operations */
+static int mdt_md_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
+                        struct lu_fid *pfid, const char *name,
+                        struct lu_fid *cfid)
+{
+        struct mdt_object      *o;
+        struct mdt_object      *child;
+        struct mdt_lock_handle *lh;
+
+        int result;
+
+        lh = &info->mti_lh[MDT_LH_PARENT];
+        lh->mlh_mode = LCK_PW;
+
+        o = mdt_object_find_lock(info->mti_ctxt,
+                                 d, pfid, lh, MDS_INODELOCK_UPDATE);
+        if (IS_ERR(o))
+                return PTR_ERR(o);
+
+        child = mdt_object_find(info->mti_ctxt, d, cfid);
+        if (!IS_ERR(child)) {
+                struct md_object *next = mdt_object_child(o);
+
+                result = next->mo_ops->moo_mkdir(info->mti_ctxt, next, name,
+                                                 mdt_object_child(child));
+                mdt_object_put(info->mti_ctxt, child);
+        } else
+                result = PTR_ERR(child);
+        mdt_object_unlock(d->mdt_namespace, o, lh);
+        mdt_object_put(info->mti_ctxt, o);
+        return result;
+}
 
 static int mdt_getstatus(struct mdt_thread_info *info,
                          struct ptlrpc_request *req, int offset)
 {
-        struct md_device *mdd  = info->mti_mdt->mdt_child;
-        struct mds_body  *body;
+        struct md_device *next  = info->mti_mdt->mdt_child;
+        struct mdt_body  *body;
         int               size = sizeof *body;
         int               result;
 
@@ -82,7 +117,8 @@ static int mdt_getstatus(struct mdt_thread_info *info,
                 result = -ENOMEM;
         else {
                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof *body);
-                result = mdd->md_ops->mdo_root_get(mdd, &body->fid1);
+                result = next->md_ops->mdo_root_get(info->mti_ctxt,
+                                                    next, &body->fid1);
         }
 
         /* the last_committed and last_xid fields are filled in for all
@@ -91,22 +127,129 @@ static int mdt_getstatus(struct mdt_thread_info *info,
         RETURN(result);
 }
 
-static int mdt_connect(struct mdt_thread_info *info,
-                       struct ptlrpc_request *req, int offset)
+static int mdt_statfs(struct mdt_thread_info *info,
+                      struct ptlrpc_request *req, int offset)
 {
-        return target_handle_connect(req, mdt_handle);
+        struct md_device  *next  = info->mti_mdt->mdt_child;
+        struct obd_statfs *osfs;
+        struct kstatfs    sfs;
+        int               result;
+        int               size = sizeof(struct obd_statfs);
+
+        ENTRY;
+
+        result = lustre_pack_reply(req, 1, &size, NULL);
+        if (result)
+                CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
+                       size);
+        else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
+                CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
+                result = -ENOMEM;
+        } else {
+                osfs = lustre_msg_buf(req->rq_repmsg, 0, size);
+                /* XXX max_age optimisation is needed here. See mds_statfs */
+                result = next->md_ops->mdo_statfs(info->mti_ctxt, next, &sfs);
+                statfs_pack(osfs, &sfs);
+        }
+
+        RETURN(result);
 }
 
-static int mdt_disconnect(struct mdt_thread_info *info,
-                          struct ptlrpc_request *req, int offset)
+static void mdt_pack_attr2body(struct mdt_body *b, struct lu_attr *attr)
 {
-        return -EOPNOTSUPP;
+        b->valid |= OBD_MD_FLID | OBD_MD_FLCTIME | OBD_MD_FLUID |
+                    OBD_MD_FLGID | OBD_MD_FLFLAGS | OBD_MD_FLTYPE |
+                    OBD_MD_FLMODE | OBD_MD_FLNLINK | OBD_MD_FLGENER;
+
+        if (!S_ISREG(attr->la_mode))
+                b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLATIME |
+                            OBD_MD_FLMTIME;
+
+        b->atime      = attr->la_atime;
+        b->mtime      = attr->la_mtime;
+        b->ctime      = attr->la_ctime;
+        b->mode       = attr->la_mode;
+        b->size       = attr->la_size;
+        b->blocks     = attr->la_blocks;
+        b->uid        = attr->la_uid;
+        b->gid        = attr->la_gid;
+        b->flags      = attr->la_flags;
+        b->nlink      = attr->la_nlink;
 }
 
 static int mdt_getattr(struct mdt_thread_info *info,
                        struct ptlrpc_request *req, int offset)
 {
-        return -EOPNOTSUPP;
+        struct mdt_body *body;
+        int              size = sizeof (*body);
+        int              result;
+
+        LASSERT(info->mti_object != NULL);
+
+        ENTRY;
+
+        result = lustre_pack_reply(req, 1, &size, NULL);
+        if (result)
+                CERROR(LUSTRE_MDT0_NAME" cannot pack size=%d, rc=%d\n",
+                       size, result);
+        else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
+                CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
+                result = -ENOMEM;
+        } else {
+                struct md_object *next = mdt_object_child(info->mti_object);
+
+                result = next->mo_ops->moo_attr_get(info->mti_ctxt, next,
+                                                    &info->mti_ctxt->lc_attr);
+                if (result == 0) {
+                        body = lustre_msg_buf(req->rq_repmsg, 0, size);
+                        mdt_pack_attr2body(body, &info->mti_ctxt->lc_attr);
+                        body->fid1 = *mdt_object_fid(info->mti_object);
+                }
+        }
+        RETURN(result);
+}
+
+static struct lu_device_operations mdt_lu_ops;
+
+static int lu_device_is_mdt(struct lu_device *d)
+{
+        /*
+         * XXX for now. Tags in lu_device_type->ldt_something are needed.
+         */
+        return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
+}
+
+static struct mdt_device *mdt_dev(struct lu_device *d)
+{
+        LASSERT(lu_device_is_mdt(d));
+        return container_of(d, struct mdt_device, mdt_md_dev.md_lu_dev);
+}
+
+static int mdt_connect(struct mdt_thread_info *info,
+                       struct ptlrpc_request *req, int offset)
+{
+        int result;
+
+        result = target_handle_connect(req, mdt_handle);
+        if (result == 0) {
+                struct obd_connect_data *data;
+
+                LASSERT(req->rq_export != NULL);
+                info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
+
+                data = lustre_msg_buf(req->rq_repmsg, 0, sizeof *data);
+                result = seq_mgr_alloc(info->mti_ctxt,
+                                       info->mti_mdt->mdt_seq_mgr,
+                                       &data->ocd_seq);
+        }
+        return result;
+}
+
+static int mdt_disconnect(struct mdt_thread_info *info,
+                          struct ptlrpc_request *req, int offset)
+{
+        //return -EOPNOTSUPP;
+        return target_handle_disconnect(req);
 }
 
 static int mdt_getattr_name(struct mdt_thread_info *info,
@@ -127,14 +270,8 @@ static int mdt_getxattr(struct mdt_thread_info *info,
         return -EOPNOTSUPP;
 }
 
-static int mdt_statfs(struct mdt_thread_info *info,
-                      struct ptlrpc_request *req, int offset)
-{
-        return -EOPNOTSUPP;
-}
-
 static int mdt_readpage(struct mdt_thread_info *info,
-                       struct ptlrpc_request *req, int offset)
+                        struct ptlrpc_request *req, int offset)
 {
         return -EOPNOTSUPP;
 }
@@ -169,12 +306,6 @@ static int mdt_sync(struct mdt_thread_info *info,
         return -EOPNOTSUPP;
 }
 
-static int mdt_set_info(struct mdt_thread_info *info,
-                        struct ptlrpc_request *req, int offset)
-{
-        return -EOPNOTSUPP;
-}
-
 static int mdt_handle_quotacheck(struct mdt_thread_info *info,
                                  struct ptlrpc_request *req, int offset)
 {
@@ -187,25 +318,112 @@ static int mdt_handle_quotactl(struct mdt_thread_info *info,
         return -EOPNOTSUPP;
 }
 
+/*
+ * DLM handlers.
+ */
 
-int fid_lock(const struct ll_fid *f, struct lustre_handle *lh, ldlm_mode_t mode)
+static struct ldlm_callback_suite cbs = {
+        .lcs_completion = ldlm_server_completion_ast,
+        .lcs_blocking   = ldlm_server_blocking_ast,
+        .lcs_glimpse    = NULL
+};
+
+static int mdt_enqueue(struct mdt_thread_info *info,
+                       struct ptlrpc_request *req, int offset)
 {
-        return 0;
+        /*
+         * info->mti_dlm_req already contains swapped and (if necessary)
+         * converted dlm request.
+         */
+        LASSERT(info->mti_dlm_req != NULL);
+
+        info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
+        return ldlm_handle_enqueue0(req, info->mti_dlm_req, &cbs);
 }
 
-void fid_unlock(const struct ll_fid *f,
-                struct lustre_handle *lh, ldlm_mode_t mode)
+static int mdt_convert(struct mdt_thread_info *info,
+                       struct ptlrpc_request *req, int offset)
 {
+        LASSERT(info->mti_dlm_req);
+        return ldlm_handle_convert0(req, info->mti_dlm_req);
 }
 
-static struct lu_device_operations mdt_lu_ops;
+static int mdt_bl_callback(struct mdt_thread_info *info,
+                           struct ptlrpc_request *req, int offset)
+{
+        CERROR("bl callbacks should not happen on MDS\n");
+        LBUG();
+        return -EOPNOTSUPP;
+}
 
-static int lu_device_is_mdt(struct lu_device *d)
+static int mdt_cp_callback(struct mdt_thread_info *info,
+                           struct ptlrpc_request *req, int offset)
 {
-        /*
-         * XXX for now. Tags in lu_device_type->ldt_something are needed.
-         */
-        return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
+        CERROR("cp callbacks should not happen on MDS\n");
+        LBUG();
+        return -EOPNOTSUPP;
+}
+
+/*
+ * Build (DLM) resource name from fid.
+ */
+struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f,
+                                       struct ldlm_res_id *name)
+{
+        memset(name, 0, sizeof *name);
+        /* we use fid_num() whoch includes also object version instread of raw
+         * fid_oid(). */
+        name->name[0] = fid_seq(f);
+        name->name[1] = fid_num(f);
+        return name;
+}
+
+/*
+ * Return true if resource is for object identified by fid.
+ */
+int fid_res_name_eq(const struct lu_fid *f, const struct ldlm_res_id *name)
+{
+        return name->name[0] == fid_seq(f) && name->name[1] == fid_num(f);
+}
+
+/* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
+int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
+             struct lustre_handle *lh, ldlm_mode_t mode,
+             ldlm_policy_data_t *policy)
+{
+        struct ldlm_res_id res_id;
+        int flags = 0, rc;
+        ENTRY;
+
+        LASSERT(ns != NULL);
+        LASSERT(lh != NULL);
+        LASSERT(f != NULL);
+
+        /* FIXME: is that correct to have @flags=0 here? */
+        rc = ldlm_cli_enqueue(NULL, NULL, ns, *fid_build_res_name(f, &res_id),
+                              LDLM_IBITS, policy, mode, &flags,
+                              ldlm_blocking_ast, ldlm_completion_ast, NULL,
+                              NULL, NULL, 0, NULL, lh);
+        RETURN (rc == ELDLM_OK ? 0 : -EIO);
+}
+
+void fid_unlock(struct ldlm_namespace *ns, const struct lu_fid *f,
+                struct lustre_handle *lh, ldlm_mode_t mode)
+{
+        struct ldlm_lock *lock;
+        ENTRY;
+
+        /* FIXME: this is debug stuff, remove it later. */
+        lock = ldlm_handle2lock(lh);
+        if (!lock) {
+                CERROR("invalid lock handle "LPX64, lh->cookie);
+                LBUG();
+        }
+
+        LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
+
+        ldlm_lock_decref(lh, mode);
+        EXIT;
 }
 
 static struct mdt_object *mdt_obj(struct lu_object *o)
@@ -214,55 +432,67 @@ static struct mdt_object *mdt_obj(struct lu_object *o)
         return container_of(o, struct mdt_object, mot_obj.mo_lu);
 }
 
-struct mdt_object *mdt_object_find(struct mdt_device *d, struct ll_fid *f)
+struct mdt_object *mdt_object_find(struct lu_context *ctxt,
+                                   struct mdt_device *d,
+                                   struct lu_fid *f)
 {
         struct lu_object *o;
 
-        o = lu_object_find(d->mdt_md_dev.md_lu_dev.ld_site, f);
+        o = lu_object_find(ctxt, d->mdt_md_dev.md_lu_dev.ld_site, f);
         if (IS_ERR(o))
                 return (struct mdt_object *)o;
         else
                 return mdt_obj(o);
 }
 
-void mdt_object_put(struct mdt_object *o)
+void mdt_object_put(struct lu_context *ctxt, struct mdt_object *o)
 {
-        lu_object_put(&o->mot_obj.mo_lu);
+        lu_object_put(ctxt, &o->mot_obj.mo_lu);
 }
 
-static struct ll_fid *mdt_object_fid(struct mdt_object *o)
+static struct lu_fid *mdt_object_fid(struct mdt_object *o)
 {
         return lu_object_fid(&o->mot_obj.mo_lu);
 }
 
-static int mdt_object_lock(struct mdt_object *o, struct mdt_lock_handle *lh)
+int mdt_object_lock(struct ldlm_namespace *ns, struct mdt_object *o,
+                    struct mdt_lock_handle *lh, __u64 ibits)
 {
+        ldlm_policy_data_t p = {
+                .l_inodebits = {
+                        .bits = ibits
+                }
+        };
         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
         LASSERT(lh->mlh_mode != LCK_MINMODE);
 
-        return fid_lock(mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode);
+        return fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, &p);
 }
 
-static void mdt_object_unlock(struct mdt_object *o, struct mdt_lock_handle *lh)
+void mdt_object_unlock(struct ldlm_namespace *ns, struct mdt_object *o,
+                              struct mdt_lock_handle *lh)
 {
         if (lustre_handle_is_used(&lh->mlh_lh)) {
-                fid_unlock(mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode);
+                fid_unlock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode);
                 lh->mlh_lh.cookie = 0;
         }
 }
 
-struct mdt_object *mdt_object_find_lock(struct mdt_device *d, struct ll_fid *f,
-                                        struct mdt_lock_handle *lh)
+struct mdt_object *mdt_object_find_lock(struct lu_context *ctxt,
+                                        struct mdt_device *d,
+                                        struct lu_fid *f,
+                                        struct mdt_lock_handle *lh,
+                                        __u64 ibits)
 {
         struct mdt_object *o;
 
-        o = mdt_object_find(d, f);
+        o = mdt_object_find(ctxt, d, f);
         if (!IS_ERR(o)) {
                 int result;
 
-                result = mdt_object_lock(o, lh);
+                result = mdt_object_lock(d->mdt_namespace, o, lh, ibits);
                 if (result != 0) {
-                        mdt_object_put(o);
+                        mdt_object_put(ctxt, o);
                         o = ERR_PTR(result);
                 }
         }
@@ -280,9 +510,14 @@ struct mdt_handler {
 
 enum mdt_handler_flags {
         /*
-         * struct mds_body is passed in the 0-th incoming buffer.
+         * struct mdt_body is passed in the 0-th incoming buffer.
          */
-        HABEO_CORPUS = (1 << 0)
+        HABEO_CORPUS = (1 << 0),
+        /*
+         * struct ldlm_request is passed in MDS_REQ_INTENT_LOCKREQ_OFF-th
+         * incoming buffer.
+         */
+        HABEO_CLAVIS   = (1 << 1)
 };
 
 struct mdt_opc_slice {
@@ -293,7 +528,7 @@ struct mdt_opc_slice {
 
 static struct mdt_opc_slice mdt_handlers[];
 
-struct mdt_handler *mdt_handler_find(__u32 opc)
+static struct mdt_handler *mdt_handler_find(__u32 opc)
 {
         struct mdt_opc_slice *s;
         struct mdt_handler   *h;
@@ -317,6 +552,19 @@ static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
         return req->rq_export->exp_mds_data.med_mcd->mcd_last_xid;
 }
 
+static int mdt_lock_resname_compat(struct mdt_device *m,
+                                   struct ldlm_request *req)
+{
+        /* XXX something... later. */
+        return 0;
+}
+
+static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
+{
+        /* XXX something... later. */
+        return 0;
+}
+
 /*
  * Invoke handler for this request opc. Also do necessary preprocessing
  * (according to handler ->mh_flags), and post-processing (setting of
@@ -341,19 +589,42 @@ static int mdt_req_handle(struct mdt_thread_info *info,
                 OBD_FAIL_RETURN(h->mh_fail_id, 0);
 
         off = MDS_REQ_REC_OFF + shift;
+
         result = 0;
         if (h->mh_flags & HABEO_CORPUS) {
-                info->mti_body = lustre_swab_reqbuf(req, off,
-                                                    sizeof *info->mti_body,
-                                                    lustre_swab_mds_body);
-                if (info->mti_body == NULL) {
+                struct mdt_body *body;
+
+                body = info->mti_body =
+                        lustre_swab_reqbuf(req, off, sizeof *info->mti_body,
+                                           lustre_swab_mdt_body);
+                if (body != NULL) {
+                        info->mti_object = mdt_object_find(info->mti_ctxt,
+                                                           info->mti_mdt,
+                                                           &body->fid1);
+                        if (IS_ERR(info->mti_object)) {
+                                result = PTR_ERR(info->mti_object);
+                                info->mti_object = NULL;
+                        }
+                } else {
                         CERROR("Can't unpack body\n");
-                        result = req->rq_status = -EFAULT;
+                        result = -EFAULT;
+                }
+        } else if (h->mh_flags & HABEO_CLAVIS) {
+                struct ldlm_request *dlm;
+
+                LASSERT(shift == 0);
+                dlm = info->mti_dlm_req =
+                        lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF,
+                                           sizeof *dlm,
+                                           lustre_swab_ldlm_request);
+                if (dlm != NULL) {
+                        if (info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME)
+                                result = mdt_lock_resname_compat(info->mti_mdt,
+                                                                 dlm);
+                } else {
+                        CERROR("Can't unpack dlm request\n");
+                        result = -EFAULT;
                 }
-                info->mti_object = mdt_object_find(info->mti_mdt,
-                                                   &info->mti_body->fid1);
-                if (IS_ERR(info->mti_object))
-                        result = PTR_ERR(info->mti_object);
         }
         if (result == 0)
                 /*
@@ -371,6 +642,15 @@ static int mdt_req_handle(struct mdt_thread_info *info,
 
         LASSERT(current->journal_info == NULL);
 
+        if (h->mh_flags & HABEO_CLAVIS &&
+            info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME) {
+                struct ldlm_reply *rep;
+
+                rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof *rep);
+                if (rep != NULL)
+                        result = mdt_lock_reply_compat(info->mti_mdt, rep);
+        }
+
         /* If we're DISCONNECTing, the mds_export_data is already freed */
         if (result == 0 && h->mh_opc != MDS_DISCONNECT) {
                 req->rq_reqmsg->last_xid = le64_to_cpu(req_exp_last_xid(req));
@@ -394,7 +674,6 @@ static void mdt_thread_info_init(struct mdt_thread_info *info)
 {
         int i;
 
-        memset(info, 0, sizeof *info);
         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
         /*
          * Poison size array.
@@ -404,14 +683,16 @@ static void mdt_thread_info_init(struct mdt_thread_info *info)
         info->mti_rep_buf_nr = i;
         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
                 mdt_lock_handle_init(&info->mti_lh[i]);
+        lu_context_enter(info->mti_ctxt);
 }
 
 static void mdt_thread_info_fini(struct mdt_thread_info *info)
 {
         int i;
 
+        lu_context_exit(info->mti_ctxt);
         if (info->mti_object != NULL) {
-                mdt_object_put(info->mti_object);
+                mdt_object_put(info->mti_ctxt, info->mti_object);
                 info->mti_object = NULL;
         }
         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
@@ -516,8 +797,8 @@ static int mdt_filter_recovery_request(struct ptlrpc_request *req,
 
 /*
  * Handle recovery. Return:
- *       +ve: continue request processing;
- *       -ve: abort immediately with given (negated) error code;
+ *        +1: continue request processing;
+ *       -ve: abort immediately with the given error code;
  *         0: send reply with error code in req->rq_status;
  */
 static int mdt_recovery(struct ptlrpc_request *req)
@@ -599,8 +880,9 @@ static int mdt_reply(struct ptlrpc_request *req, struct mdt_thread_info *info)
 
 static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
 {
-        struct lustre_msg *msg;
-        int                result;
+        struct mdt_handler *h;
+        struct lustre_msg  *msg;
+        int                 result;
 
         ENTRY;
 
@@ -610,46 +892,48 @@ static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
 
         msg = req->rq_reqmsg;
         result = mds_msg_check_version(msg);
-        if (result != 0) {
-                CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
-                RETURN(result);
-        }
-
-        result = mdt_recovery(req);
-        if (result > 0) {
-                struct mdt_handler *h;
-
-                h = mdt_handler_find(msg->opc);
-                if (h != NULL)
-                        result = mdt_req_handle(info, h, req, 0);
-                else {
-                        req->rq_status = -ENOTSUPP;
-                        result = ptlrpc_error(req);
-                        RETURN(result);
+        if (result == 0) {
+                result = mdt_recovery(req);
+                switch (result) {
+                case +1:
+                        h = mdt_handler_find(msg->opc);
+                        if (h != NULL)
+                                result = mdt_req_handle(info, h, req, 0);
+                        else {
+                                req->rq_status = -ENOTSUPP;
+                                result = ptlrpc_error(req);
+                                break;
+                        }
+                        /* fall through */
+                case 0:
+                        result = mdt_reply(req, info);
                 }
-        } else if (result < 0)
-                RETURN(result);
-
-        RETURN(mdt_reply(req, info));
-}
-
-static struct mdt_device *mdt_dev(struct lu_device *d)
-{
-        LASSERT(lu_device_is_mdt(d));
-        return container_of(d, struct mdt_device, mdt_md_dev.md_lu_dev);
+        } else
+                CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
+        RETURN(result);
 }
 
 static int mdt_handle(struct ptlrpc_request *req)
 {
         int result;
+        struct lu_context      *ctx;
+        struct mdt_thread_info *info;
+        ENTRY;
 
-        struct mdt_thread_info info; /* XXX on stack for now */
-        mdt_thread_info_init(&info);
-        info.mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
+        ctx = req->rq_svc_thread->t_ctx;
+        LASSERT(ctx != NULL);
+        LASSERT(ctx->lc_thread == req->rq_svc_thread);
 
-        result = mdt_handle0(req, &info);
+        info = lu_context_key_get(ctx, &mdt_thread_key);
+        LASSERT(info != NULL);
 
-        mdt_thread_info_fini(&info);
+        mdt_thread_info_init(info);
+        /* it can be NULL while CONNECT */
+        if (req->rq_export)
+                info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
+
+        result = mdt_handle0(req, info);
+        mdt_thread_info_fini(info);
         return result;
 }
 
@@ -657,7 +941,8 @@ static int mdt_intent_policy(struct ldlm_namespace *ns,
                              struct ldlm_lock **lockp, void *req_cookie,
                              ldlm_mode_t mode, int flags, void *data)
 {
-        return ELDLM_LOCK_ABORTED;
+        ENTRY;
+        RETURN(ELDLM_LOCK_ABORTED);
 }
 
 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
@@ -673,53 +958,129 @@ struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
                                prntfn, c->psc_num_threads);
 }
 
-int md_device_init(struct md_device *md, struct lu_device_type *t)
+static int mdt_config(struct lu_context *ctx, struct mdt_device *m,
+                      const char *name, void *buf, int size, int mode)
+{
+        struct md_device *child = m->mdt_child;
+        ENTRY;
+        RETURN(child->md_ops->mdo_config(ctx, child, name, buf, size, mode));
+}
+
+static int mdt_seq_mgr_hpr(struct lu_context *ctx, void *opaque, __u64 *seq,
+                           int mode)
 {
-        return lu_device_init(&md->md_lu_dev, t);
+        struct mdt_device *m = opaque;
+        int rc;
+        ENTRY;
+
+        rc = mdt_config(ctx, m, LUSTRE_CONFIG_METASEQ,
+                        seq, sizeof(*seq),
+                        mode);
+        RETURN(rc);
 }
 
-void md_device_fini(struct md_device *md)
+static int mdt_seq_mgr_read(struct lu_context *ctx, void *opaque, __u64 *seq)
 {
-        lu_device_fini(&md->md_lu_dev);
+        ENTRY;
+        RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_GET));
 }
 
-static void mdt_fini(struct lu_device *d)
+static int mdt_seq_mgr_write(struct lu_context *ctx, void *opaque, __u64 *seq)
 {
-        struct mdt_device *m = mdt_dev(d);
+        ENTRY;
+        RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_SET));
+}
 
-        if (d->ld_site != NULL) {
-                lu_site_fini(d->ld_site);
-                d->ld_site = NULL;
+struct lu_seq_mgr_ops seq_mgr_ops = {
+        .smo_read  = mdt_seq_mgr_read,
+        .smo_write = mdt_seq_mgr_write
+};
+
+/* device init/fini methods */
+
+static int mdt_fld(struct mdt_thread_info *info,
+                   struct ptlrpc_request *req, int offset)
+{
+        struct lu_site *ls  = info->mti_mdt->mdt_md_dev.md_lu_dev.ld_site;
+        struct md_fld mf, *p, *reply;
+        int size = sizeof(*reply);
+        __u32 *opt;
+        int rc;
+        ENTRY;
+
+        rc = lustre_pack_reply(req, 1, &size, NULL);
+        if (rc)
+                RETURN(rc);
+
+        opt = lustre_swab_reqbuf(req, 0, sizeof(*opt), lustre_swab_generic_32s);
+        p = lustre_swab_reqbuf(req, 1, sizeof(mf), lustre_swab_md_fld);
+        mf = *p;
+
+        rc = fld_handle(ls->ls_fld, *opt, &mf);
+        if (rc)
+                RETURN(rc);
+
+        reply = lustre_msg_buf(req->rq_repmsg, 0, size);
+        *reply = mf;
+        RETURN(rc);
+}
+
+struct dt_device *md2_bottom_dev(struct mdt_device *m)
+{
+        /*FIXME: get dt device here*/
+        RETURN (NULL);
+}
+
+static int mdt_fld_init(struct mdt_device *m)
+{
+        struct dt_device *dt;
+        struct lu_site   *ls;
+        int rc;
+        ENTRY;
+
+        dt = md2_bottom_dev(m);
+
+        ls = m->mdt_md_dev.md_lu_dev.ld_site;
+
+        OBD_ALLOC_PTR(ls->ls_fld);
+
+        if (!ls->ls_fld)
+             RETURN(-ENOMEM);
+
+        rc = fld_server_init(ls->ls_fld, dt);
+
+        RETURN(rc);
+}
+
+static int mdt_fld_fini(struct mdt_device *m)
+{
+        struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
+        int rc = 0;
+
+        if (ls && ls->ls_fld) {
+                fld_server_fini(ls->ls_fld);
+                OBD_FREE_PTR(ls->ls_fld);
         }
+        RETURN(rc);
+}
+
+static void mdt_stop_ptlrpc_service(struct mdt_device *m)
+{
         if (m->mdt_service != NULL) {
                 ptlrpc_unregister_service(m->mdt_service);
                 m->mdt_service = NULL;
         }
-        if (m->mdt_namespace != NULL) {
-                ldlm_namespace_free(m->mdt_namespace, 0);
-                m->mdt_namespace = NULL;
+        if (m->mdt_fld_service != NULL) {
+                ptlrpc_unregister_service(m->mdt_fld_service);
+                m->mdt_fld_service = NULL;
         }
-
-        LASSERT(atomic_read(&d->ld_ref) == 0);
-        md_device_fini(&m->mdt_md_dev);
 }
 
-static int mdt_init0(struct mdt_device *m,
-                     struct lu_device_type *t, struct lustre_cfg *cfg)
+static int mdt_start_ptlrpc_service(struct mdt_device *m)
 {
-        struct lu_site *s;
-        char   ns_name[48];
-
+        int rc;
         ENTRY;
 
-        OBD_ALLOC_PTR(s);
-        if (s == NULL)
-                return -ENOMEM;
-
-        md_device_init(&m->mdt_md_dev, t);
-
-        m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
-
         m->mdt_service_conf.psc_nbufs            = MDS_NBUFS;
         m->mdt_service_conf.psc_bufsize          = MDS_BUFSIZE;
         m->mdt_service_conf.psc_max_req_size     = MDS_MAXREQSIZE;
@@ -734,13 +1095,6 @@ static int mdt_init0(struct mdt_device *m,
         m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads,
                                                       MDT_MIN_THREADS),
                                                   MDT_MAX_THREADS);
-        lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
-
-        snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
-        m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
-        if (m->mdt_namespace == NULL)
-                return -ENOMEM;
-        ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
 
         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                            "mdt_ldlm_client", &m->mdt_ldlm_client);
@@ -751,12 +1105,280 @@ static int mdt_init0(struct mdt_device *m,
                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
                                      NULL);
         if (m->mdt_service == NULL)
-                return -ENOMEM;
+                RETURN(-ENOMEM);
+
+        rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
+        if (rc)
+                GOTO(err_mdt_svc, rc);
+
+        /*start mdt fld service */
+
+        m->mdt_service_conf.psc_req_portal = MDS_FLD_PORTAL;
+
+        m->mdt_fld_service =
+                ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
+                                     LUSTRE_FLD0_NAME,
+                                     m->mdt_md_dev.md_lu_dev.ld_proc_entry,
+                                     NULL);
+        if (m->mdt_fld_service == NULL)
+                RETURN(-ENOMEM);
+
+        rc = ptlrpc_start_threads(NULL, m->mdt_fld_service, LUSTRE_FLD0_NAME);
+        if (rc)
+                GOTO(err_fld_svc, rc);
+
+        RETURN(rc);
+err_fld_svc:
+        ptlrpc_unregister_service(m->mdt_fld_service);
+        m->mdt_fld_service = NULL;
+err_mdt_svc:
+        ptlrpc_unregister_service(m->mdt_service);
+        m->mdt_service = NULL;
+
+        RETURN(rc);
+}
+
+static void mdt_stack_fini(struct mdt_device *m)
+{
+        struct lu_device *d = md2lu_dev(m->mdt_child);
+        /* goes through all stack */
+        while (d != NULL) {
+                struct lu_device *n;
+                struct obd_type *type;
+                struct lu_device_type *ldt = d->ld_type;
+                
+                lu_device_put(d);
+                
+                /* each fini() returns next device in stack of layers
+                 * * so we can avoid the recursion */
+                n = ldt->ldt_ops->ldto_device_fini(d);
+                ldt->ldt_ops->ldto_device_free(d);
+                
+                type = ldt->obd_type;
+                type->typ_refcnt--;
+                class_put_type(type);
+                /* switch to the next device in the layer */
+                d = n;
+        }
+}
+
+static struct lu_device *mdt_layer_setup(const char *typename,
+                                         struct lu_device *child,
+                                         struct lustre_cfg *cfg)
+{
+        struct obd_type       *type;
+        struct lu_device_type *ldt;
+        struct lu_device      *d;
+        int rc;
+
+        /* find the type */
+        type = class_get_type(typename);
+        if (!type) {
+                CERROR("Unknown type: '%s'\n", typename);
+                GOTO(out, rc = -ENODEV);
+        }
+
+        ldt = type->typ_lu;
+        ldt->obd_type = type;
+        if (ldt == NULL) {
+                CERROR("type: '%s'\n", typename);
+                GOTO(out_type, rc = -EINVAL);
+        }
+
+        d = ldt->ldt_ops->ldto_device_alloc(ldt, cfg);
+        if (IS_ERR(d)) {
+                CERROR("Cannot allocate device: '%s'\n", typename);
+                GOTO(out_type, rc = -ENODEV);
+        }
+
+        LASSERT(child->ld_site);
+        d->ld_site = child->ld_site;
+
+        type->typ_refcnt++;
+        rc = ldt->ldt_ops->ldto_device_init(d, child);
+        if (rc) {
+                CERROR("can't init device '%s', rc %d\n", typename, rc);
+                GOTO(out_alloc, rc);
+        }
+        lu_device_get(d);
+
+        RETURN(d);
+out_alloc:
+        ldt->ldt_ops->ldto_device_free(d);
+        type->typ_refcnt--;
+out_type:
+        class_put_type(type);
+out:
+        RETURN(ERR_PTR(rc));
+}
+
+static int mdt_stack_init(struct mdt_device *m, struct lustre_cfg *cfg)
+{
+        struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
+        struct lu_device  *tmp;
+        int rc;
+
+        /* init the stack */
+        tmp = mdt_layer_setup(LUSTRE_OSD0_NAME, d, cfg);
+        if (IS_ERR(tmp)) {
+                RETURN (PTR_ERR(tmp));
+        }
+        d = tmp;
+        tmp = mdt_layer_setup(LUSTRE_MDD0_NAME, d, cfg);
+        if (IS_ERR(tmp)) {
+                GOTO(out, rc = PTR_ERR(tmp));
+        }
+        d = tmp;
+        tmp = mdt_layer_setup(LUSTRE_CMM0_NAME, d, cfg);
+        if (IS_ERR(tmp)) {
+                GOTO(out, rc = PTR_ERR(tmp));
+        }
+        d = tmp;
+        m->mdt_child = lu2md_dev(d);
+
+        /* process setup config */
+        tmp = &m->mdt_md_dev.md_lu_dev;
+        rc = tmp->ld_ops->ldo_process_config(tmp, cfg);
+        
+out:
+        /* fini from last known good lu_device */
+        if (rc)
+                mdt_stack_fini(d);
+        
+        return rc;
+}
+
+static void mdt_fini(struct mdt_device *m)
+{
+        struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
+
+        ENTRY;
+
+        mdt_stop_ptlrpc_service(m);
 
-        return ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
+        /* finish the stack */
+        mdt_stack_fini(m);
+
+        if (d->ld_site != NULL) {
+                lu_site_fini(d->ld_site);
+                OBD_FREE_PTR(d->ld_site);
+                d->ld_site = NULL;
+        }
+        if (m->mdt_namespace != NULL) {
+                ldlm_namespace_free(m->mdt_namespace, 0);
+                m->mdt_namespace = NULL;
+        }
+
+        if (m->mdt_seq_mgr) {
+                seq_mgr_fini(m->mdt_seq_mgr);
+                m->mdt_seq_mgr = NULL;
+        }
+
+        LASSERT(atomic_read(&d->ld_ref) == 0);
+        md_device_fini(&m->mdt_md_dev);
+        EXIT;
+}
+
+static int mdt_init0(struct mdt_device *m,
+                     struct lu_device_type *t, struct lustre_cfg *cfg)
+{
+        int rc;
+        struct lu_site *s;
+        char   ns_name[48];
+        struct lu_context ctx;
+
+        ENTRY;
+
+        OBD_ALLOC_PTR(s);
+        if (s == NULL)
+                RETURN(-ENOMEM);
+
+        md_device_init(&m->mdt_md_dev, t);
+        m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
+
+        rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
+        if (rc) {
+                CERROR("can't init lu_site, rc %d\n", rc);
+                GOTO(err_fini_site, rc);
+        }
+
+        /* init the stack */
+        rc = mdt_stack_init(m, cfg);
+        if (rc) {
+                CERROR("can't init device stack, rc %d\n", rc);
+                GOTO(err_fini_site, rc);
+        }
+
+        m->mdt_seq_mgr = seq_mgr_init(&seq_mgr_ops, m);
+        if (!m->mdt_seq_mgr) {
+                CERROR("can't initialize sequence manager\n");
+                GOTO(err_fini_stack, rc);
+        }
+
+        rc = lu_context_init(&ctx);
+        if (rc != 0)
+                GOTO(err_fini_mgr, rc);
+
+        lu_context_enter(&ctx);
+        /* init sequence info after device stack is initialized. */
+        rc = seq_mgr_setup(&ctx, m->mdt_seq_mgr);
+        lu_context_exit(&ctx);
+        if (rc)
+                GOTO(err_fini_ctx, rc);
+
+        lu_context_fini(&ctx);
+
+        snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
+        m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
+        if (m->mdt_namespace == NULL)
+                GOTO(err_fini_site, rc = -ENOMEM);
+
+        ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
+
+        rc = mdt_fld_init(m);
+        if (rc)
+                GOTO(err_free_ns, rc);
+
+        rc = mdt_start_ptlrpc_service(m);
+        if (rc)
+                GOTO(err_free_fld, rc);
+        RETURN(0);
+
+err_free_fld:
+        mdt_fld_fini(m);
+err_free_ns:
+        ldlm_namespace_free(m->mdt_namespace, 0);
+        m->mdt_namespace = NULL;
+err_fini_ctx:
+        lu_context_fini(&ctx);
+err_fini_mgr:
+        seq_mgr_fini(m->mdt_seq_mgr);
+        m->mdt_seq_mgr = NULL;
+err_fini_stack:
+        mdt_stack_fini(m);
+err_fini_site:
+        lu_site_fini(s);
+        OBD_FREE_PTR(s);
+        RETURN(rc);
+}
+/* used by MGS to process specific configurations */
+static int mdt_process_config(struct lu_device *d, struct lustre_cfg *cfg)
+{
+        struct lu_device *next = md2lu_dev(mdt_dev(d)->mdt_child);
+        int err;
+        ENTRY;
+        switch(cfg->lcfg_command) {
+                /* all MDT specific commands should be here */
+        default:
+                /* others are passed further */
+                err = next->ld_ops->ldo_process_config(next, cfg);
+        }
+out:
+        RETURN(err);
 }
 
-struct lu_object *mdt_object_alloc(struct lu_device *d)
+static struct lu_object *mdt_object_alloc(struct lu_context *ctxt,
+                                          struct lu_device *d)
 {
         struct mdt_object *mo;
 
@@ -769,21 +1391,20 @@ struct lu_object *mdt_object_alloc(struct lu_device *d)
                 h = &mo->mot_header;
                 lu_object_header_init(h);
                 lu_object_init(o, h, d);
-                /* ->lo_depth and ->lo_flags are automatically 0 */
                 lu_object_add_top(h, o);
                 return o;
         } else
                 return NULL;
 }
 
-int mdt_object_init(struct lu_object *o)
+static int mdt_object_init(struct lu_context *ctxt, struct lu_object *o)
 {
         struct mdt_device *d = mdt_dev(o->lo_dev);
         struct lu_device  *under;
         struct lu_object  *below;
 
         under = &d->mdt_child->md_lu_dev;
-        below = under->ld_ops->ldo_object_alloc(under);
+        below = under->ld_ops->ldo_object_alloc(ctxt, under);
         if (below != NULL) {
                 lu_object_add(o, below);
                 return 0;
@@ -791,20 +1412,23 @@ int mdt_object_init(struct lu_object *o)
                 return -ENOMEM;
 }
 
-void mdt_object_free(struct lu_object *o)
+static void mdt_object_free(struct lu_context *ctxt, struct lu_object *o)
 {
+        struct mdt_object *mo = mdt_obj(o);
         struct lu_object_header *h;
 
         h = o->lo_header;
         lu_object_fini(o);
         lu_object_header_fini(h);
+        OBD_FREE_PTR(mo);
 }
 
-void mdt_object_release(struct lu_object *o)
+static void mdt_object_release(struct lu_context *ctxt, struct lu_object *o)
 {
 }
 
-int mdt_object_print(struct seq_file *f, const struct lu_object *o)
+static int mdt_object_print(struct lu_context *ctxt,
+                            struct seq_file *f, const struct lu_object *o)
 {
         return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
 }
@@ -814,52 +1438,95 @@ static struct lu_device_operations mdt_lu_ops = {
         .ldo_object_init    = mdt_object_init,
         .ldo_object_free    = mdt_object_free,
         .ldo_object_release = mdt_object_release,
-        .ldo_object_print   = mdt_object_print
+        .ldo_object_print   = mdt_object_print,
+        .ldo_process_config = mdt_process_config
 };
 
-struct md_object *mdt_object_child(struct mdt_object *o)
+/* mds_connect copy */
+static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
+                           struct obd_uuid *cluuid,
+                           struct obd_connect_data *data)
 {
-        return lu2md(lu_object_next(&o->mot_obj.mo_lu));
-}
+        struct obd_export *exp;
+        int rc;
+        struct mdt_device *mdt;
+        struct mds_export_data *med;
+        struct mds_client_data *mcd = NULL;
+        ENTRY;
 
-static inline struct md_device_operations *mdt_child_ops(struct mdt_device *d)
-{
-        return d->mdt_child->md_ops;
+        if (!conn || !obd || !cluuid)
+                RETURN(-EINVAL);
+
+        mdt = mdt_dev(obd->obd_lu_dev);
+
+        rc = class_connect(conn, obd, cluuid);
+        if (rc)
+                RETURN(rc);
+
+        exp = class_conn2export(conn);
+        LASSERT(exp);
+        med = &exp->exp_mds_data;
+
+        OBD_ALLOC_PTR(mcd);
+        if (!mcd)
+                GOTO(out, rc = -ENOMEM);
+
+        memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
+        med->med_mcd = mcd;
+
+out:
+        if (rc) {
+                class_disconnect(exp);
+        } else {
+                class_export_put(exp);
+        }
+
+        RETURN(rc);
 }
 
-int mdt_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
-              struct ll_fid *pfid, const char *name, struct ll_fid *cfid)
+static int mdt_obd_disconnect(struct obd_export *exp)
 {
-        struct mdt_object      *o;
-        struct mdt_object      *child;
-        struct mdt_lock_handle *lh;
-
-        int result;
+        struct mds_export_data *med = &exp->exp_mds_data;
+        unsigned long irqflags;
+        int rc;
+        ENTRY;
 
-        (lh = &info->mti_lh[MDT_LH_PARENT])->mlh_mode = LCK_PW;
+        LASSERT(exp);
+        class_export_get(exp);
+
+        /* Disconnect early so that clients can't keep using export */
+        rc = class_disconnect(exp);
+        //ldlm_cancel_locks_for_export(exp);
+
+        /* complete all outstanding replies */
+        spin_lock_irqsave(&exp->exp_lock, irqflags);
+        while (!list_empty(&exp->exp_outstanding_replies)) {
+                struct ptlrpc_reply_state *rs =
+                        list_entry(exp->exp_outstanding_replies.next,
+                                   struct ptlrpc_reply_state, rs_exp_list);
+                struct ptlrpc_service *svc = rs->rs_service;
+
+                spin_lock(&svc->srv_lock);
+                list_del_init(&rs->rs_exp_list);
+                ptlrpc_schedule_difficult_reply(rs);
+                spin_unlock(&svc->srv_lock);
+        }
+        spin_unlock_irqrestore(&exp->exp_lock, irqflags);
 
-        o = mdt_object_find_lock(d, pfid, lh);
-        if (IS_ERR(o))
-                return PTR_ERR(o);
+        OBD_FREE_PTR(med->med_mcd);
 
-        child = mdt_object_find(d, cfid);
-        if (!IS_ERR(child)) {
-                result = mdt_child_ops(d)->mdo_mkdir(mdt_object_child(o), name,
-                                                     mdt_object_child(child));
-                mdt_object_put(child);
-        } else
-                result = PTR_ERR(child);
-        mdt_object_unlock(o, lh);
-        mdt_object_put(o);
-        return result;
+        class_export_put(exp);
+        RETURN(rc);
 }
 
 static struct obd_ops mdt_obd_device_ops = {
-        .o_owner = THIS_MODULE
+        .o_owner = THIS_MODULE,
+        .o_connect = mdt_obd_connect,
+        .o_disconnect = mdt_obd_disconnect,
 };
 
-struct lu_device *mdt_device_alloc(struct lu_device_type *t,
-                                   struct lustre_cfg *cfg)
+static struct lu_device *mdt_device_alloc(struct lu_device_type *t,
+                                          struct lustre_cfg *cfg)
 {
         struct lu_device  *l;
         struct mdt_device *m;
@@ -871,27 +1538,54 @@ struct lu_device *mdt_device_alloc(struct lu_device_type *t,
                 l = &m->mdt_md_dev.md_lu_dev;
                 result = mdt_init0(m, t, cfg);
                 if (result != 0) {
-                        mdt_fini(l);
-                        m = ERR_PTR(result);
+                        mdt_fini(m);
+                        return ERR_PTR(result);
                 }
+
         } else
                 l = ERR_PTR(-ENOMEM);
         return l;
 }
 
-void mdt_device_free(struct lu_device *m)
+static void mdt_device_free(struct lu_device *d)
 {
+        struct mdt_device *m = mdt_dev(d);
+
         mdt_fini(m);
         OBD_FREE_PTR(m);
 }
 
-int mdt_type_init(struct lu_device_type *t)
+static void *mdt_thread_init(struct lu_context *ctx)
 {
-        return 0;
+        struct mdt_thread_info *info;
+
+        OBD_ALLOC_PTR(info);
+        if (info != NULL)
+                info->mti_ctxt = ctx;
+        else
+                info = ERR_PTR(-ENOMEM);
+        return info;
+}
+
+static void mdt_thread_fini(struct lu_context *ctx, void *data)
+{
+        struct mdt_thread_info *info = data;
+        OBD_FREE_PTR(info);
+}
+
+static struct lu_context_key mdt_thread_key = {
+        .lct_init = mdt_thread_init,
+        .lct_fini = mdt_thread_fini
+};
+
+static int mdt_type_init(struct lu_device_type *t)
+{
+        return lu_context_key_register(&mdt_thread_key);
 }
 
-void mdt_type_fini(struct lu_device_type *t)
+static void mdt_type_fini(struct lu_device_type *t)
 {
+        lu_context_key_degister(&mdt_thread_key);
 }
 
 static struct lu_device_type_operations mdt_device_type_ops = {
@@ -908,11 +1602,11 @@ static struct lu_device_type mdt_device_type = {
         .ldt_ops  = &mdt_device_type_ops
 };
 
-struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
+static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
         { 0 }
 };
 
-struct lprocfs_vars lprocfs_mdt_module_vars[] = {
+static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
         { 0 }
 };
 
@@ -921,22 +1615,11 @@ LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
 static int __init mdt_mod_init(void)
 {
         struct lprocfs_static_vars lvars;
-        struct obd_type *type;
-        int result;
 
         mdt_num_threads = MDT_NUM_THREADS;
         lprocfs_init_vars(mdt, &lvars);
-        result = class_register_type(&mdt_obd_device_ops,
-                                     lvars.module_vars, LUSTRE_MDT0_NAME);
-        if (result == 0) {
-                type = class_get_type(LUSTRE_MDT0_NAME);
-                LASSERT(type != NULL);
-                type->typ_lu = &mdt_device_type;
-                result = type->typ_lu->ldt_ops->ldto_init(type->typ_lu);
-                if (result != 0)
-                        class_unregister_type(LUSTRE_MDT0_NAME);
-        }
-        return result;
+        return class_register_type(&mdt_obd_device_ops, lvars.module_vars,
+                                   LUSTRE_MDT0_NAME, &mdt_device_type);
 }
 
 static void __exit mdt_mod_exit(void)
@@ -945,16 +1628,17 @@ static void __exit mdt_mod_exit(void)
 }
 
 
-#define DEF_HNDL(prefix, base, flags, opc, fn)                        \
-[prefix ## _ ## opc - prefix ## _ ## base] = {                        \
-        .mh_name    = #opc,                                        \
-        .mh_fail_id = OBD_FAIL_ ## prefix ## _  ## opc ## _NET,        \
-        .mh_opc     = prefix ## _  ## opc,                        \
-        .mh_flags   = flags,                                        \
-        .mh_act     = fn                                        \
+#define DEF_HNDL(prefix, base, suffix, flags, opc, fn)                  \
+[prefix ## _ ## opc - prefix ## _ ## base] = {                          \
+        .mh_name    = #opc,                                             \
+        .mh_fail_id = OBD_FAIL_ ## prefix ## _  ## opc ## suffix,       \
+        .mh_opc     = prefix ## _  ## opc,                              \
+        .mh_flags   = flags,                                            \
+        .mh_act     = fn                                                \
 }
 
-#define DEF_MDT_HNDL(flags, name, fn) DEF_HNDL(MDS, GETATTR, flags, name, fn)
+#define DEF_MDT_HNDL(flags, name, fn)                   \
+        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn)
 
 static struct mdt_handler mdt_mds_ops[] = {
         DEF_MDT_HNDL(0,            CONNECT,        mdt_connect),
@@ -971,15 +1655,22 @@ static struct mdt_handler mdt_mds_ops[] = {
         DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING,   mdt_done_writing),
         DEF_MDT_HNDL(0,            PIN,            mdt_pin),
         DEF_MDT_HNDL(HABEO_CORPUS, SYNC,           mdt_sync),
-        DEF_MDT_HNDL(0,            SET_INFO,       mdt_set_info),
+        DEF_MDT_HNDL(0,            FLD,            mdt_fld),
         DEF_MDT_HNDL(0,            QUOTACHECK,     mdt_handle_quotacheck),
-        DEF_MDT_HNDL(0,            QUOTACTL,       mdt_handle_quotactl),
+        DEF_MDT_HNDL(0,            QUOTACTL,       mdt_handle_quotactl)
 };
 
 static struct mdt_handler mdt_obd_ops[] = {
 };
 
+#define DEF_DLM_HNDL(flags, name, fn)                   \
+        DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn)
+
 static struct mdt_handler mdt_dlm_ops[] = {
+        DEF_DLM_HNDL(HABEO_CLAVIS, ENQUEUE,        mdt_enqueue),
+        DEF_DLM_HNDL(HABEO_CLAVIS, CONVERT,        mdt_convert),
+        DEF_DLM_HNDL(0,            BL_CALLBACK,    mdt_bl_callback),
+        DEF_DLM_HNDL(0,            CP_CALLBACK,    mdt_cp_callback)
 };
 
 static struct mdt_handler mdt_llog_ops[] = {
@@ -1018,4 +1709,4 @@ MODULE_LICENSE("GPL");
 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
                 "number of mdt service threads to start");
 
-cfs_module(mdt, "0.0.2", mdt_mod_init, mdt_mod_exit);
+cfs_module(mdt, "0.0.4", mdt_mod_init, mdt_mod_exit);