Whamcloud - gitweb
add process_config, fix issue with stack_init/fini (thx Huang Hua) and remove
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index 6584f8f..b4df1e2 100644 (file)
  */
 unsigned long mdt_num_threads;
 
-static int mdt_handle(struct ptlrpc_request *req);
-static struct mdt_device *mdt_dev(struct lu_device *d);
+static int                mdt_handle    (struct ptlrpc_request *req);
+static struct mdt_device *mdt_dev       (struct lu_device *d);
+static struct lu_fid     *mdt_object_fid(struct mdt_object *o);
 
 static struct lu_context_key mdt_thread_key;
 
 /* object operations */
-#if 0
 static int mdt_md_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
-                        struct lu_fid *pfid, const char *name, struct lu_fid *cfid)
+                        struct lu_fid *pfid, const char *name,
+                        struct lu_fid *cfid)
 {
         struct mdt_object      *o;
         struct mdt_object      *child;
@@ -79,40 +80,23 @@ static int mdt_md_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
         lh = &info->mti_lh[MDT_LH_PARENT];
         lh->mlh_mode = LCK_PW;
 
-        o = mdt_object_find_lock(d, pfid, lh, MDS_INODELOCK_UPDATE);
+        o = mdt_object_find_lock(info->mti_ctxt,
+                                 d, pfid, lh, MDS_INODELOCK_UPDATE);
         if (IS_ERR(o))
                 return PTR_ERR(o);
 
-        child = mdt_object_find(d, cfid);
+        child = mdt_object_find(info->mti_ctxt, d, cfid);
         if (!IS_ERR(child)) {
                 struct md_object *next = mdt_object_child(o);
 
                 result = next->mo_ops->moo_mkdir(info->mti_ctxt, next, name,
                                                  mdt_object_child(child));
-                mdt_object_put(child);
+                mdt_object_put(info->mti_ctxt, child);
         } else
                 result = PTR_ERR(child);
         mdt_object_unlock(d->mdt_namespace, o, lh);
-        mdt_object_put(o);
-        return result;
-}
-#endif
-static int mdt_md_getattr(struct mdt_thread_info *info, struct lu_fid *fid)
-{
-        struct mdt_device *d = info->mti_mdt;
-        struct mdt_object *o;
-        int               result;
-
-        ENTRY;
-
-        o = mdt_object_find(info->mti_ctxt, d, fid);
-        if (IS_ERR(o))
-                return PTR_ERR(o);
-        /* attr are in mti_ctxt */
-        result = 0;
         mdt_object_put(info->mti_ctxt, o);
-
-        RETURN(result);
+        return result;
 }
 
 static int mdt_getstatus(struct mdt_thread_info *info,
@@ -196,108 +180,35 @@ static void mdt_pack_attr2body(struct mdt_body *b, struct lu_attr *attr)
 static int mdt_getattr(struct mdt_thread_info *info,
                        struct ptlrpc_request *req, int offset)
 {
-        struct mdt_body        *body;
-        int                    size = sizeof (*body);
-        struct lu_attr  *attr;
-        int result;
+        struct mdt_body *body;
+        int              size = sizeof (*body);
+        int              result;
 
-        ENTRY;
+        LASSERT(info->mti_object != NULL);
 
-        OBD_ALLOC_PTR(attr);
-        if (attr == NULL)
-                return -ENOMEM;
+        ENTRY;
 
         result = lustre_pack_reply(req, 1, &size, NULL);
         if (result)
-                CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
-                       size);
+                CERROR(LUSTRE_MDT0_NAME" cannot pack size=%d, rc=%d\n",
+                       size, result);
         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
                 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
                 result = -ENOMEM;
         } else {
-                body = lustre_msg_buf(req->rq_repmsg, 0, size);
-                result = mdt_md_getattr(info, &body->fid1);
-                if (result == 0)
+                struct md_object *next = mdt_object_child(info->mti_object);
+
+                result = next->mo_ops->moo_attr_get(info->mti_ctxt, next,
+                                                    &info->mti_ctxt->lc_attr);
+                if (result == 0) {
+                        body = lustre_msg_buf(req->rq_repmsg, 0, size);
                         mdt_pack_attr2body(body, &info->mti_ctxt->lc_attr);
+                        body->fid1 = *mdt_object_fid(info->mti_object);
+                }
         }
-        OBD_FREE_PTR(attr);
         RETURN(result);
 }
 
-static int mdt_set_info(struct mdt_thread_info *info,
-                        struct ptlrpc_request *req, int offset)
-{
-        struct md_device *next  = info->mti_mdt->mdt_child;
-        char *key;
-        int keylen, rc = 0;
-        ENTRY;
-
-        key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
-        if (key == NULL) {
-                DEBUG_REQ(D_HA, req, "no set_info key");
-                RETURN(-EFAULT);
-        }
-        keylen = req->rq_reqmsg->buflens[0];
-
-        if (((keylen >= strlen("fld_create") &&
-            memcmp(key, "fld_create", keylen) == 0)) ||
-            ((keylen >= strlen("fld_delete") &&
-            memcmp(key, "fld_delete", keylen) == 0))) {
-                struct md_fld mf, *p;
-                __u32 size = sizeof(struct md_fld);
-
-                rc = lustre_pack_reply(req, 0, NULL, NULL);
-                if (rc)
-                        RETURN(rc);
-
-                p = lustre_swab_reqbuf(req, 1, sizeof(mf), lustre_swab_md_fld);
-                mf = *p;
-                rc = next->md_ops->mdo_get_info(info->mti_ctxt, next, keylen,
-                                                key, &size, &mf);
-                RETURN(rc);
-        }
-
-        CDEBUG(D_IOCTL, "invalid key\n");
-        RETURN(-EINVAL);
-
-}
-
-static int mdt_get_info(struct mdt_thread_info *info,
-                        struct ptlrpc_request *req, int offset)
-{
-        struct md_device *next  = info->mti_mdt->mdt_child;
-        char *key;
-        int keylen, rc = 0;
-        ENTRY;
-
-        key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
-        if (key == NULL) {
-                DEBUG_REQ(D_HA, req, "no set_info key");
-                RETURN(-EFAULT);
-        }
-        keylen = req->rq_reqmsg->buflens[0];
-
-        if (((keylen >= strlen("fld_get") &&
-            memcmp(key, "fld_get", keylen) == 0))) {
-                struct md_fld mf, *p, *reply;
-                int size = sizeof(*reply);
-
-                rc = lustre_pack_reply(req, 1, &size, NULL);
-                if (rc)
-                        RETURN(rc);
-                p = lustre_swab_reqbuf(req, 1, sizeof(mf), lustre_swab_md_fld);
-                mf = *p;
-                rc = next->md_ops->mdo_get_info(info->mti_ctxt, next, keylen,
-                                                key, &size, &mf);
-                reply = lustre_msg_buf(req->rq_repmsg, 0, size);
-                *reply = mf;
-                RETURN(rc);
-        }
-
-        CDEBUG(D_IOCTL, "invalid key\n");
-        RETURN(-EINVAL);
-}
-
 static struct lu_device_operations mdt_lu_ops;
 
 static int lu_device_is_mdt(struct lu_device *d)
@@ -337,7 +248,8 @@ static int mdt_connect(struct mdt_thread_info *info,
 static int mdt_disconnect(struct mdt_thread_info *info,
                           struct ptlrpc_request *req, int offset)
 {
-        return -EOPNOTSUPP;
+        //return -EOPNOTSUPP;
+        return target_handle_disconnect(req);
 }
 
 static int mdt_getattr_name(struct mdt_thread_info *info,
@@ -423,7 +335,7 @@ static int mdt_enqueue(struct mdt_thread_info *info,
          * info->mti_dlm_req already contains swapped and (if necessary)
          * converted dlm request.
          */
-        LASSERT(info->mti_dlm_req);
+        LASSERT(info->mti_dlm_req != NULL);
 
         info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
         return ldlm_handle_enqueue0(req, info->mti_dlm_req, &cbs);
@@ -538,7 +450,7 @@ void mdt_object_put(struct lu_context *ctxt, struct mdt_object *o)
         lu_object_put(ctxt, &o->mot_obj.mo_lu);
 }
 
-struct lu_fid *mdt_object_fid(struct mdt_object *o)
+static struct lu_fid *mdt_object_fid(struct mdt_object *o)
 {
         return lu_object_fid(&o->mot_obj.mo_lu);
 }
@@ -664,7 +576,6 @@ static int mdt_req_handle(struct mdt_thread_info *info,
 {
         int result;
         int off;
-        int lock_conv;
 
         ENTRY;
 
@@ -678,9 +589,6 @@ static int mdt_req_handle(struct mdt_thread_info *info,
                 OBD_FAIL_RETURN(h->mh_fail_id, 0);
 
         off = MDS_REQ_REC_OFF + shift;
-        lock_conv =
-                h->mh_flags & HABEO_CLAVIS &&
-                info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME;
 
         result = 0;
         if (h->mh_flags & HABEO_CORPUS) {
@@ -693,13 +601,15 @@ static int mdt_req_handle(struct mdt_thread_info *info,
                         info->mti_object = mdt_object_find(info->mti_ctxt,
                                                            info->mti_mdt,
                                                            &body->fid1);
-                        if (IS_ERR(info->mti_object))
+                        if (IS_ERR(info->mti_object)) {
                                 result = PTR_ERR(info->mti_object);
+                                info->mti_object = NULL;
+                        }
                 } else {
                         CERROR("Can't unpack body\n");
                         result = -EFAULT;
                 }
-        } else if (lock_conv) {
+        } else if (h->mh_flags & HABEO_CLAVIS) {
                 struct ldlm_request *dlm;
 
                 LASSERT(shift == 0);
@@ -707,9 +617,11 @@ static int mdt_req_handle(struct mdt_thread_info *info,
                         lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF,
                                            sizeof *dlm,
                                            lustre_swab_ldlm_request);
-                if (dlm != NULL)
-                        result = mdt_lock_resname_compat(info->mti_mdt, dlm);
-                else {
+                if (dlm != NULL) {
+                        if (info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME)
+                                result = mdt_lock_resname_compat(info->mti_mdt,
+                                                                 dlm);
+                } else {
                         CERROR("Can't unpack dlm request\n");
                         result = -EFAULT;
                 }
@@ -730,7 +642,8 @@ static int mdt_req_handle(struct mdt_thread_info *info,
 
         LASSERT(current->journal_info == NULL);
 
-        if (lock_conv) {
+        if (h->mh_flags & HABEO_CLAVIS &&
+            info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME) {
                 struct ldlm_reply *rep;
 
                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof *rep);
@@ -1083,28 +996,278 @@ struct lu_seq_mgr_ops seq_mgr_ops = {
         .smo_write = mdt_seq_mgr_write
 };
 
+/* device init/fini methods */
+
+/*
+ * Handler for FLD requests.
+ *
+ * Packs a reply with room for one struct md_fld, unpacks the operation
+ * code (request buffer 0) and the md_fld record (request buffer 1), hands
+ * them to fld_handle() and, on success, copies the resulting record into
+ * the reply.
+ *
+ * \retval 0        on success
+ * \retval -EFAULT  if either request buffer cannot be unpacked
+ * \retval other    error returned by lustre_pack_reply() or fld_handle()
+ */
+static int mdt_fld(struct mdt_thread_info *info,
+                   struct ptlrpc_request *req, int offset)
+{
+        struct lu_site *ls  = info->mti_mdt->mdt_md_dev.md_lu_dev.ld_site;
+        struct md_fld mf, *p, *reply;
+        int size = sizeof(*reply);
+        __u32 *opt;
+        int rc;
+        ENTRY;
+
+        rc = lustre_pack_reply(req, 1, &size, NULL);
+        if (rc)
+                RETURN(rc);
+
+        opt = lustre_swab_reqbuf(req, 0, sizeof(*opt), lustre_swab_generic_32s);
+        p = lustre_swab_reqbuf(req, 1, sizeof(mf), lustre_swab_md_fld);
+        /* a short or malformed request must not be dereferenced */
+        if (opt == NULL || p == NULL) {
+                CERROR("Can't unpack fld request\n");
+                RETURN(-EFAULT);
+        }
+        mf = *p;
+
+        rc = fld_handle(ls->ls_fld, *opt, &mf);
+        if (rc)
+                RETURN(rc);
+
+        reply = lustre_msg_buf(req->rq_repmsg, 0, size);
+        *reply = mf;
+        RETURN(rc);
+}
+
+/*
+ * Placeholder for looking up the dt device at the bottom of the stack
+ * that \a m belongs to.  Not implemented yet: always returns NULL.
+ */
+struct dt_device *md2_bottom_dev(struct mdt_device *m)
+{
+        /* pair the RETURN below so that entry/exit tracing stays balanced */
+        ENTRY;
+
+        /*FIXME: get dt device here*/
+        RETURN (NULL);
+}
+
+/*
+ * Allocate the fld structure of the site \a m belongs to and initialize
+ * it with fld_server_init().
+ *
+ * On failure nothing stays allocated and ls_fld is left NULL, so the
+ * caller does not have to call mdt_fld_fini().
+ *
+ * \retval 0        on success
+ * \retval -ENOMEM  if the fld structure cannot be allocated
+ * \retval other    error returned by fld_server_init()
+ */
+static int mdt_fld_init(struct mdt_device *m)
+{
+        struct dt_device *dt;
+        struct lu_site   *ls;
+        int rc;
+        ENTRY;
+
+        dt = md2_bottom_dev(m);
+        ls = m->mdt_md_dev.md_lu_dev.ld_site;
+
+        OBD_ALLOC_PTR(ls->ls_fld);
+        if (ls->ls_fld == NULL)
+                RETURN(-ENOMEM);
+
+        rc = fld_server_init(ls->ls_fld, dt);
+        if (rc) {
+                /* do not leak the allocation when initialization fails */
+                OBD_FREE_PTR(ls->ls_fld);
+                ls->ls_fld = NULL;
+        }
+
+        RETURN(rc);
+}
+
+/*
+ * Undo mdt_fld_init(): finalize and free the fld structure of the site,
+ * if there is one.  The pointer is cleared afterwards, so calling this
+ * function more than once is harmless.
+ *
+ * \retval 0 always
+ */
+static int mdt_fld_fini(struct mdt_device *m)
+{
+        struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
+        int rc = 0;
+        ENTRY;
+
+        if (ls && ls->ls_fld) {
+                fld_server_fini(ls->ls_fld);
+                OBD_FREE_PTR(ls->ls_fld);
+                /* avoid a dangling pointer and a possible double free */
+                ls->ls_fld = NULL;
+        }
+        RETURN(rc);
+}
+
+/*
+ * Unregister the service stored in \a slot, if any, and clear the slot.
+ */
+static void mdt_unregister_svc(struct ptlrpc_service **slot)
+{
+        struct ptlrpc_service *svc = *slot;
+
+        if (svc == NULL)
+                return;
+
+        ptlrpc_unregister_service(svc);
+        *slot = NULL;
+}
+
+/*
+ * Unregister the ptlrpc services of \a m: first the MDT request service,
+ * then the FLD service.  Services that are not registered are skipped.
+ */
+static void mdt_stop_ptlrpc_service(struct mdt_device *m)
+{
+        mdt_unregister_svc(&m->mdt_service);
+        mdt_unregister_svc(&m->mdt_fld_service);
+}
+
+/*
+ * Configure and start the ptlrpc services of \a m: the MDT request
+ * service and the FLD service.  Both use mdt_handle() as their request
+ * handler and share m->mdt_service_conf; only the request portal differs.
+ *
+ * On failure every service registered so far is unregistered again and
+ * the corresponding pointers are reset to NULL.
+ *
+ * \retval 0        on success
+ * \retval -ENOMEM  if a service cannot be created
+ * \retval other    error returned by ptlrpc_start_threads()
+ */
+static int mdt_start_ptlrpc_service(struct mdt_device *m)
+{
+        int rc;
+        ENTRY;
+
+        m->mdt_service_conf.psc_nbufs            = MDS_NBUFS;
+        m->mdt_service_conf.psc_bufsize          = MDS_BUFSIZE;
+        m->mdt_service_conf.psc_max_req_size     = MDS_MAXREQSIZE;
+        m->mdt_service_conf.psc_max_reply_size   = MDS_MAXREPSIZE;
+        m->mdt_service_conf.psc_req_portal       = MDS_REQUEST_PORTAL;
+        m->mdt_service_conf.psc_rep_portal       = MDC_REPLY_PORTAL;
+        m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
+        /*
+         * We'd like to have a mechanism to set this on a per-device basis,
+         * but alas...
+         */
+        m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads,
+                                                      MDT_MIN_THREADS),
+                                                  MDT_MAX_THREADS);
+
+        ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
+                           "mdt_ldlm_client", &m->mdt_ldlm_client);
+
+        m->mdt_service =
+                ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
+                                     LUSTRE_MDT0_NAME,
+                                     m->mdt_md_dev.md_lu_dev.ld_proc_entry,
+                                     NULL);
+        if (m->mdt_service == NULL)
+                RETURN(-ENOMEM);
+
+        rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
+        if (rc)
+                GOTO(err_mdt_svc, rc);
+
+        /* start mdt fld service */
+
+        m->mdt_service_conf.psc_req_portal = MDS_FLD_PORTAL;
+
+        m->mdt_fld_service =
+                ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
+                                     LUSTRE_FLD0_NAME,
+                                     m->mdt_md_dev.md_lu_dev.ld_proc_entry,
+                                     NULL);
+        /* the MDT service is already running here: it must be torn down */
+        if (m->mdt_fld_service == NULL)
+                GOTO(err_mdt_svc, rc = -ENOMEM);
+
+        rc = ptlrpc_start_threads(NULL, m->mdt_fld_service, LUSTRE_FLD0_NAME);
+        if (rc)
+                GOTO(err_fld_svc, rc);
+
+        RETURN(rc);
+err_fld_svc:
+        ptlrpc_unregister_service(m->mdt_fld_service);
+        m->mdt_fld_service = NULL;
+err_mdt_svc:
+        ptlrpc_unregister_service(m->mdt_service);
+        m->mdt_service = NULL;
+
+        RETURN(rc);
+}
+
+/*
+ * Tear down the layers of a device stack, starting at \a d and following
+ * the device returned by each layer's ldto_device_fini() until it is NULL.
+ *
+ * For every layer this drops the device reference, finalizes and frees
+ * the device, and releases the references on its obd_type that were taken
+ * in mdt_layer_setup().
+ */
+static void mdt_stack_fini_from(struct lu_device *d)
+{
+        /* goes through all stack */
+        while (d != NULL) {
+                struct lu_device      *n;
+                struct obd_type       *type;
+                struct lu_device_type *ldt = d->ld_type;
+
+                lu_device_put(d);
+
+                /* each fini() returns next device in stack of layers
+                 * so we can avoid the recursion */
+                n = ldt->ldt_ops->ldto_device_fini(d);
+                ldt->ldt_ops->ldto_device_free(d);
+
+                type = ldt->obd_type;
+                type->typ_refcnt--;
+                class_put_type(type);
+                /* switch to the next device in the layer */
+                d = n;
+        }
+}
+
+/*
+ * Tear down the whole stack below \a m, starting at m->mdt_child.
+ * Does nothing if no child is attached; m->mdt_child is cleared
+ * afterwards so that a repeated call is harmless.
+ */
+static void mdt_stack_fini(struct mdt_device *m)
+{
+        if (m->mdt_child == NULL)
+                return;
+
+        mdt_stack_fini_from(md2lu_dev(m->mdt_child));
+        m->mdt_child = NULL;
+}
+
+/*
+ * Allocate and initialize one layer of the device stack.
+ *
+ * Looks up the obd type \a typename, allocates a device of that type with
+ * \a cfg, makes it share the site of \a child and initializes it with
+ * \a child as argument.  On success the new device holds a device
+ * reference and references on its type, all of which are released again
+ * by mdt_stack_fini_from().
+ *
+ * \retval the new device on success
+ * \retval ERR_PTR(-ENODEV)  if the type is unknown
+ * \retval ERR_PTR(-EINVAL)  if the type has no lu_device_type
+ * \retval ERR_PTR(other)    error from ldto_device_alloc() or
+ *                           ldto_device_init()
+ */
+static struct lu_device *mdt_layer_setup(const char *typename,
+                                         struct lu_device *child,
+                                         struct lustre_cfg *cfg)
+{
+        struct obd_type       *type;
+        struct lu_device_type *ldt;
+        struct lu_device      *d;
+        int rc;
+        ENTRY;
+
+        /* find the type */
+        type = class_get_type(typename);
+        if (!type) {
+                CERROR("Unknown type: '%s'\n", typename);
+                GOTO(out, rc = -ENODEV);
+        }
+
+        ldt = type->typ_lu;
+        /* check before the first dereference of ldt */
+        if (ldt == NULL) {
+                CERROR("type: '%s'\n", typename);
+                GOTO(out_type, rc = -EINVAL);
+        }
+        ldt->obd_type = type;
+
+        d = ldt->ldt_ops->ldto_device_alloc(ldt, cfg);
+        if (IS_ERR(d)) {
+                CERROR("Cannot allocate device: '%s'\n", typename);
+                /* report the allocator's own error code */
+                GOTO(out_type, rc = PTR_ERR(d));
+        }
+
+        LASSERT(child->ld_site);
+        d->ld_site = child->ld_site;
+
+        type->typ_refcnt++;
+        rc = ldt->ldt_ops->ldto_device_init(d, child);
+        if (rc) {
+                CERROR("can't init device '%s', rc %d\n", typename, rc);
+                GOTO(out_alloc, rc);
+        }
+        lu_device_get(d);
+
+        RETURN(d);
+out_alloc:
+        ldt->ldt_ops->ldto_device_free(d);
+        type->typ_refcnt--;
+out_type:
+        class_put_type(type);
+out:
+        RETURN(ERR_PTR(rc));
+}
+
+/*
+ * Build the device stack below \a m (OSD, then MDD, then CMM), attach the
+ * top of it as m->mdt_child and pass \a cfg through the stack via
+ * ldo_process_config().
+ *
+ * On failure every layer set up so far is torn down again and
+ * m->mdt_child is left NULL.
+ *
+ * \retval 0      on success
+ * \retval other  error from mdt_layer_setup() or ldo_process_config()
+ */
+static int mdt_stack_init(struct mdt_device *m, struct lustre_cfg *cfg)
+{
+        struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
+        struct lu_device  *tmp;
+        int rc;
+        ENTRY;
+
+        /* init the stack */
+        tmp = mdt_layer_setup(LUSTRE_OSD0_NAME, d, cfg);
+        if (IS_ERR(tmp))
+                RETURN(PTR_ERR(tmp));
+        d = tmp;
+
+        tmp = mdt_layer_setup(LUSTRE_MDD0_NAME, d, cfg);
+        if (IS_ERR(tmp))
+                GOTO(out, rc = PTR_ERR(tmp));
+        d = tmp;
+
+        tmp = mdt_layer_setup(LUSTRE_CMM0_NAME, d, cfg);
+        if (IS_ERR(tmp))
+                GOTO(out, rc = PTR_ERR(tmp));
+        d = tmp;
+        m->mdt_child = lu2md_dev(d);
+
+        /* process setup config */
+        tmp = &m->mdt_md_dev.md_lu_dev;
+        rc = tmp->ld_ops->ldo_process_config(tmp, cfg);
+
+out:
+        /* fini from last known good lu_device */
+        if (rc) {
+                mdt_stack_fini_from(d);
+                /* the devices are gone, do not keep a stale pointer */
+                m->mdt_child = NULL;
+        }
+
+        RETURN(rc);
+}
+
 static void mdt_fini(struct mdt_device *m)
 {
         struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
 
+        ENTRY;
+
+        mdt_stop_ptlrpc_service(m);
+
+        /* finish the stack */
+        mdt_stack_fini(m);
+
         if (d->ld_site != NULL) {
                 lu_site_fini(d->ld_site);
                 OBD_FREE_PTR(d->ld_site);
                 d->ld_site = NULL;
         }
-        if (m->mdt_service != NULL) {
-                ptlrpc_unregister_service(m->mdt_service);
-                m->mdt_service = NULL;
-        }
         if (m->mdt_namespace != NULL) {
                 ldlm_namespace_free(m->mdt_namespace, 0);
                 m->mdt_namespace = NULL;
         }
-        /* finish the stack */
-        if (m->mdt_child) {
-                struct lu_device *child = md2lu_dev(m->mdt_child);
-                child->ld_type->ldt_ops->ldto_device_fini(child);
-        }
 
         if (m->mdt_seq_mgr) {
                 seq_mgr_fini(m->mdt_seq_mgr);
@@ -1113,6 +1276,7 @@ static void mdt_fini(struct mdt_device *m)
 
         LASSERT(atomic_read(&d->ld_ref) == 0);
         md_device_fini(&m->mdt_md_dev);
+        EXIT;
 }
 
 static int mdt_init0(struct mdt_device *m,
@@ -1121,77 +1285,34 @@ static int mdt_init0(struct mdt_device *m,
         int rc;
         struct lu_site *s;
         char   ns_name[48];
-        struct obd_device *obd;
-        struct lu_device  *mdt_child;
-        const char *top   = lustre_cfg_string(cfg, 0);
-        const char *child = lustre_cfg_string(cfg, 1);
         struct lu_context ctx;
 
         ENTRY;
 
-        /* get next layer */
-        obd = class_name2obd((char *)child);
-        if (obd && obd->obd_lu_dev) {
-                CDEBUG(D_INFO, "Child device is %s\n", child);
-                m->mdt_child = lu2md_dev(obd->obd_lu_dev);
-                mdt_child = md2lu_dev(m->mdt_child);
-        } else {
-                CDEBUG(D_INFO, "Child device %s is not found\n", child);
-                RETURN(-EINVAL);
-        }
-
         OBD_ALLOC_PTR(s);
         if (s == NULL)
                 RETURN(-ENOMEM);
 
         md_device_init(&m->mdt_md_dev, t);
         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
-        lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
-
-        m->mdt_service_conf.psc_nbufs            = MDS_NBUFS;
-        m->mdt_service_conf.psc_bufsize          = MDS_BUFSIZE;
-        m->mdt_service_conf.psc_max_req_size     = MDS_MAXREQSIZE;
-        m->mdt_service_conf.psc_max_reply_size   = MDS_MAXREPSIZE;
-        m->mdt_service_conf.psc_req_portal       = MDS_REQUEST_PORTAL;
-        m->mdt_service_conf.psc_rep_portal       = MDC_REPLY_PORTAL;
-        m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
-        /*
-         * We'd like to have a mechanism to set this on a per-device basis,
-         * but alas...
-         */
-        m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads,
-                                                      MDT_MIN_THREADS),
-                                                  MDT_MAX_THREADS);
-        snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
-        m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
-        if (m->mdt_namespace == NULL)
-                GOTO(err_fini_site, rc = -ENOMEM);
 
-        ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
-
-        ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
-                           "mdt_ldlm_client", &m->mdt_ldlm_client);
-
-        m->mdt_service =
-                ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
-                                     LUSTRE_MDT0_NAME,
-                                     m->mdt_md_dev.md_lu_dev.ld_proc_entry,
-                                     NULL);
-        if (m->mdt_service == NULL)
-                GOTO(err_free_ns, rc = -ENOMEM);
+        rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
+        if (rc) {
+                CERROR("can't init lu_site, rc %d\n", rc);
+                GOTO(err_fini_site, rc);
+        }
 
         /* init the stack */
-        LASSERT(mdt_child->ld_type->ldt_ops->ldto_device_init != NULL);
-        rc = mdt_child->ld_type->ldt_ops->ldto_device_init(mdt_child, top);
+        rc = mdt_stack_init(m, cfg);
         if (rc) {
                 CERROR("can't init device stack, rc %d\n", rc);
-                GOTO(err_free_svc, rc);
+                GOTO(err_fini_site, rc);
         }
 
         m->mdt_seq_mgr = seq_mgr_init(&seq_mgr_ops, m);
         if (!m->mdt_seq_mgr) {
                 CERROR("can't initialize sequence manager\n");
-                GOTO(err_fini_child, rc);
+                GOTO(err_fini_stack, rc);
         }
 
         rc = lu_context_init(&ctx);
@@ -1205,31 +1326,56 @@ static int mdt_init0(struct mdt_device *m,
         if (rc)
                 GOTO(err_fini_ctx, rc);
 
-        rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
+        lu_context_fini(&ctx);
+
+        snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
+        m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
+        if (m->mdt_namespace == NULL)
+                GOTO(err_fini_site, rc = -ENOMEM);
+
+        ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
+
+        rc = mdt_fld_init(m);
         if (rc)
-                GOTO(err_fini_ctx, rc);
+                GOTO(err_free_ns, rc);
 
-        lu_context_fini(&ctx);
+        rc = mdt_start_ptlrpc_service(m);
+        if (rc)
+                GOTO(err_free_fld, rc);
         RETURN(0);
 
+err_free_fld:
+        mdt_fld_fini(m);
+err_free_ns:
+        ldlm_namespace_free(m->mdt_namespace, 0);
+        m->mdt_namespace = NULL;
 err_fini_ctx:
         lu_context_fini(&ctx);
 err_fini_mgr:
         seq_mgr_fini(m->mdt_seq_mgr);
         m->mdt_seq_mgr = NULL;
-err_fini_child:
-        mdt_child->ld_type->ldt_ops->ldto_device_fini(mdt_child);
-err_free_svc:
-        ptlrpc_unregister_service(m->mdt_service);
-        m->mdt_service = NULL;
-err_free_ns:
-        ldlm_namespace_free(m->mdt_namespace, 0);
-        m->mdt_namespace = NULL;
+err_fini_stack:
+        mdt_stack_fini(m);
 err_fini_site:
         lu_site_fini(s);
         OBD_FREE_PTR(s);
         RETURN(rc);
 }
+/*
+ * used by MGS to process specific configurations
+ *
+ * No command is handled by the MDT itself yet: every command is passed
+ * on to the child device's ldo_process_config() and its result is
+ * returned.
+ */
+static int mdt_process_config(struct lu_device *d, struct lustre_cfg *cfg)
+{
+        struct lu_device *next = md2lu_dev(mdt_dev(d)->mdt_child);
+        int err;
+        ENTRY;
+
+        switch (cfg->lcfg_command) {
+                /* all MDT specific commands should be here */
+        default:
+                /* others are passed further */
+                err = next->ld_ops->ldo_process_config(next, cfg);
+                break;
+        }
+        RETURN(err);
+}
 
 static struct lu_object *mdt_object_alloc(struct lu_context *ctxt,
                                           struct lu_device *d)
@@ -1268,11 +1414,13 @@ static int mdt_object_init(struct lu_context *ctxt, struct lu_object *o)
 
 static void mdt_object_free(struct lu_context *ctxt, struct lu_object *o)
 {
+        struct mdt_object *mo = mdt_obj(o);
         struct lu_object_header *h;
 
         h = o->lo_header;
         lu_object_fini(o);
         lu_object_header_fini(h);
+        OBD_FREE_PTR(mo);
 }
 
 static void mdt_object_release(struct lu_context *ctxt, struct lu_object *o)
@@ -1290,7 +1438,8 @@ static struct lu_device_operations mdt_lu_ops = {
         .ldo_object_init    = mdt_object_init,
         .ldo_object_free    = mdt_object_free,
         .ldo_object_release = mdt_object_release,
-        .ldo_object_print   = mdt_object_print
+        .ldo_object_print   = mdt_object_print,
+        .ldo_process_config = mdt_process_config
 };
 
 /* mds_connect copy */
@@ -1299,11 +1448,10 @@ static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
                            struct obd_connect_data *data)
 {
         struct obd_export *exp;
-        int rc, abort_recovery;
+        int rc;
         struct mdt_device *mdt;
         struct mds_export_data *med;
         struct mds_client_data *mcd = NULL;
-
         ENTRY;
 
         if (!conn || !obd || !cluuid)
@@ -1311,31 +1459,15 @@ static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
 
         mdt = mdt_dev(obd->obd_lu_dev);
 
-        /* Check for aborted recovery. */
-        spin_lock_bh(&obd->obd_processing_task_lock);
-        abort_recovery = obd->obd_abort_recovery;
-        spin_unlock_bh(&obd->obd_processing_task_lock);
-        if (abort_recovery)
-                target_abort_recovery(obd);
-
-        /* XXX There is a small race between checking the list and adding a
-         * new connection for the same UUID, but the real threat (list
-         * corruption when multiple different clients connect) is solved.
-         *
-         * There is a second race between adding the export to the list,
-         * and filling in the client data below.  Hence skipping the case
-         * of NULL mcd above.  We should already be controlling multiple
-         * connects at the client, and we can't hold the spinlock over
-         * memory allocations without risk of deadlocking.
-         */
         rc = class_connect(conn, obd, cluuid);
         if (rc)
                 RETURN(rc);
+
         exp = class_conn2export(conn);
         LASSERT(exp);
         med = &exp->exp_mds_data;
 
-        OBD_ALLOC(mcd, sizeof(*mcd));
+        OBD_ALLOC_PTR(mcd);
         if (!mcd)
                 GOTO(out, rc = -ENOMEM);
 
@@ -1344,10 +1476,6 @@ static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
 
 out:
         if (rc) {
-                if (mcd) {
-                        OBD_FREE(mcd, sizeof(*mcd));
-                        med->med_mcd = NULL;
-                }
                 class_disconnect(exp);
         } else {
                 class_export_put(exp);
@@ -1356,9 +1484,45 @@ out:
         RETURN(rc);
 }
 
+/*
+ * Disconnect export \a exp from the MDT.
+ *
+ * The export is disconnected first, then every reply still queued on
+ * exp_outstanding_replies is removed from the list and scheduled via
+ * ptlrpc_schedule_difficult_reply(), and finally the per-export client
+ * data (med_mcd) is freed.  An extra export reference is held for the
+ * duration of the call.
+ *
+ * \retval result of class_disconnect()
+ */
+static int mdt_obd_disconnect(struct obd_export *exp)
+{
+        struct mds_export_data *med;
+        unsigned long irqflags;
+        int rc;
+        ENTRY;
+
+        /* validate exp before it is used for anything */
+        LASSERT(exp);
+        med = &exp->exp_mds_data;
+        class_export_get(exp);
+
+        /* Disconnect early so that clients can't keep using export */
+        rc = class_disconnect(exp);
+        /* XXX: ldlm_cancel_locks_for_export(exp) is not called here yet */
+
+        /* complete all outstanding replies */
+        spin_lock_irqsave(&exp->exp_lock, irqflags);
+        while (!list_empty(&exp->exp_outstanding_replies)) {
+                struct ptlrpc_reply_state *rs =
+                        list_entry(exp->exp_outstanding_replies.next,
+                                   struct ptlrpc_reply_state, rs_exp_list);
+                struct ptlrpc_service *svc = rs->rs_service;
+
+                spin_lock(&svc->srv_lock);
+                list_del_init(&rs->rs_exp_list);
+                ptlrpc_schedule_difficult_reply(rs);
+                spin_unlock(&svc->srv_lock);
+        }
+        spin_unlock_irqrestore(&exp->exp_lock, irqflags);
+
+        /* defensive: tolerate an export without client data and make sure
+         * the pointer cannot be freed twice */
+        if (med->med_mcd != NULL) {
+                OBD_FREE_PTR(med->med_mcd);
+                med->med_mcd = NULL;
+        }
+
+        class_export_put(exp);
+        RETURN(rc);
+}
+
 static struct obd_ops mdt_obd_device_ops = {
         .o_owner = THIS_MODULE,
-        .o_connect = mdt_obd_connect
+        .o_connect = mdt_obd_connect,
+        .o_disconnect = mdt_obd_disconnect,
 };
 
 static struct lu_device *mdt_device_alloc(struct lu_device_type *t,
@@ -1491,8 +1655,7 @@ static struct mdt_handler mdt_mds_ops[] = {
         DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING,   mdt_done_writing),
         DEF_MDT_HNDL(0,            PIN,            mdt_pin),
         DEF_MDT_HNDL(HABEO_CORPUS, SYNC,           mdt_sync),
-        DEF_MDT_HNDL(0,            SET_INFO,       mdt_set_info),
-        DEF_MDT_HNDL(0,            GET_INFO,       mdt_get_info),
+        DEF_MDT_HNDL(0,            FLD,            mdt_fld),
         DEF_MDT_HNDL(0,            QUOTACHECK,     mdt_handle_quotacheck),
         DEF_MDT_HNDL(0,            QUOTACTL,       mdt_handle_quotactl)
 };