Whamcloud - gitweb
b=16777
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index 2e35988..bc457d4 100644 (file)
@@ -1,35 +1,49 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  lustre/mdt/mdt_handler.c
- *  Lustre Metadata Target (mdt) request handler
+ * GPL HEADER START
  *
- *  Copyright (c) 2006 Cluster File Systems, Inc.
- *   Author: Peter Braam <braam@clusterfs.com>
- *   Author: Andreas Dilger <adilger@clusterfs.com>
- *   Author: Phil Schwan <phil@clusterfs.com>
- *   Author: Mike Shaver <shaver@clusterfs.com>
- *   Author: Nikita Danilov <nikita@clusterfs.com>
- *   Author: Huang Hua <huanghua@clusterfs.com>
- *   Author: Yury Umanets <umka@clusterfs.com>
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   This file is part of the Lustre file system, http://www.lustre.org
- *   Lustre is a trademark of Cluster File Systems, Inc.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   You may have signed or agreed to another license before downloading
- *   this software.  If so, you are bound by the terms and conditions
- *   of that agreement, and the following does not apply to you.  See the
- *   LICENSE file included with this distribution for more information.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   If you did not agree to a different license, then this copy of Lustre
- *   is open source software; you can redistribute it and/or modify it
- *   under the terms of version 2 of the GNU General Public License as
- *   published by the Free Software Foundation.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  *
- *   In either case, Lustre is distributed in the hope that it will be
- *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   license text for more details.
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/mdt/mdt_handler.c
+ *
+ * Lustre Metadata Target (mdt) request handler
+ *
+ * Author: Peter Braam <braam@clusterfs.com>
+ * Author: Andreas Dilger <adilger@clusterfs.com>
+ * Author: Phil Schwan <phil@clusterfs.com>
+ * Author: Mike Shaver <shaver@clusterfs.com>
+ * Author: Nikita Danilov <nikita@clusterfs.com>
+ * Author: Huang Hua <huanghua@clusterfs.com>
+ * Author: Yury Umanets <umka@clusterfs.com>
  */
 
 #ifndef EXPORT_SYMTAB
@@ -53,7 +67,7 @@
 #include <lustre_mds.h>
 #include <lustre_mdt.h>
 #include "mdt_internal.h"
-#include <linux/lustre_acl.h>
+#include <lustre_acl.h>
 #include <lustre_param.h>
 
 mdl_mode_t mdt_mdl_lock_modes[] = {
@@ -319,15 +333,19 @@ static int mdt_getstatus(struct mdt_thread_info *info)
 
 static int mdt_statfs(struct mdt_thread_info *info)
 {
-        struct md_device  *next  = info->mti_mdt->mdt_child;
-        struct obd_statfs *osfs;
-        int                rc;
+        struct md_device      *next  = info->mti_mdt->mdt_child;
+        struct ptlrpc_service *svc;
+        struct obd_statfs     *osfs;
+        int                    rc;
 
         ENTRY;
 
+        svc = info->mti_pill->rc_req->rq_rqbd->rqbd_service;
+
         /* This will trigger a watchdog timeout */
         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
-                         (MDT_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
+                         (MDT_SERVICE_WATCHDOG_FACTOR *
+                          at_get(&svc->srv_at_estimate) / 1000) + 1);
 
         rc = mdt_check_ucred(info);
         if (rc)
@@ -923,8 +941,10 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                 LDLM_LOCK_PUT(lock);
                 rc = 0;
         } else {
-                struct md_attr *ma = &info->mti_attr;
+                struct md_attr *ma;
 relock:
+                ma = &info->mti_attr;
+
                 mdt_lock_handle_init(lhc);
                 mdt_lock_reg_init(lhc, LCK_PR);
 
@@ -932,6 +952,7 @@ relock:
                         LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
                                         &child->mot_obj.mo_lu,
                                         "Object doesn't exist!\n");
+                        GOTO(out_child, rc = -ESTALE);
                 }
 
                 ma->ma_valid = 0;
@@ -966,7 +987,6 @@ relock:
                 lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
                 if (lock) {
                         struct mdt_body *repbody;
-                        struct lu_attr *ma;
 
                         /* Debugging code. */
                         res_id = &lock->l_resource->lr_name;
@@ -984,7 +1004,6 @@ relock:
                          */
                         repbody = req_capsule_server_get(info->mti_pill,
                                                          &RMF_MDT_BODY);
-                        ma = &info->mti_attr.ma_attr;
                         if (lock->l_policy_data.l_inodebits.bits &
                             MDS_INODELOCK_UPDATE)
                                 mdt_pack_size2body(info, child);
@@ -1068,8 +1087,7 @@ static int mdt_set_info(struct mdt_thread_info *info)
                 RETURN(-EFAULT);
         }
 
-        if (keylen != (sizeof(KEY_READ_ONLY) - 1) ||
-            memcmp(key, KEY_READ_ONLY, keylen) != 0)
+        if (!KEY_IS(KEY_READ_ONLY))
                 RETURN(-EINVAL);
 
         req->rq_status = 0;
@@ -1123,6 +1141,7 @@ static int mdt_sendpage(struct mdt_thread_info *info,
         struct l_wait_info      *lwi = &info->mti_u.rdpg.mti_wait_info;
         int                      tmpcount;
         int                      tmpsize;
+        int                      timeout;
         int                      i;
         int                      rc;
         ENTRY;
@@ -1144,9 +1163,13 @@ static int mdt_sendpage(struct mdt_thread_info *info,
                 GOTO(free_desc, rc);
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
-                GOTO(abort_bulk, rc);
+                GOTO(abort_bulk, rc = 0);
 
-        *lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
+        timeout = (int) req->rq_deadline - cfs_time_current_sec();
+        if (timeout < 0)
+                CERROR("Req deadline already passed %lu (now: %lu)\n",
+                       req->rq_deadline, cfs_time_current_sec());
+        *lwi = LWI_TIMEOUT(max(timeout, 1) * HZ, NULL, NULL);
         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), lwi);
         LASSERT (rc == 0 || rc == -ETIMEDOUT);
 
@@ -1507,12 +1530,6 @@ static int mdt_reint(struct mdt_thread_info *info)
 
         ENTRY;
 
-        if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MDS_REINT_NET,
-                                 OBD_FAIL_MDS_REINT_NET)) {
-                info->mti_fail_id = OBD_FAIL_MDS_REINT_NET;
-                RETURN(0);
-        }
-
         opc = mdt_reint_opcode(info, reint_fmts);
         if (opc >= 0) {
                 /*
@@ -1528,18 +1545,33 @@ static int mdt_reint(struct mdt_thread_info *info)
         RETURN(rc);
 }
 
-/* TODO these two methods not available now. */
-
 /* this should sync the whole device */
-static int mdt_device_sync(struct mdt_thread_info *info)
+static int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt)
 {
-        return 0;
+        struct dt_device *dt = mdt->mdt_bottom;
+        int rc;
+        ENTRY;
+
+        rc = dt->dd_ops->dt_sync(env, dt);
+        RETURN(rc);
 }
 
 /* this should sync this object */
 static int mdt_object_sync(struct mdt_thread_info *info)
 {
-        return 0;
+        struct md_object *next;
+        int rc;
+        ENTRY;
+
+        if (!mdt_object_exists(info->mti_object)) {
+                CWARN("Non existing object  "DFID"!\n",
+                      PFID(mdt_object_fid(info->mti_object)));
+                RETURN(-ESTALE);
+        }
+        next = mdt_object_child(info->mti_object);
+        rc = mo_object_sync(info->mti_env, next);
+
+        RETURN(rc);
 }
 
 static int mdt_sync(struct mdt_thread_info *info)
@@ -1563,7 +1595,7 @@ static int mdt_sync(struct mdt_thread_info *info)
                 /* sync the whole device */
                 rc = req_capsule_server_pack(pill);
                 if (rc == 0)
-                        rc = mdt_device_sync(info);
+                        rc = mdt_device_sync(info->mti_env, info->mti_mdt);
                 else
                         rc = err_serious(rc);
         } else {
@@ -1652,11 +1684,6 @@ static int mdt_enqueue(struct mdt_thread_info *info)
          */
         LASSERT(info->mti_dlm_req != NULL);
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE)) {
-                info->mti_fail_id = OBD_FAIL_LDLM_ENQUEUE;
-                return 0;
-        }
-
         req = mdt_info_req(info);
 
         /*
@@ -1718,6 +1745,8 @@ static int mdt_sec_ctx_handle(struct mdt_thread_info *info)
                         sptlrpc_svc_ctx_invalidate(req);
         }
 
+        OBD_FAIL_TIMEOUT(OBD_FAIL_SEC_CTX_HDL_PAUSE, obd_fail_val);
+
         return rc;
 }
 
@@ -1832,6 +1861,23 @@ out:
         RETURN(rc);
 }
 
+static inline
+void mdt_save_lock(struct ptlrpc_request *req, struct lustre_handle *h,
+                   ldlm_mode_t mode, int decref)
+{
+        ENTRY;
+
+        if (lustre_handle_is_used(h)) {
+                if (decref)
+                        mdt_fid_unlock(h, mode);
+                else
+                        ptlrpc_save_lock(req, h, mode);
+                h->cookie = 0ull;
+        }
+
+        EXIT;
+}
+
 /*
  * Just call ldlm_lock_decref() if decref, else we only call ptlrpc_save_lock()
  * to save this lock in req.  when transaction committed, req will be released,
@@ -1843,23 +1889,8 @@ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
         struct ptlrpc_request *req = mdt_info_req(info);
         ENTRY;
 
-        if (lustre_handle_is_used(&lh->mlh_pdo_lh)) {
-                /* Do not save PDO locks to request, just decref. */
-                mdt_fid_unlock(&lh->mlh_pdo_lh,
-                               lh->mlh_pdo_mode);
-                lh->mlh_pdo_lh.cookie = 0ull;
-        }
-
-        if (lustre_handle_is_used(&lh->mlh_reg_lh)) {
-                if (decref) {
-                        mdt_fid_unlock(&lh->mlh_reg_lh,
-                                       lh->mlh_reg_mode);
-                } else {
-                        ptlrpc_save_lock(req, &lh->mlh_reg_lh,
-                                         lh->mlh_reg_mode);
-                }
-                lh->mlh_reg_lh.cookie = 0ull;
-        }
+        mdt_save_lock(req, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
+        mdt_save_lock(req, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
 
         EXIT;
 }
@@ -2050,22 +2081,18 @@ static int mdt_req_handle(struct mdt_thread_info *info,
         LASSERT(current->journal_info == NULL);
 
         /*
-         * Mask out OBD_FAIL_ONCE, because that will stop
-         * correct handling of failed req later in ldlm due to doing
-         * obd_fail_loc |= OBD_FAIL_ONCE without actually
-         * correct actions like it is done in target_send_reply_msg().
+         * Checking for various OBD_FAIL_$PREF_$OPC_NET codes. _Do_ not try 
+         * to put same checks into handlers like mdt_close(), mdt_reint(), 
+         * etc., without talking to mdt authors first. Checking same thing
+         * there again is useless and returning 0 error wihtout packing reply
+         * is buggy! Handlers either pack reply or return error.
+         *
+         * We return 0 here and do not send any reply in order to emulate
+         * network failure. Do not send any reply in case any of NET related
+         * fail_id has occured.
          */
-        if (h->mh_fail_id != 0) {
-                /*
-                 * Set to info->mti_fail_id to handler fail_id, it will be used
-                 * later, and better than use default fail_id.
-                 */
-                if (OBD_FAIL_CHECK_RESET(h->mh_fail_id && OBD_FAIL_MASK_LOC,
-                                         h->mh_fail_id & ~OBD_FAILED)) {
-                        info->mti_fail_id = h->mh_fail_id;
-                        RETURN(0);
-                }
-        }
+        if (OBD_FAIL_CHECK_ORSET(h->mh_fail_id, OBD_FAIL_ONCE))
+                RETURN(0);
 
         rc = 0;
         flags = h->mh_flags;
@@ -2112,6 +2139,13 @@ static int mdt_req_handle(struct mdt_thread_info *info,
                  * only
                  */
                 rc = h->mh_act(info);
+                if (rc == 0 &&
+                    !req->rq_no_reply && req->rq_reply_state == NULL) {
+                        DEBUG_REQ(D_ERROR, req, "MDT \"handler\" %s did not "
+                                  "pack reply and returned 0 error\n",
+                                  h->mh_name);
+                        LBUG();
+                }
                 serious = is_serious(rc);
                 rc = clear_serious(rc);
         } else
@@ -2147,7 +2181,8 @@ static int mdt_req_handle(struct mdt_thread_info *info,
                 LBUG();
         }
 
-        RETURN(rc);
+        target_send_reply(req, rc, info->mti_fail_id);
+        RETURN(0);
 }
 
 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
@@ -2341,21 +2376,6 @@ static int mdt_recovery(struct mdt_thread_info *info)
         RETURN(+1);
 }
 
-static int mdt_reply(struct ptlrpc_request *req, int rc,
-                     struct mdt_thread_info *info)
-{
-        ENTRY;
-
-#if 0
-        if (req->rq_reply_state == NULL && rc == 0) {
-                req->rq_status = rc;
-                lustre_pack_reply(req, 1, NULL, NULL);
-        }
-#endif
-        target_send_reply(req, rc, info->mti_fail_id);
-        RETURN(0);
-}
-
 static int mdt_msg_check_version(struct lustre_msg *msg)
 {
         int rc;
@@ -2459,7 +2479,6 @@ static int mdt_handle0(struct ptlrpc_request *req,
                                              supported);
                         if (likely(h != NULL)) {
                                 rc = mdt_req_handle(info, h, req);
-                                rc = mdt_reply(req, rc, info);
                         } else {
                                 CERROR("The unsupported opc: 0x%x\n", lustre_msg_get_opc(msg) );
                                 req->rq_status = -ENOTSUPP;
@@ -2693,27 +2712,23 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info,
                 RETURN(ELDLM_LOCK_REPLACED);
         }
 
-        /* This lock might already be given to the client by an resent req,
-         * in this case we should return ELDLM_LOCK_ABORTED,
-         * so we should check led_held_locks here, but it will affect
-         * performance, FIXME
+        /* 
+         * Fixup the lock to be given to the client. 
          */
-        /* Fixup the lock to be given to the client */
         lock_res_and_lock(new_lock);
         new_lock->l_readers = 0;
         new_lock->l_writers = 0;
 
         new_lock->l_export = class_export_get(req->rq_export);
-        spin_lock(&req->rq_export->exp_ldlm_data.led_lock);
-        list_add(&new_lock->l_export_chain,
-                 &new_lock->l_export->exp_ldlm_data.led_held_locks);
-        spin_unlock(&req->rq_export->exp_ldlm_data.led_lock);
-
         new_lock->l_blocking_ast = lock->l_blocking_ast;
         new_lock->l_completion_ast = lock->l_completion_ast;
         new_lock->l_remote_handle = lock->l_remote_handle;
         new_lock->l_flags &= ~LDLM_FL_LOCAL;
 
+        lustre_hash_add(new_lock->l_export->exp_lock_hash,
+                        &new_lock->l_remote_handle, 
+                        &new_lock->l_exp_hash);
+
         unlock_res_and_lock(new_lock);
         LDLM_LOCK_PUT(new_lock);
         lh->mlh_reg_lh.cookie = 0;
@@ -2730,7 +2745,7 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
         struct obd_export      *exp = req->rq_export;
         struct lustre_handle    remote_hdl;
         struct ldlm_request    *dlmreq;
-        struct list_head       *iter;
+        struct ldlm_lock       *lock;
 
         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
                 return;
@@ -2738,27 +2753,24 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
         dlmreq = req_capsule_client_get(info->mti_pill, &RMF_DLM_REQ);
         remote_hdl = dlmreq->lock_handle[0];
 
-        spin_lock(&exp->exp_ldlm_data.led_lock);
-        list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
-                struct ldlm_lock *lock;
-                lock = list_entry(iter, struct ldlm_lock, l_export_chain);
-                if (lock == new_lock)
-                        continue;
-                if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
+        lock = lustre_hash_lookup(exp->exp_lock_hash, &remote_hdl);
+        if (lock) {
+                if (lock != new_lock) {
                         lh->mlh_reg_lh.cookie = lock->l_handle.h_cookie;
                         lh->mlh_reg_mode = lock->l_granted_mode;
 
-                        LDLM_DEBUG(lock, "restoring lock cookie");
+                        LDLM_DEBUG(lock, "Restoring lock cookie");
                         DEBUG_REQ(D_DLMTRACE, req,
                                   "restoring lock cookie "LPX64,
                                   lh->mlh_reg_lh.cookie);
                         if (old_lock)
                                 *old_lock = LDLM_LOCK_GET(lock);
-                        spin_unlock(&exp->exp_ldlm_data.led_lock);
+                        lh_put(exp->exp_lock_hash, &lock->l_exp_hash);
                         return;
                 }
+
+                lh_put(exp->exp_lock_hash, &lock->l_exp_hash);
         }
-        spin_unlock(&exp->exp_ldlm_data.led_lock);
 
         /*
          * If the xid matches, then we know this is a resent request, and allow
@@ -2899,8 +2911,25 @@ static int mdt_intent_reint(enum mdt_it_code opcode,
         rep->lock_policy_res2 = clear_serious(rc);
 
         lhc->mlh_reg_lh.cookie = 0ull;
-        rc = ELDLM_LOCK_ABORTED;
-        RETURN(rc);
+        if (rc == -ENOTCONN || rc == -ENODEV) {
+                /* 
+                 * If it is the disconnect error (ENODEV & ENOCONN), the error
+                 * will be returned by rq_status, and client at ptlrpc layer
+                 * will detect this, then disconnect, reconnect the import
+                 * immediately, instead of impacting the following the rpc.
+                 */
+                RETURN(rc);
+        } else {
+                /* 
+                 * For other cases, the error will be returned by intent.
+                 * and client will retrieve the result from intent.
+                 */ 
+                 /* 
+                  * FIXME: when open lock is finished, that should be
+                  * checked here.
+                  */
+                RETURN(ELDLM_LOCK_ABORTED); 
+        }
 }
 
 static int mdt_intent_code(long itcode)
@@ -3004,12 +3033,6 @@ static int mdt_intent_policy(struct ldlm_namespace *ns,
                 if (it != NULL) {
                         const struct ldlm_request *dlmreq;
                         __u64 req_bits;
-#if 0
-                        struct ldlm_lock       *lock = *lockp;
-
-                        LDLM_DEBUG(lock, "intent policy opc: %s\n",
-                                   ldlm_it2str(it->opc));
-#endif
 
                         rc = mdt_intent_opc(it->opc, info, lockp, flags);
                         if (rc == 0)
@@ -3360,21 +3383,21 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
         procfs_entry = m->mdt_md_dev.md_lu_dev.ld_obd->obd_proc_entry;
 
         conf = (typeof(conf)) {
-                .psc_nbufs            = MDS_NBUFS,
-                .psc_bufsize          = MDS_BUFSIZE,
-                .psc_max_req_size     = MDS_MAXREQSIZE,
-                .psc_max_reply_size   = MDS_MAXREPSIZE,
-                .psc_req_portal       = MDS_REQUEST_PORTAL,
-                .psc_rep_portal       = MDC_REPLY_PORTAL,
-                .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
+                .psc_nbufs           = MDS_NBUFS,
+                .psc_bufsize         = MDS_BUFSIZE,
+                .psc_max_req_size    = MDS_MAXREQSIZE,
+                .psc_max_reply_size  = MDS_MAXREPSIZE,
+                .psc_req_portal      = MDS_REQUEST_PORTAL,
+                .psc_rep_portal      = MDC_REPLY_PORTAL,
+                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
                 /*
                  * We'd like to have a mechanism to set this on a per-device
                  * basis, but alas...
                  */
-                .psc_min_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
-                                       MDT_MAX_THREADS),
-                .psc_max_threads   = MDT_MAX_THREADS,
-                .psc_ctx_tags      = LCT_MD_THREAD
+                .psc_min_threads    = min(max(mdt_num_threads, MDT_MIN_THREADS),
+                                          MDT_MAX_THREADS),
+                .psc_max_threads     = MDT_MAX_THREADS,
+                .psc_ctx_tags        = LCT_MD_THREAD
         };
 
         m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client;
@@ -3383,7 +3406,8 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
 
         m->mdt_regular_service =
                 ptlrpc_init_svc_conf(&conf, mdt_regular_handle, LUSTRE_MDT_NAME,
-                                     procfs_entry, NULL, LUSTRE_MDT_NAME);
+                                     procfs_entry, target_print_req,
+                                     LUSTRE_MDT_NAME);
         if (m->mdt_regular_service == NULL)
                 RETURN(-ENOMEM);
 
@@ -3396,22 +3420,22 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
          * ideally.
          */
         conf = (typeof(conf)) {
-                .psc_nbufs            = MDS_NBUFS,
-                .psc_bufsize          = MDS_BUFSIZE,
-                .psc_max_req_size     = MDS_MAXREQSIZE,
-                .psc_max_reply_size   = MDS_MAXREPSIZE,
-                .psc_req_portal       = MDS_READPAGE_PORTAL,
-                .psc_rep_portal       = MDC_REPLY_PORTAL,
-                .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
-                .psc_min_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
-                                       MDT_MAX_THREADS),
-                .psc_max_threads   = MDT_MAX_THREADS,
-                .psc_ctx_tags      = LCT_MD_THREAD
+                .psc_nbufs           = MDS_NBUFS,
+                .psc_bufsize         = MDS_BUFSIZE,
+                .psc_max_req_size    = MDS_MAXREQSIZE,
+                .psc_max_reply_size  = MDS_MAXREPSIZE,
+                .psc_req_portal      = MDS_READPAGE_PORTAL,
+                .psc_rep_portal      = MDC_REPLY_PORTAL,
+                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
+                .psc_min_threads    = min(max(mdt_num_threads, MDT_MIN_THREADS),
+                                          MDT_MAX_THREADS),
+                .psc_max_threads     = MDT_MAX_THREADS,
+                .psc_ctx_tags        = LCT_MD_THREAD
         };
         m->mdt_readpage_service =
                 ptlrpc_init_svc_conf(&conf, mdt_readpage_handle,
                                      LUSTRE_MDT_NAME "_readpage",
-                                     procfs_entry, NULL, "mdt_rdpg");
+                                     procfs_entry, target_print_req,"mdt_rdpg");
 
         if (m->mdt_readpage_service == NULL) {
                 CERROR("failed to start readpage service\n");
@@ -3424,23 +3448,23 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
          * setattr service configuration.
          */
         conf = (typeof(conf)) {
-                .psc_nbufs            = MDS_NBUFS,
-                .psc_bufsize          = MDS_BUFSIZE,
-                .psc_max_req_size     = MDS_MAXREQSIZE,
-                .psc_max_reply_size   = MDS_MAXREPSIZE,
-                .psc_req_portal       = MDS_SETATTR_PORTAL,
-                .psc_rep_portal       = MDC_REPLY_PORTAL,
-                .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
+                .psc_nbufs           = MDS_NBUFS,
+                .psc_bufsize         = MDS_BUFSIZE,
+                .psc_max_req_size    = MDS_MAXREQSIZE,
+                .psc_max_reply_size  = MDS_MAXREPSIZE,
+                .psc_req_portal      = MDS_SETATTR_PORTAL,
+                .psc_rep_portal      = MDC_REPLY_PORTAL,
+                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
                 .psc_min_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
-                                       MDT_MAX_THREADS),
-                .psc_max_threads   = MDT_MAX_THREADS,
-                .psc_ctx_tags      = LCT_MD_THREAD
+                                         MDT_MAX_THREADS),
+                .psc_max_threads     = MDT_MAX_THREADS,
+                .psc_ctx_tags        = LCT_MD_THREAD
         };
 
         m->mdt_setattr_service =
                 ptlrpc_init_svc_conf(&conf, mdt_regular_handle,
                                      LUSTRE_MDT_NAME "_setattr",
-                                     procfs_entry, NULL, "mdt_attr");
+                                     procfs_entry, target_print_req,"mdt_attr");
 
         if (!m->mdt_setattr_service) {
                 CERROR("failed to start setattr service\n");
@@ -3455,22 +3479,22 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
          * sequence controller service configuration
          */
         conf = (typeof(conf)) {
-                .psc_nbufs = MDS_NBUFS,
-                .psc_bufsize = MDS_BUFSIZE,
-                .psc_max_req_size = SEQ_MAXREQSIZE,
-                .psc_max_reply_size = SEQ_MAXREPSIZE,
-                .psc_req_portal = SEQ_CONTROLLER_PORTAL,
-                .psc_rep_portal = MDC_REPLY_PORTAL,
-                .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
-                .psc_min_threads = SEQ_NUM_THREADS,
-                .psc_max_threads = SEQ_NUM_THREADS,
-                .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
+                .psc_nbufs           = MDS_NBUFS,
+                .psc_bufsize         = MDS_BUFSIZE,
+                .psc_max_req_size    = SEQ_MAXREQSIZE,
+                .psc_max_reply_size  = SEQ_MAXREPSIZE,
+                .psc_req_portal      = SEQ_CONTROLLER_PORTAL,
+                .psc_rep_portal      = MDC_REPLY_PORTAL,
+                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
+                .psc_min_threads     = SEQ_NUM_THREADS,
+                .psc_max_threads     = SEQ_NUM_THREADS,
+                .psc_ctx_tags        = LCT_MD_THREAD|LCT_DT_THREAD
         };
 
         m->mdt_mdsc_service =
                 ptlrpc_init_svc_conf(&conf, mdt_mdsc_handle,
                                      LUSTRE_MDT_NAME"_mdsc",
-                                     procfs_entry, NULL, "mdt_mdsc");
+                                     procfs_entry, target_print_req,"mdt_mdsc");
         if (!m->mdt_mdsc_service) {
                 CERROR("failed to start seq controller service\n");
                 GOTO(err_mdt_svc, rc = -ENOMEM);
@@ -3484,22 +3508,22 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
          * metadata sequence server service configuration
          */
         conf = (typeof(conf)) {
-                .psc_nbufs = MDS_NBUFS,
-                .psc_bufsize = MDS_BUFSIZE,
-                .psc_max_req_size = SEQ_MAXREQSIZE,
-                .psc_max_reply_size = SEQ_MAXREPSIZE,
-                .psc_req_portal = SEQ_METADATA_PORTAL,
-                .psc_rep_portal = MDC_REPLY_PORTAL,
-                .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
-                .psc_min_threads = SEQ_NUM_THREADS,
-                .psc_max_threads = SEQ_NUM_THREADS,
-                .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
+                .psc_nbufs           = MDS_NBUFS,
+                .psc_bufsize         = MDS_BUFSIZE,
+                .psc_max_req_size    = SEQ_MAXREQSIZE,
+                .psc_max_reply_size  = SEQ_MAXREPSIZE,
+                .psc_req_portal      = SEQ_METADATA_PORTAL,
+                .psc_rep_portal      = MDC_REPLY_PORTAL,
+                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
+                .psc_min_threads     = SEQ_NUM_THREADS,
+                .psc_max_threads     = SEQ_NUM_THREADS,
+                .psc_ctx_tags        = LCT_MD_THREAD|LCT_DT_THREAD
         };
 
         m->mdt_mdss_service =
                 ptlrpc_init_svc_conf(&conf, mdt_mdss_handle,
                                      LUSTRE_MDT_NAME"_mdss",
-                                     procfs_entry, NULL, "mdt_mdss");
+                                     procfs_entry, target_print_req,"mdt_mdss");
         if (!m->mdt_mdss_service) {
                 CERROR("failed to start metadata seq server service\n");
                 GOTO(err_mdt_svc, rc = -ENOMEM);
@@ -3516,22 +3540,22 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
          * controller which manages space.
          */
         conf = (typeof(conf)) {
-                .psc_nbufs = MDS_NBUFS,
-                .psc_bufsize = MDS_BUFSIZE,
-                .psc_max_req_size = SEQ_MAXREQSIZE,
-                .psc_max_reply_size = SEQ_MAXREPSIZE,
-                .psc_req_portal = SEQ_DATA_PORTAL,
-                .psc_rep_portal = OSC_REPLY_PORTAL,
-                .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
-                .psc_min_threads = SEQ_NUM_THREADS,
-                .psc_max_threads = SEQ_NUM_THREADS,
-                .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
+                .psc_nbufs           = MDS_NBUFS,
+                .psc_bufsize         = MDS_BUFSIZE,
+                .psc_max_req_size    = SEQ_MAXREQSIZE,
+                .psc_max_reply_size  = SEQ_MAXREPSIZE,
+                .psc_req_portal      = SEQ_DATA_PORTAL,
+                .psc_rep_portal      = OSC_REPLY_PORTAL,
+                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
+                .psc_min_threads     = SEQ_NUM_THREADS,
+                .psc_max_threads     = SEQ_NUM_THREADS,
+                .psc_ctx_tags        = LCT_MD_THREAD|LCT_DT_THREAD
         };
 
         m->mdt_dtss_service =
                 ptlrpc_init_svc_conf(&conf, mdt_dtss_handle,
                                      LUSTRE_MDT_NAME"_dtss",
-                                     procfs_entry, NULL, "mdt_dtss");
+                                     procfs_entry, target_print_req,"mdt_dtss");
         if (!m->mdt_dtss_service) {
                 CERROR("failed to start data seq server service\n");
                 GOTO(err_mdt_svc, rc = -ENOMEM);
@@ -3543,22 +3567,22 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
 
         /* FLD service start */
         conf = (typeof(conf)) {
-                .psc_nbufs            = MDS_NBUFS,
-                .psc_bufsize          = MDS_BUFSIZE,
-                .psc_max_req_size     = FLD_MAXREQSIZE,
-                .psc_max_reply_size   = FLD_MAXREPSIZE,
-                .psc_req_portal       = FLD_REQUEST_PORTAL,
-                .psc_rep_portal       = MDC_REPLY_PORTAL,
-                .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
-                .psc_min_threads      = FLD_NUM_THREADS,
-                .psc_max_threads      = FLD_NUM_THREADS,
-                .psc_ctx_tags         = LCT_DT_THREAD|LCT_MD_THREAD
+                .psc_nbufs           = MDS_NBUFS,
+                .psc_bufsize         = MDS_BUFSIZE,
+                .psc_max_req_size    = FLD_MAXREQSIZE,
+                .psc_max_reply_size  = FLD_MAXREPSIZE,
+                .psc_req_portal      = FLD_REQUEST_PORTAL,
+                .psc_rep_portal      = MDC_REPLY_PORTAL,
+                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
+                .psc_min_threads     = FLD_NUM_THREADS,
+                .psc_max_threads     = FLD_NUM_THREADS,
+                .psc_ctx_tags        = LCT_DT_THREAD|LCT_MD_THREAD
         };
 
         m->mdt_fld_service =
                 ptlrpc_init_svc_conf(&conf, mdt_fld_handle,
                                      LUSTRE_MDT_NAME"_fld",
-                                     procfs_entry, NULL, "mdt_fld");
+                                     procfs_entry, target_print_req, "mdt_fld");
         if (!m->mdt_fld_service) {
                 CERROR("failed to start fld service\n");
                 GOTO(err_mdt_svc, rc = -ENOMEM);
@@ -3573,21 +3597,22 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
          * mds-mds requests be not blocked during recovery.
          */
         conf = (typeof(conf)) {
-                .psc_nbufs            = MDS_NBUFS,
-                .psc_bufsize          = MDS_BUFSIZE,
-                .psc_max_req_size     = MDS_MAXREQSIZE,
-                .psc_max_reply_size   = MDS_MAXREPSIZE,
-                .psc_req_portal       = MDS_MDS_PORTAL,
-                .psc_rep_portal       = MDC_REPLY_PORTAL,
-                .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
-                .psc_min_threads      = min(max(mdt_num_threads, MDT_MIN_THREADS),
-                                            MDT_MAX_THREADS),
-                .psc_max_threads      = MDT_MAX_THREADS,
-                .psc_ctx_tags         = LCT_MD_THREAD
+                .psc_nbufs           = MDS_NBUFS,
+                .psc_bufsize         = MDS_BUFSIZE,
+                .psc_max_req_size    = MDS_MAXREQSIZE,
+                .psc_max_reply_size  = MDS_MAXREPSIZE,
+                .psc_req_portal      = MDS_MDS_PORTAL,
+                .psc_rep_portal      = MDC_REPLY_PORTAL,
+                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
+                .psc_min_threads    = min(max(mdt_num_threads, MDT_MIN_THREADS),
+                                          MDT_MAX_THREADS),
+                .psc_max_threads     = MDT_MAX_THREADS,
+                .psc_ctx_tags        = LCT_MD_THREAD
         };
-        m->mdt_xmds_service = ptlrpc_init_svc_conf(&conf, mdt_xmds_handle,
-                                                  LUSTRE_MDT_NAME "_mds",
-                                                  procfs_entry, NULL, "mdt_xmds");
+        m->mdt_xmds_service =
+                ptlrpc_init_svc_conf(&conf, mdt_xmds_handle,
+                                     LUSTRE_MDT_NAME "_mds",
+                                     procfs_entry, target_print_req,"mdt_xmds");
 
         if (m->mdt_xmds_service == NULL) {
                 CERROR("failed to start readpage service\n");
@@ -3968,7 +3993,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
 
         snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name,
                  LUSTRE_MDT_NAME"-%p", m);
-        m->mdt_namespace = ldlm_namespace_new(info->mti_u.ns_name,
+        m->mdt_namespace = ldlm_namespace_new(obd, info->mti_u.ns_name,
                                               LDLM_NAMESPACE_SERVER,
                                               LDLM_NAMESPACE_GREEDY);
         if (m->mdt_namespace == NULL)
@@ -4018,8 +4043,11 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
 
         mdt_init_capa_ctxt(env, m);
 
+        /* Reduce the initial timeout on an MDS because it doesn't need such
+         * a long timeout as an OST does. Adaptive timeouts will adjust this
+         * value appropriately. */
         if (ldlm_timeout == LDLM_TIMEOUT_DEFAULT)
-                ldlm_timeout = 6;
+                ldlm_timeout = MDS_LDLM_TIMEOUT_DEFAULT;
 
         RETURN(0);
 
@@ -4045,6 +4073,7 @@ err_fini_stack:
         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
 err_fini_proc:
         mdt_procfs_fini(m);
+        ptlrpc_lprocfs_unregister_obd(obd);
         lprocfs_obd_cleanup(obd);
 err_fini_site:
         lu_site_fini(s);
@@ -4287,7 +4316,7 @@ static int mdt_obd_connect(const struct lu_env *env,
                            void *localdata)
 {
         struct mdt_thread_info *info;
-        struct mdt_client_data *mcd;
+        struct lsd_client_data *lcd;
         struct obd_export      *exp;
         struct mdt_device      *mdt;
         struct ptlrpc_request  *req;
@@ -4331,19 +4360,19 @@ static int mdt_obd_connect(const struct lu_env *env,
 
         rc = mdt_connect_internal(exp, mdt, data);
         if (rc == 0) {
-                OBD_ALLOC_PTR(mcd);
-                if (mcd != NULL) {
+                OBD_ALLOC_PTR(lcd);
+                if (lcd != NULL) {
                         struct mdt_thread_info *mti;
                         mti = lu_context_key_get(&env->le_ctx,
                                                  &mdt_thread_key);
                         LASSERT(mti != NULL);
                         mti->mti_exp = exp;
-                        memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
-                        exp->exp_mdt_data.med_mcd = mcd;
+                        memcpy(lcd->lcd_uuid, cluuid, sizeof lcd->lcd_uuid);
+                        exp->exp_mdt_data.med_lcd = lcd;
                         rc = mdt_client_new(env, mdt);
                         if (rc != 0) {
-                                OBD_FREE_PTR(mcd);
-                                exp->exp_mdt_data.med_mcd = NULL;
+                                OBD_FREE_PTR(lcd);
+                                exp->exp_mdt_data.med_lcd = NULL;
                         }
                 } else
                         rc = -ENOMEM;
@@ -4440,6 +4469,7 @@ static int mdt_obd_disconnect(struct obd_export *exp)
 static int mdt_init_export(struct obd_export *exp)
 {
         struct mdt_export_data *med = &exp->exp_mdt_data;
+        int                     rc;
         ENTRY;
 
         CFS_INIT_LIST_HEAD(&med->med_open_head);
@@ -4449,7 +4479,10 @@ static int mdt_init_export(struct obd_export *exp)
         spin_lock(&exp->exp_lock);
         exp->exp_connecting = 1;
         spin_unlock(&exp->exp_lock);
-        RETURN(0);
+        rc = ldlm_init_export(exp);
+        if (rc)
+                CERROR("Error %d while initializing export\n", rc);
+        RETURN(rc);
 }
 
 static int mdt_destroy_export(struct obd_export *export)
@@ -4470,6 +4503,7 @@ static int mdt_destroy_export(struct obd_export *export)
                 mdt_cleanup_idmap(med);
 
         target_destroy_export(export);
+        ldlm_destroy_export(export);
 
         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
                 RETURN(0);
@@ -4594,7 +4628,7 @@ static int mdt_obd_notify(struct obd_device *host,
 
         switch (ev) {
         case OBD_NOTIFY_CONFIG:
-                mdt_allow_cli(mdt_dev(host->obd_lu_dev), (unsigned int)data);
+                mdt_allow_cli(mdt_dev(host->obd_lu_dev), (unsigned long)data);
                 break;
         default:
                 CDEBUG(D_INFO, "Unhandled notification %#x\n", ev);
@@ -4619,7 +4653,7 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
 
         switch (cmd) {
         case OBD_IOC_SYNC:
-                rc = dt->dd_ops->dt_sync(&env, dt);
+                rc = mdt_device_sync(&env, mdt);
                 break;
         case OBD_IOC_SET_READONLY:
                 rc = dt->dd_ops->dt_sync(&env, dt);
@@ -4986,7 +5020,7 @@ static struct mdt_opc_slice mdt_fld_handlers[] = {
         }
 };
 
-MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
+MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
 MODULE_DESCRIPTION("Lustre Meta-data Target ("LUSTRE_MDT_NAME")");
 MODULE_LICENSE("GPL");