Whamcloud - gitweb
LU-1095 debug: no console message for long symlink
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index 0a18671..671ac7a 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
@@ -48,9 +46,6 @@
  * Author: Yury Umanets <umka@clusterfs.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include <linux/module.h>
@@ -102,8 +97,6 @@ ldlm_mode_t mdt_dlm_lock_modes[] = {
  * Initialized in mdt_mod_init().
  */
 static unsigned long mdt_num_threads;
-static unsigned long mdt_min_threads;
-static unsigned long mdt_max_threads;
 
 /* ptlrpc request handler for MDT. All handlers are
  * grouped into several slices - struct mdt_opc_slice,
@@ -343,20 +336,20 @@ static int mdt_getstatus(struct mdt_thread_info *info)
 
 static int mdt_statfs(struct mdt_thread_info *info)
 {
-        struct ptlrpc_request *req = mdt_info_req(info);
-        struct md_device      *next  = info->mti_mdt->mdt_child;
-        struct ptlrpc_service *svc;
-        struct obd_statfs     *osfs;
-        int                    rc;
+       struct ptlrpc_request           *req = mdt_info_req(info);
+       struct md_device                *next = info->mti_mdt->mdt_child;
+       struct ptlrpc_service_part      *svcpt;
+       struct obd_statfs               *osfs;
+       int                             rc;
 
-        ENTRY;
+       ENTRY;
 
-        svc = info->mti_pill->rc_req->rq_rqbd->rqbd_service;
+       svcpt = info->mti_pill->rc_req->rq_rqbd->rqbd_svcpt;
 
-        /* This will trigger a watchdog timeout */
-        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
-                         (MDT_SERVICE_WATCHDOG_FACTOR *
-                          at_get(&svc->srv_at_estimate)) + 1);
+       /* This will trigger a watchdog timeout */
+       OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
+                        (MDT_SERVICE_WATCHDOG_FACTOR *
+                         at_get(&svcpt->scp_at_estimate)) + 1);
 
         rc = mdt_check_ucred(info);
         if (rc)
@@ -366,13 +359,11 @@ static int mdt_statfs(struct mdt_thread_info *info)
                 rc = err_serious(-ENOMEM);
         } else {
                 osfs = req_capsule_server_get(info->mti_pill, &RMF_OBD_STATFS);
-                rc = next->md_ops->mdo_statfs(info->mti_env, next,
-                                              &info->mti_u.ksfs);
-                statfs_pack(osfs, &info->mti_u.ksfs);
+                rc = next->md_ops->mdo_statfs(info->mti_env, next, osfs);
         }
 
         if (rc == 0)
-                mdt_counter_incr(req->rq_export, LPROC_MDT_STATFS);
+               mdt_counter_incr(req, LPROC_MDT_STATFS);
 
         RETURN(rc);
 }
@@ -521,7 +512,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                 /* This object is located on remote node.*/
                 repbody->fid1 = *mdt_object_fid(o);
                 repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
-                RETURN(0);
+                GOTO(out, rc = 0);
         }
 
         buffer->lb_buf = req_capsule_server_get(pill, &RMF_MDT_MD);
@@ -580,21 +571,35 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
         } else if (S_ISLNK(la->la_mode) &&
                    reqbody->valid & OBD_MD_LINKNAME) {
                 buffer->lb_buf = ma->ma_lmm;
-                buffer->lb_len = reqbody->eadatasize;
+                /* eadatasize from client includes NULL-terminator, so
+                 * there is no need to read it */
+                buffer->lb_len = reqbody->eadatasize - 1;
                 rc = mo_readlink(env, next, buffer);
                 if (unlikely(rc <= 0)) {
                         CERROR("readlink failed: %d\n", rc);
                         rc = -EFAULT;
                 } else {
-                        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO))
-                                 rc -= 2;
-                        repbody->valid |= OBD_MD_LINKNAME;
-                        repbody->eadatasize = rc;
-                        /* NULL terminate */
-                        ((char*)ma->ma_lmm)[rc - 1] = 0;
-                        CDEBUG(D_INODE, "symlink dest %s, len = %d\n",
-                               (char*)ma->ma_lmm, rc);
-                        rc = 0;
+                       int print_limit = min_t(int, CFS_PAGE_SIZE - 128, rc);
+
+                       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO))
+                               rc -= 2;
+                       repbody->valid |= OBD_MD_LINKNAME;
+                       /* we need to report back size with NULL-terminator
+                        * because client expects that */
+                       repbody->eadatasize = rc + 1;
+                       if (repbody->eadatasize != reqbody->eadatasize)
+                               CERROR("Read shorter symlink %d, expected %d\n",
+                                      rc, reqbody->eadatasize - 1);
+                       /* NULL terminate */
+                       ((char *)ma->ma_lmm)[rc] = 0;
+
+                       /* If the total CDEBUG() size is larger than a page, it
+                        * will print a warning to the console, avoid this by
+                        * printing just the last part of the symlink. */
+                       CDEBUG(D_INODE, "symlink dest %s%.*s, len = %d\n",
+                              print_limit < rc ? "..." : "", print_limit,
+                              (char *)ma->ma_lmm + rc - print_limit, rc);
+                       rc = 0;
                 }
         }
 
@@ -663,6 +668,11 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                         RETURN(rc);
                 repbody->valid |= OBD_MD_FLMDSCAPA;
         }
+
+out:
+        if (rc == 0)
+               mdt_counter_incr(req, LPROC_MDT_GETATTR);
+
         RETURN(rc);
 }
 
@@ -700,14 +710,12 @@ static int mdt_renew_capa(struct mdt_thread_info *info)
 
 static int mdt_getattr(struct mdt_thread_info *info)
 {
-        struct ptlrpc_request   *req = mdt_info_req(info);
         struct mdt_object       *obj = info->mti_object;
         struct req_capsule      *pill = info->mti_pill;
         struct mdt_body         *reqbody;
         struct mdt_body         *repbody;
         mode_t                   mode;
-        int                      md_size;
-        int rc;
+        int rc, rc2;
         ENTRY;
 
         reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
@@ -725,13 +733,12 @@ static int mdt_getattr(struct mdt_thread_info *info)
         LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu));
 
         mode = lu_object_attr(&obj->mot_obj.mo_lu);
-        if (S_ISLNK(mode) && (reqbody->valid & OBD_MD_LINKNAME) &&
-            (reqbody->eadatasize > info->mti_mdt->mdt_max_mdsize))
-                md_size = reqbody->eadatasize;
-        else
-                md_size = info->mti_mdt->mdt_max_mdsize;
 
-        req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, md_size);
+        /* old clients may not report needed easize, use max value then */
+        req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
+                             reqbody->eadatasize == 0 ?
+                             info->mti_mdt->mdt_max_mdsize :
+                             reqbody->eadatasize);
 
         rc = req_capsule_server_pack(pill);
         if (unlikely(rc != 0))
@@ -762,11 +769,10 @@ static int mdt_getattr(struct mdt_thread_info *info)
                 mdt_exit_ucred(info);
         EXIT;
 out_shrink:
-        if (rc == 0)
-                mdt_counter_incr(req->rq_export, LPROC_MDT_GETATTR);
-
         mdt_client_compatibility(info);
-        mdt_shrink_reply(info);
+        rc2 = mdt_fix_reply(info);
+        if (rc == 0)
+                rc = rc2;
         return rc;
 }
 
@@ -815,6 +821,7 @@ static int mdt_raw_lookup(struct mdt_thread_info *info,
         LASSERT(!info->mti_cross_ref);
 
         /* Only got the fid of this obj by name */
+        fid_zero(child_fid);
         rc = mdo_lookup(info->mti_env, next, lname, child_fid,
                         &info->mti_spec);
 #if 0
@@ -906,16 +913,16 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
         }
         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
 
-        rc = mdt_object_exists(parent);
-        if (unlikely(rc == 0)) {
-                LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
-                                &parent->mot_obj.mo_lu,
-                                "Parent doesn't exist!\n");
-                RETURN(-ESTALE);
-        } else if (!info->mti_cross_ref) {
-                LASSERTF(rc > 0, "Parent "DFID" is on remote server\n",
-                         PFID(mdt_object_fid(parent)));
-        }
+       rc = mdt_object_exists(parent);
+       if (unlikely(rc == 0)) {
+               LU_OBJECT_DEBUG(D_INODE, info->mti_env,
+                               &parent->mot_obj.mo_lu,
+                               "Parent doesn't exist!\n");
+               RETURN(-ESTALE);
+       } else if (!info->mti_cross_ref) {
+               LASSERTF(rc > 0, "Parent "DFID" is on remote server\n",
+                        PFID(mdt_object_fid(parent)));
+       }
         if (lname) {
                 rc = mdt_raw_lookup(info, parent, lname, ldlm_rep);
                 if (rc != 0) {
@@ -979,6 +986,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                 }
 
                 /* step 2: lookup child's fid by name */
+                fid_zero(child_fid);
                 rc = mdo_lookup(info->mti_env, next, lname, child_fid,
                                 &info->mti_spec);
 
@@ -1122,7 +1130,7 @@ static int mdt_getattr_name(struct mdt_thread_info *info)
         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
         struct mdt_body        *reqbody;
         struct mdt_body        *repbody;
-        int rc;
+        int rc, rc2;
         ENTRY;
 
         reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
@@ -1148,7 +1156,9 @@ static int mdt_getattr_name(struct mdt_thread_info *info)
         EXIT;
 out_shrink:
         mdt_client_compatibility(info);
-        mdt_shrink_reply(info);
+        rc2 = mdt_fix_reply(info);
+        if (rc == 0)
+                rc = rc2;
         return rc;
 }
 
@@ -1536,19 +1546,26 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
                               __u32 op)
 {
         struct req_capsule      *pill = info->mti_pill;
-        struct mdt_device       *mdt = info->mti_mdt;
         struct md_quota         *mq = md_quota(info->mti_env);
         struct mdt_body         *repbody;
-        int                      rc = 0;
+        int                      rc = 0, rc2;
         ENTRY;
 
-        /* pack reply */
+
+        rc = mdt_reint_unpack(info, op);
+        if (rc != 0) {
+                CERROR("Can't unpack reint, rc %d\n", rc);
+                RETURN(err_serious(rc));
+        }
+
+        /* for replay (no_create) lmm is not needed, client has it already */
         if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
                 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
-                                     mdt->mdt_max_mdsize);
+                                     info->mti_rr.rr_eadatalen);
+
         if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
                 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
-                                     mdt->mdt_max_cookiesize);
+                                     info->mti_mdt->mdt_max_cookiesize);
 
         rc = req_capsule_server_pack(pill);
         if (rc != 0) {
@@ -1563,27 +1580,13 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
                 repbody->aclsize = 0;
         }
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK))
-                GOTO(out_shrink, rc = err_serious(-EFAULT));
-
-        rc = mdt_reint_unpack(info, op);
-        if (rc != 0) {
-                CERROR("Can't unpack reint, rc %d\n", rc);
-                GOTO(out_shrink, rc = err_serious(rc));
-        }
-
         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10);
 
         /* for replay no cookkie / lmm need, because client have this already */
-        if (info->mti_spec.no_create == 1)  {
+        if (info->mti_spec.no_create)
                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0);
 
-                if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
-                        req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
-                                             0);
-        }
-
         rc = mdt_init_ucred_reint(info);
         if (rc)
                 GOTO(out_shrink, rc);
@@ -1603,7 +1606,9 @@ out_ucred:
         mdt_exit_ucred(info);
 out_shrink:
         mdt_client_compatibility(info);
-        mdt_shrink_reply(info);
+        rc2 = mdt_fix_reply(info);
+        if (rc == 0)
+                rc = rc2;
         return rc;
 }
 
@@ -1740,7 +1745,7 @@ static int mdt_sync(struct mdt_thread_info *info)
                         rc = err_serious(rc);
         }
         if (rc == 0)
-                mdt_counter_incr(req->rq_export, LPROC_MDT_SYNC);
+               mdt_counter_incr(req, LPROC_MDT_SYNC);
 
         RETURN(rc);
 }
@@ -2076,6 +2081,24 @@ static struct mdt_object *mdt_obj(struct lu_object *o)
         return container_of0(o, struct mdt_object, mot_obj.mo_lu);
 }
 
+struct mdt_object *mdt_object_new(const struct lu_env *env,
+                                 struct mdt_device *d,
+                                 const struct lu_fid *f)
+{
+       struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
+       struct lu_object *o;
+       struct mdt_object *m;
+       ENTRY;
+
+       CDEBUG(D_INFO, "Allocate object for "DFID"\n", PFID(f));
+       o = lu_object_find(env, &d->mdt_md_dev.md_lu_dev, f, &conf);
+       if (unlikely(IS_ERR(o)))
+               m = (struct mdt_object *)o;
+       else
+               m = mdt_obj(o);
+       RETURN(m);
+}
+
 struct mdt_object *mdt_object_find(const struct lu_env *env,
                                    struct mdt_device *d,
                                    const struct lu_fid *f)
@@ -2184,7 +2207,7 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
         if (lock->l_req_mode == LCK_COS && lock->l_blocking_lock != NULL) {
                 struct lu_env env;
 
-                rc = lu_env_init(&env, LCT_MD_THREAD);
+                rc = lu_env_init(&env, LCT_LOCAL);
                 if (unlikely(rc != 0))
                         CWARN("lu_env initialization failed with rc = %d,"
                               "cannot start asynchronous commit\n", rc);
@@ -2507,16 +2530,13 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
                 rc = 0;
 
         if (rc == 0 && (flags & HABEO_REFERO)) {
-                struct mdt_device *mdt = info->mti_mdt;
-
                 /* Pack reply. */
-
                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
-                                             mdt->mdt_max_mdsize);
+                                             info->mti_body->eadatasize);
                 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
                         req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
-                                             mdt->mdt_max_cookiesize);
+                                             info->mti_mdt->mdt_max_cookiesize);
 
                 rc = req_capsule_server_pack(pill);
         }
@@ -3319,7 +3339,7 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
         struct ptlrpc_request  *req;
         struct mdt_body        *reqbody;
         struct mdt_body        *repbody;
-        int                     rc;
+        int                     rc, rc2;
         ENTRY;
 
         reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
@@ -3383,7 +3403,9 @@ out_ucred:
         mdt_exit_ucred(info);
 out_shrink:
         mdt_client_compatibility(info);
-        mdt_shrink_reply(info);
+        rc2 = mdt_fix_reply(info);
+        if (rc == 0)
+                rc = rc2;
         return rc;
 }
 
@@ -3904,73 +3926,90 @@ static void mdt_stop_ptlrpc_service(struct mdt_device *m)
 
 static int mdt_start_ptlrpc_service(struct mdt_device *m)
 {
-        int rc;
         static struct ptlrpc_service_conf conf;
         cfs_proc_dir_entry_t *procfs_entry;
-        ENTRY;
-
-        procfs_entry = m->mdt_md_dev.md_lu_dev.ld_obd->obd_proc_entry;
-
-        conf = (typeof(conf)) {
-                .psc_nbufs           = MDS_NBUFS,
-                .psc_bufsize         = MDS_BUFSIZE,
-                .psc_max_req_size    = MDS_MAXREQSIZE,
-                .psc_max_reply_size  = MDS_MAXREPSIZE,
-                .psc_req_portal      = MDS_REQUEST_PORTAL,
-                .psc_rep_portal      = MDC_REPLY_PORTAL,
-                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
-                /*
-                 * We'd like to have a mechanism to set this on a per-device
-                 * basis, but alas...
-                 */
-                .psc_min_threads     = mdt_min_threads,
-                .psc_max_threads     = mdt_max_threads,
-                .psc_ctx_tags        = LCT_MD_THREAD
-        };
-
-        m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client;
-        ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
-                           "mdt_ldlm_client", m->mdt_ldlm_client);
-
-        m->mdt_regular_service =
-                ptlrpc_init_svc_conf(&conf, mdt_regular_handle, LUSTRE_MDT_NAME,
-                                     procfs_entry, target_print_req,
-                                     LUSTRE_MDT_NAME);
-        if (m->mdt_regular_service == NULL)
-                RETURN(-ENOMEM);
-
-        rc = ptlrpc_start_threads(m->mdt_regular_service);
-        if (rc)
-                GOTO(err_mdt_svc, rc);
-
-        /*
-         * readpage service configuration. Parameters have to be adjusted,
-         * ideally.
-         */
-        conf = (typeof(conf)) {
-                .psc_nbufs           = MDS_NBUFS,
-                .psc_bufsize         = MDS_BUFSIZE,
-                .psc_max_req_size    = MDS_MAXREQSIZE,
-                .psc_max_reply_size  = MDS_MAXREPSIZE,
-                .psc_req_portal      = MDS_READPAGE_PORTAL,
-                .psc_rep_portal      = MDC_REPLY_PORTAL,
-                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
-                .psc_min_threads     = mdt_min_threads,
-                .psc_max_threads     = mdt_max_threads,
-                .psc_ctx_tags        = LCT_MD_THREAD
-        };
-        m->mdt_readpage_service =
-                ptlrpc_init_svc_conf(&conf, mdt_readpage_handle,
-                                     LUSTRE_MDT_NAME "_readpage",
-                                     procfs_entry, target_print_req,"mdt_rdpg");
-
-        if (m->mdt_readpage_service == NULL) {
-                CERROR("failed to start readpage service\n");
-                GOTO(err_mdt_svc, rc = -ENOMEM);
+       int rc = 0;
+       ENTRY;
+
+       m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client;
+       ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
+                          "mdt_ldlm_client", m->mdt_ldlm_client);
+
+       procfs_entry = m->mdt_md_dev.md_lu_dev.ld_obd->obd_proc_entry;
+
+       conf = (typeof(conf)) {
+               .psc_name               = LUSTRE_MDT_NAME,
+               .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
+               .psc_buf                = {
+                       .bc_nbufs               = MDS_NBUFS,
+                       .bc_buf_size            = MDS_BUFSIZE,
+                       .bc_req_max_size        = MDS_MAXREQSIZE,
+                       .bc_rep_max_size        = MDS_MAXREPSIZE,
+                       .bc_req_portal          = MDS_REQUEST_PORTAL,
+                       .bc_rep_portal          = MDC_REPLY_PORTAL,
+               },
+               /*
+                * We'd like to have a mechanism to set this on a per-device
+                * basis, but alas...
+                */
+               .psc_thr                = {
+                       .tc_thr_name            = LUSTRE_MDT_NAME,
+                       .tc_nthrs_min           = MDT_MIN_THREADS,
+                       .tc_nthrs_max           = MDT_MAX_THREADS,
+                       .tc_nthrs_user          = mdt_num_threads,
+                       .tc_ctx_tags            = LCT_MD_THREAD,
+               },
+               .psc_ops                = {
+                       .so_req_handler         = mdt_regular_handle,
+                       .so_req_printer         = target_print_req,
+               },
+       };
+       m->mdt_regular_service = ptlrpc_register_service(&conf, procfs_entry);
+       if (IS_ERR(m->mdt_regular_service)) {
+               rc = PTR_ERR(m->mdt_regular_service);
+               CERROR("failed to start regular mdt service: %d\n", rc);
+               m->mdt_regular_service = NULL;
+
+               RETURN(rc);
+       }
+
+       /*
+        * readpage service configuration. Parameters have to be adjusted,
+        * ideally.
+        */
+       memset(&conf, 0, sizeof(conf));
+       conf = (typeof(conf)) {
+               .psc_name               = LUSTRE_MDT_NAME "_readpage",
+               .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
+               .psc_buf                = {
+                       .bc_nbufs               = MDS_NBUFS,
+                       .bc_buf_size            = MDS_BUFSIZE,
+                       .bc_req_max_size        = MDS_MAXREQSIZE,
+                       .bc_rep_max_size        = MDS_MAXREPSIZE,
+                       .bc_req_portal          = MDS_READPAGE_PORTAL,
+                       .bc_rep_portal          = MDC_REPLY_PORTAL,
+               },
+               .psc_thr                = {
+                       .tc_thr_name            = "mdt_rdpg",
+                       .tc_nthrs_min           = MDT_MIN_THREADS,
+                       .tc_nthrs_max           = MDT_MAX_THREADS,
+                       .tc_nthrs_user          = mdt_num_threads,
+                       .tc_ctx_tags            = LCT_MD_THREAD,
+               },
+               .psc_ops                = {
+                       .so_req_handler         = mdt_readpage_handle,
+                       .so_req_printer         = target_print_req,
+               },
+       };
+       m->mdt_readpage_service = ptlrpc_register_service(&conf, procfs_entry);
+       if (IS_ERR(m->mdt_readpage_service)) {
+               rc = PTR_ERR(m->mdt_readpage_service);
+               CERROR("failed to start readpage service: %d\n", rc);
+               m->mdt_readpage_service = NULL;
+
+               GOTO(err_mdt_svc, rc);
         }
 
-        rc = ptlrpc_start_threads(m->mdt_readpage_service);
-
         /*
          * setattr service configuration.
          *
@@ -3978,180 +4017,220 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
          * preserve this portal for a certain time, it should be removed
          * eventually. LU-617.
          */
-        conf = (typeof(conf)) {
-                .psc_nbufs           = MDS_NBUFS,
-                .psc_bufsize         = MDS_BUFSIZE,
-                .psc_max_req_size    = MDS_MAXREQSIZE,
-                .psc_max_reply_size  = MDS_MAXREPSIZE,
-                .psc_req_portal      = MDS_SETATTR_PORTAL,
-                .psc_rep_portal      = MDC_REPLY_PORTAL,
-                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
-                .psc_min_threads     = mdt_min_threads,
-                .psc_max_threads     = mdt_max_threads,
-                .psc_ctx_tags        = LCT_MD_THREAD
+       memset(&conf, 0, sizeof(conf));
+       conf = (typeof(conf)) {
+               .psc_name               = LUSTRE_MDT_NAME "_setattr",
+               .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
+               .psc_buf                = {
+                       .bc_nbufs               = MDS_NBUFS,
+                       .bc_buf_size            = MDS_BUFSIZE,
+                       .bc_req_max_size        = MDS_MAXREQSIZE,
+                       .bc_rep_max_size        = MDS_MAXREPSIZE,
+                       .bc_req_portal          = MDS_SETATTR_PORTAL,
+                       .bc_rep_portal          = MDC_REPLY_PORTAL,
+               },
+               .psc_thr                = {
+                       .tc_thr_name            = "mdt_attr",
+                       .tc_nthrs_min           = MDT_MIN_THREADS,
+                       .tc_nthrs_max           = MDT_MAX_THREADS,
+                       .tc_nthrs_user          = mdt_num_threads,
+                       .tc_ctx_tags            = LCT_MD_THREAD,
+               },
+               .psc_ops                = {
+                       .so_req_handler         = mdt_regular_handle,
+                       .so_req_printer         = target_print_req,
+               },
+       };
+       m->mdt_setattr_service = ptlrpc_register_service(&conf, procfs_entry);
+       if (IS_ERR(m->mdt_setattr_service)) {
+               rc = PTR_ERR(m->mdt_setattr_service);
+               CERROR("failed to start setattr service: %d\n", rc);
+               m->mdt_setattr_service = NULL;
+
+               GOTO(err_mdt_svc, rc);
+       }
+
+       /*
+        * sequence controller service configuration
+        */
+       memset(&conf, 0, sizeof(conf));
+       conf = (typeof(conf)) {
+               .psc_name               = LUSTRE_MDT_NAME "_mdsc",
+               .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
+               .psc_buf                = {
+                       .bc_nbufs               = MDS_NBUFS,
+                       .bc_buf_size            = MDS_BUFSIZE,
+                       .bc_req_max_size        = SEQ_MAXREQSIZE,
+                       .bc_rep_max_size        = SEQ_MAXREPSIZE,
+                       .bc_req_portal          = SEQ_CONTROLLER_PORTAL,
+                       .bc_rep_portal          = MDC_REPLY_PORTAL,
+               },
+               .psc_thr                = {
+                       .tc_thr_name            = "mdt_mdsc",
+                       .tc_nthrs_min           = MDT_MIN_THREADS,
+                       .tc_nthrs_max           = MDT_MAX_THREADS,
+                       .tc_nthrs_user          = mdt_num_threads,
+                       .tc_ctx_tags            = LCT_MD_THREAD,
+               },
+               .psc_ops                = {
+                       .so_req_handler         = mdt_mdsc_handle,
+                       .so_req_printer         = target_print_req,
+               },
+       };
+       m->mdt_mdsc_service = ptlrpc_register_service(&conf, procfs_entry);
+       if (IS_ERR(m->mdt_mdsc_service)) {
+               rc = PTR_ERR(m->mdt_mdsc_service);
+               CERROR("failed to start seq controller service: %d\n", rc);
+               m->mdt_mdsc_service = NULL;
+
+               GOTO(err_mdt_svc, rc);
+       }
+
+       /*
+        * metadata sequence server service configuration
+        */
+       memset(&conf, 0, sizeof(conf));
+       conf = (typeof(conf)) {
+               .psc_name               = LUSTRE_MDT_NAME "_mdss",
+               .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
+               .psc_buf                = {
+                       .bc_nbufs               = MDS_NBUFS,
+                       .bc_buf_size            = MDS_BUFSIZE,
+                       .bc_req_max_size        = SEQ_MAXREQSIZE,
+                       .bc_rep_max_size        = SEQ_MAXREPSIZE,
+                       .bc_req_portal          = SEQ_METADATA_PORTAL,
+                       .bc_rep_portal          = MDC_REPLY_PORTAL,
+               },
+               .psc_thr                = {
+                       .tc_thr_name            = "mdt_mdss",
+                       .tc_nthrs_min           = MDT_MIN_THREADS,
+                       .tc_nthrs_max           = MDT_MAX_THREADS,
+                       .tc_nthrs_user          = mdt_num_threads,
+                       .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD
+               },
+               .psc_ops                = {
+                       .so_req_handler         = mdt_mdss_handle,
+                       .so_req_printer         = target_print_req,
+               },
         };
-
-        m->mdt_setattr_service =
-                ptlrpc_init_svc_conf(&conf, mdt_regular_handle,
-                                     LUSTRE_MDT_NAME "_setattr",
-                                     procfs_entry, target_print_req,"mdt_attr");
-
-        if (!m->mdt_setattr_service) {
-                CERROR("failed to start setattr service\n");
-                GOTO(err_mdt_svc, rc = -ENOMEM);
-        }
-
-        rc = ptlrpc_start_threads(m->mdt_setattr_service);
-        if (rc)
-                GOTO(err_mdt_svc, rc);
-
-        /*
-         * sequence controller service configuration
-         */
-        conf = (typeof(conf)) {
-                .psc_nbufs           = MDS_NBUFS,
-                .psc_bufsize         = MDS_BUFSIZE,
-                .psc_max_req_size    = SEQ_MAXREQSIZE,
-                .psc_max_reply_size  = SEQ_MAXREPSIZE,
-                .psc_req_portal      = SEQ_CONTROLLER_PORTAL,
-                .psc_rep_portal      = MDC_REPLY_PORTAL,
-                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
-                .psc_min_threads     = mdt_min_threads,
-                .psc_max_threads     = mdt_max_threads,
-                .psc_ctx_tags        = LCT_MD_THREAD|LCT_DT_THREAD
+       m->mdt_mdss_service = ptlrpc_register_service(&conf, procfs_entry);
+       if (IS_ERR(m->mdt_mdss_service)) {
+               rc = PTR_ERR(m->mdt_mdss_service);
+               CERROR("failed to start metadata seq server service: %d\n", rc);
+               m->mdt_mdss_service = NULL;
+
+               GOTO(err_mdt_svc, rc);
+       }
+
+       /*
+        * Data sequence server service configuration. We want to have really
+        * cluster-wide sequences space. This is why we start only one sequence
+        * controller which manages space.
+        */
+       memset(&conf, 0, sizeof(conf));
+       conf = (typeof(conf)) {
+               .psc_name               = LUSTRE_MDT_NAME "_dtss",
+               .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
+               .psc_buf                = {
+                       .bc_nbufs               = MDS_NBUFS,
+                       .bc_buf_size            = MDS_BUFSIZE,
+                       .bc_req_max_size        = SEQ_MAXREQSIZE,
+                       .bc_rep_max_size        = SEQ_MAXREPSIZE,
+                       .bc_req_portal          = SEQ_DATA_PORTAL,
+                       .bc_rep_portal          = OSC_REPLY_PORTAL,
+               },
+               .psc_thr                = {
+                       .tc_thr_name            = "mdt_dtss",
+                       .tc_nthrs_min           = MDT_MIN_THREADS,
+                       .tc_nthrs_max           = MDT_MAX_THREADS,
+                       .tc_nthrs_user          = mdt_num_threads,
+                       .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD
+               },
+               .psc_ops                = {
+                       .so_req_handler         = mdt_dtss_handle,
+                       .so_req_printer         = target_print_req,
+               },
         };
-
-        m->mdt_mdsc_service =
-                ptlrpc_init_svc_conf(&conf, mdt_mdsc_handle,
-                                     LUSTRE_MDT_NAME"_mdsc",
-                                     procfs_entry, target_print_req,"mdt_mdsc");
-        if (!m->mdt_mdsc_service) {
-                CERROR("failed to start seq controller service\n");
-                GOTO(err_mdt_svc, rc = -ENOMEM);
-        }
-
-        rc = ptlrpc_start_threads(m->mdt_mdsc_service);
-        if (rc)
-                GOTO(err_mdt_svc, rc);
-
-        /*
-         * metadata sequence server service configuration
-         */
-        conf = (typeof(conf)) {
-                .psc_nbufs           = MDS_NBUFS,
-                .psc_bufsize         = MDS_BUFSIZE,
-                .psc_max_req_size    = SEQ_MAXREQSIZE,
-                .psc_max_reply_size  = SEQ_MAXREPSIZE,
-                .psc_req_portal      = SEQ_METADATA_PORTAL,
-                .psc_rep_portal      = MDC_REPLY_PORTAL,
-                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
-                .psc_min_threads     = mdt_min_threads,
-                .psc_max_threads     = mdt_max_threads,
-                .psc_ctx_tags        = LCT_MD_THREAD|LCT_DT_THREAD
-        };
-
-        m->mdt_mdss_service =
-                ptlrpc_init_svc_conf(&conf, mdt_mdss_handle,
-                                     LUSTRE_MDT_NAME"_mdss",
-                                     procfs_entry, target_print_req,"mdt_mdss");
-        if (!m->mdt_mdss_service) {
-                CERROR("failed to start metadata seq server service\n");
-                GOTO(err_mdt_svc, rc = -ENOMEM);
-        }
-
-        rc = ptlrpc_start_threads(m->mdt_mdss_service);
-        if (rc)
-                GOTO(err_mdt_svc, rc);
-
-
-        /*
-         * Data sequence server service configuration. We want to have really
-         * cluster-wide sequences space. This is why we start only one sequence
-         * controller which manages space.
-         */
-        conf = (typeof(conf)) {
-                .psc_nbufs           = MDS_NBUFS,
-                .psc_bufsize         = MDS_BUFSIZE,
-                .psc_max_req_size    = SEQ_MAXREQSIZE,
-                .psc_max_reply_size  = SEQ_MAXREPSIZE,
-                .psc_req_portal      = SEQ_DATA_PORTAL,
-                .psc_rep_portal      = OSC_REPLY_PORTAL,
+       m->mdt_dtss_service = ptlrpc_register_service(&conf, procfs_entry);
+       if (IS_ERR(m->mdt_dtss_service)) {
+               rc = PTR_ERR(m->mdt_dtss_service);
+               CERROR("failed to start data seq server service: %d\n", rc);
+               m->mdt_dtss_service = NULL;
+
+               GOTO(err_mdt_svc, rc);
+       }
+
+       /* FLD service start */
+       memset(&conf, 0, sizeof(conf));
+       conf = (typeof(conf)) {
+               .psc_name            = LUSTRE_MDT_NAME "_fld",
                 .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
-                .psc_min_threads     = mdt_min_threads,
-                .psc_max_threads     = mdt_max_threads,
-                .psc_ctx_tags        = LCT_MD_THREAD|LCT_DT_THREAD
-        };
-
-        m->mdt_dtss_service =
-                ptlrpc_init_svc_conf(&conf, mdt_dtss_handle,
-                                     LUSTRE_MDT_NAME"_dtss",
-                                     procfs_entry, target_print_req,"mdt_dtss");
-        if (!m->mdt_dtss_service) {
-                CERROR("failed to start data seq server service\n");
-                GOTO(err_mdt_svc, rc = -ENOMEM);
+               .psc_buf                = {
+                       .bc_nbufs               = MDS_NBUFS,
+                       .bc_buf_size            = MDS_BUFSIZE,
+                       .bc_req_max_size        = FLD_MAXREQSIZE,
+                       .bc_rep_max_size        = FLD_MAXREPSIZE,
+                       .bc_req_portal          = FLD_REQUEST_PORTAL,
+                       .bc_rep_portal          = MDC_REPLY_PORTAL,
+               },
+               .psc_thr                = {
+                       .tc_thr_name            = "mdt_fld",
+                       .tc_nthrs_min           = MDT_MIN_THREADS,
+                       .tc_nthrs_max           = MDT_MAX_THREADS,
+                       .tc_nthrs_user          = mdt_num_threads,
+                       .tc_ctx_tags            = LCT_DT_THREAD | LCT_MD_THREAD
+               },
+               .psc_ops                = {
+                       .so_req_handler         = mdt_fld_handle,
+                       .so_req_printer         = target_print_req,
+               },
+       };
+       m->mdt_fld_service = ptlrpc_register_service(&conf, procfs_entry);
+       if (IS_ERR(m->mdt_fld_service)) {
+               rc = PTR_ERR(m->mdt_fld_service);
+               CERROR("failed to start fld service: %d\n", rc);
+               m->mdt_fld_service = NULL;
+
+               GOTO(err_mdt_svc, rc);
+       }
+
+       /*
+        * mds-mds service configuration. Separate portal is used to allow
+        * mds-mds requests be not blocked during recovery.
+        */
+       memset(&conf, 0, sizeof(conf));
+       conf = (typeof(conf)) {
+               .psc_name               = LUSTRE_MDT_NAME "_mds",
+               .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
+               .psc_buf                = {
+                       .bc_nbufs               = MDS_NBUFS,
+                       .bc_buf_size            = MDS_BUFSIZE,
+                       .bc_req_max_size        = MDS_MAXREQSIZE,
+                       .bc_rep_max_size        = MDS_MAXREPSIZE,
+                       .bc_req_portal          = MDS_MDS_PORTAL,
+                       .bc_rep_portal          = MDC_REPLY_PORTAL,
+               },
+               .psc_thr                = {
+                       .tc_thr_name            = "mdt_mds",
+                       .tc_nthrs_min           = MDT_MIN_THREADS,
+                       .tc_nthrs_max           = MDT_MAX_THREADS,
+                       .tc_nthrs_user          = mdt_num_threads,
+                       .tc_ctx_tags            = LCT_MD_THREAD,
+               },
+               .psc_ops                = {
+                       .so_req_handler         = mdt_xmds_handle,
+                       .so_req_printer         = target_print_req,
+               },
+       };
+       m->mdt_xmds_service = ptlrpc_register_service(&conf, procfs_entry);
+       if (IS_ERR(m->mdt_xmds_service)) {
+               rc = PTR_ERR(m->mdt_xmds_service);
+               CERROR("failed to start xmds service: %d\n", rc);
+               m->mdt_xmds_service = NULL;
+
+               GOTO(err_mdt_svc, rc);
         }
 
-        rc = ptlrpc_start_threads(m->mdt_dtss_service);
-        if (rc)
-                GOTO(err_mdt_svc, rc);
-
-        /* FLD service start */
-        conf = (typeof(conf)) {
-                .psc_nbufs           = MDS_NBUFS,
-                .psc_bufsize         = MDS_BUFSIZE,
-                .psc_max_req_size    = FLD_MAXREQSIZE,
-                .psc_max_reply_size  = FLD_MAXREPSIZE,
-                .psc_req_portal      = FLD_REQUEST_PORTAL,
-                .psc_rep_portal      = MDC_REPLY_PORTAL,
-                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
-                .psc_min_threads     = mdt_min_threads,
-                .psc_max_threads     = mdt_max_threads,
-                .psc_ctx_tags        = LCT_DT_THREAD|LCT_MD_THREAD
-        };
-
-        m->mdt_fld_service =
-                ptlrpc_init_svc_conf(&conf, mdt_fld_handle,
-                                     LUSTRE_MDT_NAME"_fld",
-                                     procfs_entry, target_print_req, "mdt_fld");
-        if (!m->mdt_fld_service) {
-                CERROR("failed to start fld service\n");
-                GOTO(err_mdt_svc, rc = -ENOMEM);
-        }
-
-        rc = ptlrpc_start_threads(m->mdt_fld_service);
-        if (rc)
-                GOTO(err_mdt_svc, rc);
-
-        /*
-         * mds-mds service configuration. Separate portal is used to allow
-         * mds-mds requests be not blocked during recovery.
-         */
-        conf = (typeof(conf)) {
-                .psc_nbufs           = MDS_NBUFS,
-                .psc_bufsize         = MDS_BUFSIZE,
-                .psc_max_req_size    = MDS_MAXREQSIZE,
-                .psc_max_reply_size  = MDS_MAXREPSIZE,
-                .psc_req_portal      = MDS_MDS_PORTAL,
-                .psc_rep_portal      = MDC_REPLY_PORTAL,
-                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
-                .psc_min_threads     = mdt_min_threads,
-                .psc_max_threads     = mdt_max_threads,
-                .psc_ctx_tags        = LCT_MD_THREAD
-        };
-        m->mdt_xmds_service =
-                ptlrpc_init_svc_conf(&conf, mdt_xmds_handle,
-                                     LUSTRE_MDT_NAME "_mds",
-                                     procfs_entry, target_print_req,"mdt_xmds");
-
-        if (m->mdt_xmds_service == NULL) {
-                CERROR("failed to start xmds service\n");
-                GOTO(err_mdt_svc, rc = -ENOMEM);
-        }
-
-        rc = ptlrpc_start_threads(m->mdt_xmds_service);
-        if (rc)
-                GOTO(err_mdt_svc, rc);
-
         EXIT;
 err_mdt_svc:
         if (rc)
@@ -4502,8 +4581,9 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         obd = class_name2obd(dev);
         LASSERT(obd != NULL);
 
-        m->mdt_max_mdsize = MAX_MD_SIZE;
+        m->mdt_max_mdsize = MAX_MD_SIZE; /* 4 stripes */
         m->mdt_max_cookiesize = sizeof(struct llog_cookie);
+
         m->mdt_som_conf = 0;
 
         m->mdt_opts.mo_cos = MDT_COS_DEFAULT;
@@ -4821,8 +4901,8 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env,
                 lu_object_init(o, h, d);
                 lu_object_add_top(h, o);
                 o->lo_ops = &mdt_obj_ops;
-                cfs_sema_init(&mo->mot_ioepoch_sem, 1);
-                cfs_sema_init(&mo->mot_lov_sem, 1);
+                cfs_mutex_init(&mo->mot_ioepoch_mutex);
+                cfs_mutex_init(&mo->mot_lov_mutex);
                 RETURN(o);
         } else
                 RETURN(NULL);
@@ -4887,7 +4967,8 @@ static const struct lu_object_operations mdt_obj_ops = {
         .loo_object_print   = mdt_object_print
 };
 
-static int mdt_obd_set_info_async(struct obd_export *exp,
+static int mdt_obd_set_info_async(const struct lu_env *env,
+                                  struct obd_export *exp,
                                   __u32 keylen, void *key,
                                   __u32 vallen, void *val,
                                   struct ptlrpc_request_set *set)
@@ -5064,14 +5145,12 @@ static int mdt_obd_connect(const struct lu_env *env,
 
         rc = mdt_connect_internal(lexp, mdt, data);
         if (rc == 0) {
-                struct mdt_thread_info *mti;
                 struct lsd_client_data *lcd = lexp->exp_target_data.ted_lcd;
+
                 LASSERT(lcd);
-                mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
-                LASSERT(mti != NULL);
-                mti->mti_exp = lexp;
-                memcpy(lcd->lcd_uuid, cluuid, sizeof lcd->lcd_uuid);
-                rc = mdt_client_new(env, mdt);
+               info->mti_exp = lexp;
+               memcpy(lcd->lcd_uuid, cluuid, sizeof lcd->lcd_uuid);
+               rc = lut_client_new(env, lexp);
                 if (rc == 0)
                         mdt_export_stats_init(obd, lexp, localdata);
         }
@@ -5116,6 +5195,7 @@ static int mdt_obd_reconnect(const struct lu_env *env,
 
         RETURN(rc);
 }
+
 static int mdt_export_cleanup(struct obd_export *exp)
 {
         struct mdt_export_data *med = &exp->exp_mdt_data;
@@ -5195,7 +5275,7 @@ out_lmm:
         /* cleanup client slot early */
         /* Do not erase record for recoverable client. */
         if (!(exp->exp_flags & OBD_OPT_FAILOVER) || exp->exp_failed)
-                mdt_client_del(&env, mdt);
+               lut_client_del(&env, exp);
         lu_env_fini(&env);
 
         RETURN(rc);
@@ -5227,7 +5307,7 @@ static int mdt_init_export(struct obd_export *exp)
 
         CFS_INIT_LIST_HEAD(&med->med_open_head);
         cfs_spin_lock_init(&med->med_open_lock);
-        cfs_sema_init(&med->med_idmap_sem, 1);
+        cfs_mutex_init(&med->med_idmap_mutex);
         med->med_idmap = NULL;
         cfs_spin_lock(&exp->exp_lock);
         exp->exp_connecting = 1;
@@ -5239,12 +5319,21 @@ static int mdt_init_export(struct obd_export *exp)
                 RETURN(0);
 
         rc = lut_client_alloc(exp);
-        if (rc == 0)
-                rc = ldlm_init_export(exp);
         if (rc)
-                CERROR("%s: Error %d while initializing export\n",
-                       exp->exp_obd->obd_name, rc);
+               GOTO(err, rc);
+
+       rc = ldlm_init_export(exp);
+       if (rc)
+               GOTO(err_free, rc);
+
         RETURN(rc);
+
+err_free:
+       lut_client_free(exp);
+err:
+       CERROR("%s: Error %d while initializing export\n",
+              exp->exp_obd->obd_name, rc);
+       return rc;
 }
 
 static int mdt_destroy_export(struct obd_export *exp)
@@ -5552,7 +5641,7 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 rc = mdt_device_sync(&env, mdt);
                 break;
         case OBD_IOC_SET_READONLY:
-                dt->dd_ops->dt_ro(&env, dt);
+                rc = dt->dd_ops->dt_ro(&env, dt);
                 break;
         case OBD_IOC_ABORT_RECOVERY:
                 CERROR("Aborting recovery for device %s\n", obd->obd_name);
@@ -5756,7 +5845,7 @@ void mdt_enable_cos(struct mdt_device *mdt, int val)
         int rc;
 
         mdt->mdt_opts.mo_cos = !!val;
-        rc = lu_env_init(&env, LCT_MD_THREAD);
+        rc = lu_env_init(&env, LCT_LOCAL);
         if (unlikely(rc != 0)) {
                 CWARN("lu_env initialization failed with rc = %d,"
                       "cannot sync\n", rc);
@@ -5800,30 +5889,11 @@ static struct lu_device_type mdt_device_type = {
         .ldt_ctx_tags = LCT_MD_THREAD
 };
 
-static struct lu_local_obj_desc mdt_last_recv = {
-        .llod_name      = LAST_RCVD,
-        .llod_oid       = MDT_LAST_RECV_OID,
-        .llod_is_index  = 0,
-};
-
 static int __init mdt_mod_init(void)
 {
         struct lprocfs_static_vars lvars;
         int rc;
 
-        llo_local_obj_register(&mdt_last_recv);
-
-        if (mdt_num_threads > 0) {
-                if (mdt_num_threads > MDT_MAX_THREADS)
-                        mdt_num_threads = MDT_MAX_THREADS;
-                if (mdt_num_threads < MDT_MIN_THREADS)
-                        mdt_num_threads = MDT_MIN_THREADS;
-                mdt_max_threads = mdt_min_threads = mdt_num_threads;
-        } else {
-                mdt_max_threads = MDT_MAX_THREADS;
-                mdt_min_threads = MDT_MIN_THREADS;
-        }
-
         lprocfs_mdt_init_vars(&lvars);
         rc = class_register_type(&mdt_obd_device_ops, NULL,
                                  lvars.module_vars, LUSTRE_MDT_NAME,
@@ -5834,7 +5904,6 @@ static int __init mdt_mod_init(void)
 
 static void __exit mdt_mod_exit(void)
 {
-        llo_local_obj_unregister(&mdt_last_recv);
         class_unregister_type(LUSTRE_MDT_NAME);
 }