Whamcloud - gitweb
Use a special macro for printing time_t; clean up includes.
[fs/lustre-release.git] / lustre / mds / handler.c
index 5f59ab3..9c33a98 100644 (file)
 #include <linux/random.h>
 #include <linux/fs.h>
 #include <linux/jbd.h>
-#include <linux/ext3_fs.h>
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
-# include <linux/smp_lock.h>
-# include <linux/buffer_head.h>
-# include <linux/workqueue.h>
-# include <linux/mount.h>
-#else
-# include <linux/locks.h>
-#endif
+#include <linux/smp_lock.h>
+#include <linux/buffer_head.h>
+#include <linux/workqueue.h>
+#include <linux/mount.h>
 
+#include <linux/lustre_acl.h>
 #include <obd_class.h>
 #include <lustre_dlm.h>
 #include <obd_lov.h>
@@ -66,6 +62,10 @@ int mds_num_threads;
 CFS_MODULE_PARM(mds_num_threads, "i", int, 0444,
                 "number of MDS service threads to start");
 
+__u32 mds_max_ost_index=0xFFFF;
+CFS_MODULE_PARM(mds_max_ost_index, "i", int, 0444,
+                "maximal OST index");
+
 static int mds_intent_policy(struct ldlm_namespace *ns,
                              struct ldlm_lock **lockp, void *req_cookie,
                              ldlm_mode_t mode, int flags, void *data);
@@ -97,7 +97,7 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
                 tmpsize = tmpcount > CFS_PAGE_SIZE ? CFS_PAGE_SIZE : tmpcount;
 
-                pages[i] = alloc_pages(GFP_KERNEL, 0);
+                OBD_PAGE_ALLOC(pages[i], CFS_ALLOC_STD);
                 if (pages[i] == NULL)
                         GOTO(cleanup_buf, rc = -ENOMEM);
 
@@ -108,7 +108,7 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
                 tmpsize = tmpcount > CFS_PAGE_SIZE ? CFS_PAGE_SIZE : tmpcount;
                 CDEBUG(D_EXT2, "reading %u@%llu from dir %lu (size %llu)\n",
                        tmpsize, offset, file->f_dentry->d_inode->i_ino,
-                       file->f_dentry->d_inode->i_size);
+                       i_size_read(file->f_dentry->d_inode));
 
                 rc = fsfilt_readpage(req->rq_export->exp_obd, file,
                                      kmap(pages[i]), tmpsize, &offset);
@@ -142,7 +142,7 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
                 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
         }
 
-        DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
+        DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s",
                   (rc == -ETIMEDOUT) ? "timeout" : "network error",
                   desc->bd_nob_transferred, count,
                   req->rq_export->exp_client_uuid.uuid,
@@ -156,7 +156,7 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
  cleanup_buf:
         for (i = 0; i < npages; i++)
                 if (pages[i])
-                        __free_pages(pages[i], 0);
+                        OBD_PAGE_FREE(pages[i]);
 
         ptlrpc_free_bulk(desc);
  out_free:
@@ -169,13 +169,13 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
                                      struct vfsmount **mnt, int lock_mode,
                                      struct lustre_handle *lockh,
-                                     char *name, int namelen, __u64 lockpart)
+                                     __u64 lockpart)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
         struct ldlm_res_id res_id = { .name = {0} };
         int flags = LDLM_FL_ATOMIC_CB, rc;
-        ldlm_policy_data_t policy = { .l_inodebits = { lockpart} }; 
+        ldlm_policy_data_t policy = { .l_inodebits = { lockpart} };
         ENTRY;
 
         if (IS_ERR(de))
@@ -183,8 +183,8 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
 
         res_id.name[0] = de->d_inode->i_ino;
         res_id.name[1] = de->d_inode->i_generation;
-        rc = ldlm_cli_enqueue_local(obd->obd_namespace, res_id, 
-                                    LDLM_IBITS, &policy, lock_mode, &flags, 
+        rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,
+                                    LDLM_IBITS, &policy, lock_mode, &flags,
                                     ldlm_blocking_ast, ldlm_completion_ast,
                                     NULL, NULL, 0, NULL, lockh);
         if (rc != ELDLM_OK) {
@@ -254,12 +254,12 @@ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
         RETURN(result);
 }
 
-static int mds_connect_internal(struct obd_export *exp, 
+static int mds_connect_internal(struct obd_export *exp,
                                 struct obd_connect_data *data)
 {
         struct obd_device *obd = exp->exp_obd;
         if (data != NULL) {
-                data->ocd_connect_flags &= MDS_CONNECT_SUPPORTED;
+                data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
 
                 /* If no known bits (which should not happen, probably,
@@ -289,7 +289,8 @@ static int mds_connect_internal(struct obd_export *exp,
         return 0;
 }
 
-static int mds_reconnect(struct obd_export *exp, struct obd_device *obd,
+static int mds_reconnect(const struct lu_env *env,
+                         struct obd_export *exp, struct obd_device *obd,
                          struct obd_uuid *cluuid,
                          struct obd_connect_data *data)
 {
@@ -310,25 +311,20 @@ static int mds_reconnect(struct obd_export *exp, struct obd_device *obd,
  * about that client, like open files, the last operation number it did
  * on the server, etc.
  */
-static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
-                       struct obd_uuid *cluuid, struct obd_connect_data *data)
+static int mds_connect(const struct lu_env *env,
+                       struct lustre_handle *conn, struct obd_device *obd,
+                       struct obd_uuid *cluuid, struct obd_connect_data *data,
+                       void *localdata)
 {
         struct obd_export *exp;
         struct mds_export_data *med;
         struct mds_client_data *mcd = NULL;
-        int rc, abort_recovery;
+        int rc;
         ENTRY;
 
         if (!conn || !obd || !cluuid)
                 RETURN(-EINVAL);
 
-        /* Check for aborted recovery. */
-        spin_lock_bh(&obd->obd_processing_task_lock);
-        abort_recovery = obd->obd_abort_recovery;
-        spin_unlock_bh(&obd->obd_processing_task_lock);
-        if (abort_recovery)
-                target_abort_recovery(obd);
-
         /* XXX There is a small race between checking the list and adding a
          * new connection for the same UUID, but the real threat (list
          * corruption when multiple different clients connect) is solved.
@@ -346,6 +342,8 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
         LASSERT(exp);
         med = &exp->exp_mds_data;
 
+        exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_NULL;
+
         rc = mds_connect_internal(exp, data);
         if (rc)
                 GOTO(out, rc);
@@ -357,7 +355,7 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
         med->med_mcd = mcd;
 
-        rc = mds_client_add(obd, exp, -1);
+        rc = mds_client_add(obd, exp, -1, localdata);
         GOTO(out, rc);
 
 out:
@@ -378,9 +376,13 @@ int mds_init_export(struct obd_export *exp)
 {
         struct mds_export_data *med = &exp->exp_mds_data;
 
-        INIT_LIST_HEAD(&med->med_open_head);
+        CFS_INIT_LIST_HEAD(&med->med_open_head);
         spin_lock_init(&med->med_open_lock);
+
+        spin_lock(&exp->exp_lock);
         exp->exp_connecting = 1;
+        spin_unlock(&exp->exp_lock);
+
         RETURN(0);
 }
 
@@ -416,7 +418,7 @@ static int mds_destroy_export(struct obd_export *export)
                 CWARN("%s: allocation failure during cleanup; can not force "
                       "close file handles on this service.\n", obd->obd_name);
                 OBD_FREE(lmm, mds->mds_max_mdsize);
-                GOTO(out, rc = -ENOMEM);
+                GOTO(out_lmm, rc = -ENOMEM);
         }
 
         spin_lock(&med->med_open_lock);
@@ -440,7 +442,7 @@ static int mds_destroy_export(struct obd_export *export)
                        mfd->mfd_dentry->d_name.len,mfd->mfd_dentry->d_name.name,
                        mfd->mfd_dentry->d_inode->i_ino);
 
-                rc = mds_get_md(obd, mfd->mfd_dentry->d_inode, lmm,&lmm_size,1);
+                rc = mds_get_md(obd, mfd->mfd_dentry->d_inode, lmm, &lmm_size, 1);
                 if (rc < 0)
                         CWARN("mds_get_md failure, rc=%d\n", rc);
                 else
@@ -449,7 +451,6 @@ static int mds_destroy_export(struct obd_export *export)
                 /* child orphan sem protects orphan_dec_test and
                  * is_orphan race, mds_mfd_close drops it */
                 MDS_DOWN_WRITE_ORPHAN_SEM(mfd->mfd_dentry->d_inode);
-
                 rc = mds_mfd_close(NULL, REQ_REC_OFF, obd, mfd,
                                    !(export->exp_flags & OBD_OPT_FAILOVER),
                                    lmm, lmm_size, logcookies,
@@ -472,16 +473,14 @@ static int mds_destroy_export(struct obd_export *export)
 
                 spin_lock(&med->med_open_lock);
         }
+        spin_unlock(&med->med_open_lock);
 
         OBD_FREE(logcookies, mds->mds_max_cookiesize);
+out_lmm:
         OBD_FREE(lmm, mds->mds_max_mdsize);
-
-        spin_unlock(&med->med_open_lock);
-
+out:
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         mds_client_free(export);
-
- out:
         RETURN(rc);
 }
 
@@ -524,12 +523,11 @@ static int mds_getstatus(struct ptlrpc_request *req)
         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
         ENTRY;
 
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
+                RETURN(req->rq_status = -ENOMEM);
         rc = lustre_pack_reply(req, 2, size, NULL);
-        if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
-                CERROR("mds: out of memory for message\n");
-                req->rq_status = -ENOMEM;       /* superfluous? */
-                RETURN(-ENOMEM);
-        }
+        if (rc)
+                RETURN(req->rq_status = rc);
 
         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
@@ -542,7 +540,7 @@ static int mds_getstatus(struct ptlrpc_request *req)
 
 /* get the LOV EA from @inode and store it into @md.  It can be at most
  * @size bytes, and @size is updated with the actual EA size.
- * The EA size is also returned on success, and -ve errno on failure. 
+ * The EA size is also returned on success, and -ve errno on failure.
  * If there is no EA then 0 is returned. */
 int mds_get_md(struct obd_device *obd, struct inode *inode, void *md,
                int *size, int lock)
@@ -640,11 +638,9 @@ int mds_pack_posix_acl(struct inode *inode, struct lustre_msg *repmsg,
         if (!inode->i_op || !inode->i_op->getxattr)
                 GOTO(out, 0);
 
-        lock_24kernel();
         rc = inode->i_op->getxattr(&de, MDS_XATTR_NAME_ACL_ACCESS,
                                    lustre_msg_buf(repmsg, repoff, buflen),
                                    buflen);
-        unlock_24kernel();
 
         if (rc >= 0)
                 repbody->aclsize = rc;
@@ -775,8 +771,8 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
         LASSERT(offset == REQ_REC_OFF); /* non-intent */
 
         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
-        LASSERT(body != NULL);                 /* checked by caller */
-        LASSERT_REQSWABBED(req, offset);       /* swabbed by caller */
+        LASSERT(body != NULL);                    /* checked by caller */
+        LASSERT(lustre_req_swabbed(req, offset)); /* swabbed by caller */
 
         if ((S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
             (S_ISDIR(inode->i_mode) && (body->valid & OBD_MD_FLDIREA))) {
@@ -802,13 +798,14 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
                 }
                 bufcount++;
         } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) {
-                if (inode->i_size + 1 != body->eadatasize)
+                if (i_size_read(inode) + 1 != body->eadatasize)
                         CERROR("symlink size: %Lu, reply space: %d\n",
-                               inode->i_size + 1, body->eadatasize);
-                size[bufcount] = min_t(int, inode->i_size+1, body->eadatasize);
+                               i_size_read(inode) + 1, body->eadatasize);
+                size[bufcount] = min_t(int, i_size_read(inode) + 1,
+                                       body->eadatasize);
                 bufcount++;
                 CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
-                       inode->i_size + 1, body->eadatasize);
+                       i_size_read(inode) + 1, body->eadatasize);
         }
 
 #ifdef CONFIG_FS_POSIX_ACL
@@ -818,10 +815,8 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
 
                 size[bufcount] = 0;
                 if (inode->i_op && inode->i_op->getxattr) {
-                        lock_24kernel();
                         rc = inode->i_op->getxattr(&de, MDS_XATTR_NAME_ACL_ACCESS,
                                                    NULL, 0);
-                        unlock_24kernel();
 
                         if (rc < 0) {
                                 if (rc != -ENODATA) {
@@ -843,7 +838,6 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
 
         rc = lustre_pack_reply(req, bufcount, size, NULL);
         if (rc) {
-                CERROR("lustre_pack_reply failed: rc %d\n", rc);
                 req->rq_status = rc;
                 RETURN(rc);
         }
@@ -860,7 +854,7 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
         struct lvfs_run_ctxt saved;
         struct mds_body *body;
         struct dentry *dparent = NULL, *dchild = NULL;
-        struct lvfs_ucred uc = {NULL,};
+        struct lvfs_ucred uc = {0,};
         struct lustre_handle parent_lockh;
         int namesize;
         int rc = 0, cleanup_phase = 0, resent_req = 0;
@@ -877,7 +871,7 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
                 RETURN(-EFAULT);
         }
 
-        LASSERT_REQSWAB(req, offset + 1);
+        lustre_set_req_swabbed(req, offset + 1);
         name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
         if (name == NULL) {
                 CERROR("Can't unpack name\n");
@@ -935,10 +929,10 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
 
         if (resent_req == 0) {
                 if (name) {
-                        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2);
-                        rc = mds_get_parent_child_locked(obd, &obd->u.mds, 
+                        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2);
+                        rc = mds_get_parent_child_locked(obd, &obd->u.mds,
                                                          &body->fid1,
-                                                         &parent_lockh, 
+                                                         &parent_lockh,
                                                          &dparent, LCK_CR,
                                                          MDS_INODELOCK_UPDATE,
                                                          name, namesize,
@@ -948,11 +942,11 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
                         /* For revalidate by fid we always take UPDATE lock */
                         dchild = mds_fid2locked_dentry(obd, &body->fid2, NULL,
                                                        LCK_CR, child_lockh,
-                                                       NULL, 0, child_part);
+                                                       child_part);
                         LASSERT(dchild);
                         if (IS_ERR(dchild))
                                 rc = PTR_ERR(dchild);
-                } 
+                }
                 if (rc)
                         GOTO(cleanup, rc);
         } else {
@@ -1013,8 +1007,10 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
         default:
                 mds_exit_ucred(&uc, mds);
                 if (req->rq_reply_state == NULL) {
+                        int rc2 = lustre_pack_reply(req, 1, NULL, NULL);
+                        if (rc == 0)
+                                rc = rc2;
                         req->rq_status = rc;
-                        lustre_pack_reply(req, 1, NULL, NULL);
                 }
         }
         return rc;
@@ -1027,7 +1023,7 @@ static int mds_getattr(struct ptlrpc_request *req, int offset)
         struct lvfs_run_ctxt saved;
         struct dentry *de;
         struct mds_body *body;
-        struct lvfs_ucred uc = { NULL, };
+        struct lvfs_ucred uc = {0,};
         int rc = 0;
         ENTRY;
 
@@ -1055,7 +1051,8 @@ static int mds_getattr(struct ptlrpc_request *req, int offset)
                 GOTO(out_pop, rc);
         }
 
-        req->rq_status = mds_getattr_internal(obd, de, req, body,REPLY_REC_OFF);
+        req->rq_status = mds_getattr_internal(obd, de, req, body,
+                                              REPLY_REC_OFF);
 
         l_dput(de);
         GOTO(out_pop, rc);
@@ -1063,15 +1060,17 @@ out_pop:
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
 out_ucred:
         if (req->rq_reply_state == NULL) {
+                int rc2 = lustre_pack_reply(req, 1, NULL, NULL);
+                if (rc == 0)
+                        rc = rc2;
                 req->rq_status = rc;
-                lustre_pack_reply(req, 1, NULL, NULL);
         }
         mds_exit_ucred(&uc, mds);
         return rc;
 }
 
 static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
-                          __u64 max_age)
+                          __u64 max_age, __u32 flags)
 {
         int rc;
 
@@ -1096,16 +1095,16 @@ static int mds_statfs(struct ptlrpc_request *req)
                          (MDS_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
         OBD_COUNTER_INCREMENT(obd, statfs);
 
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK))
+                GOTO(out, rc = -ENOMEM);
         rc = lustre_pack_reply(req, 2, size, NULL);
-        if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
-                CERROR("mds: statfs lustre_pack_reply failed: rc = %d\n", rc);
+        if (rc)
                 GOTO(out, rc);
-        }
 
         /* We call this so that we can cache a bit - 1 jiffie worth */
         rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
                                                 size[REPLY_REC_OFF]),
-                            cfs_time_current_64() - HZ);
+                            cfs_time_current_64() - HZ, 0);
         if (rc) {
                 CERROR("mds_obd_statfs failed: rc %d\n", rc);
                 GOTO(out, rc);
@@ -1130,36 +1129,28 @@ static int mds_sync(struct ptlrpc_request *req, int offset)
         if (body == NULL)
                 GOTO(out, rc = -EFAULT);
 
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
+                GOTO(out, rc = -ENOMEM);
         rc = lustre_pack_reply(req, 2, size, NULL);
-        if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) {
-                CERROR("fsync lustre_pack_reply failed: rc = %d\n", rc);
+        if (rc)
                 GOTO(out, rc);
-        }
 
-        if (body->fid1.id == 0) {
-                /* a fid of zero is taken to mean "sync whole filesystem" */
-                rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
-                GOTO(out, rc);
-        } else {
+        rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
+        if (rc == 0 && body->fid1.id != 0) {
                 struct dentry *de;
 
                 de = mds_fid2dentry(mds, &body->fid1, NULL);
                 if (IS_ERR(de))
                         GOTO(out, rc = PTR_ERR(de));
 
-                /* The file parameter isn't used for anything */
-                if (de->d_inode->i_fop && de->d_inode->i_fop->fsync)
-                        rc = de->d_inode->i_fop->fsync(NULL, de, 1);
-                if (rc == 0) {
-                        body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                              sizeof(*body));
-                        mds_pack_inode2fid(&body->fid1, de->d_inode);
-                        mds_pack_inode2body(body, de->d_inode);
-                }
+                body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
+                                      sizeof(*body));
+                mds_pack_inode2fid(&body->fid1, de->d_inode);
+                mds_pack_inode2body(body, de->d_inode);
 
                 l_dput(de);
-                GOTO(out, rc);
         }
+        GOTO(out, rc);
 out:
         req->rq_status = rc;
         return 0;
@@ -1180,17 +1171,14 @@ static int mds_readpage(struct ptlrpc_request *req, int offset)
         struct mds_body *body, *repbody;
         struct lvfs_run_ctxt saved;
         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
-        struct lvfs_ucred uc = {NULL,};
+        struct lvfs_ucred uc = {0,};
         ENTRY;
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
                 RETURN(-ENOMEM);
-
         rc = lustre_pack_reply(req, 2, size, NULL);
-        if (rc) {
-                CERROR("error packing readpage reply: rc %d\n", rc);
+        if (rc)
                 GOTO(out, rc);
-        }
 
         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
                                   lustre_swab_mds_body);
@@ -1214,22 +1202,22 @@ static int mds_readpage(struct ptlrpc_request *req, int offset)
                 GOTO(out_pop, rc = PTR_ERR(file));
 
         /* body->size is actually the offset -eeb */
-        if ((body->size & (de->d_inode->i_blksize - 1)) != 0) {
+        if ((body->size & (de->d_inode->i_sb->s_blocksize - 1)) != 0) {
                 CERROR("offset "LPU64" not on a block boundary of %lu\n",
-                       body->size, de->d_inode->i_blksize);
+                       body->size, de->d_inode->i_sb->s_blocksize);
                 GOTO(out_file, rc = -EFAULT);
         }
 
         /* body->nlink is actually the #bytes to read -eeb */
-        if (body->nlink & (de->d_inode->i_blksize - 1)) {
+        if (body->nlink & (de->d_inode->i_sb->s_blocksize - 1)) {
                 CERROR("size %u is not multiple of blocksize %lu\n",
-                       body->nlink, de->d_inode->i_blksize);
+                       body->nlink, de->d_inode->i_sb->s_blocksize);
                 GOTO(out_file, rc = -EFAULT);
         }
 
         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
                                  sizeof(*repbody));
-        repbody->size = file->f_dentry->d_inode->i_size;
+        repbody->size = i_size_read(file->f_dentry->d_inode);
         repbody->valid = OBD_MD_FLSIZE;
 
         /* to make this asynchronous make sure that the handling function
@@ -1271,8 +1259,8 @@ int mds_reint(struct ptlrpc_request *req, int offset,
         return rc;
 }
 
-static int mds_filter_recovery_request(struct ptlrpc_request *req,
-                                       struct obd_device *obd, int *process)
+int mds_filter_recovery_request(struct ptlrpc_request *req,
+                                struct obd_device *obd, int *process)
 {
         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
         case MDS_CONNECT: /* This will never get here, but for completeness. */
@@ -1283,21 +1271,23 @@ static int mds_filter_recovery_request(struct ptlrpc_request *req,
                RETURN(0);
 
         case MDS_CLOSE:
+        case MDS_DONE_WRITING:
         case MDS_SYNC: /* used in unmounting */
         case OBD_PING:
         case MDS_REINT:
+        case SEQ_QUERY:
+        case FLD_QUERY:
         case LDLM_ENQUEUE:
                 *process = target_queue_recovery_request(req, obd);
                 RETURN(0);
 
         default:
                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
-                *process = 0;
-                /* XXX what should we set rq_status to here? */
-                req->rq_status = -EAGAIN;
-                RETURN(ptlrpc_error(req));
+                *process = -EAGAIN;
+                RETURN(0);
         }
 }
+EXPORT_SYMBOL(mds_filter_recovery_request);
 
 static char *reint_names[] = {
         [REINT_SETATTR] "setattr",
@@ -1310,9 +1300,8 @@ static char *reint_names[] = {
 
 static int mds_set_info_rpc(struct obd_export *exp, struct ptlrpc_request *req)
 {
-        char *key;
-        __u32 *val;
-        int keylen, rc = 0;
+        void *key, *val;
+        int keylen, vallen, rc = 0;
         ENTRY;
 
         key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
@@ -1322,25 +1311,28 @@ static int mds_set_info_rpc(struct obd_export *exp, struct ptlrpc_request *req)
         }
         keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);
 
-        val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*val));
-        if (val == NULL) {
-                DEBUG_REQ(D_HA, req, "no set_info val");
-                RETURN(-EFAULT);
-        }
+        val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, 0);
+        vallen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1);
 
         rc = lustre_pack_reply(req, 1, NULL, NULL);
         if (rc)
                 RETURN(rc);
+
         lustre_msg_set_status(req->rq_repmsg, 0);
 
-        if (keylen < strlen("read-only") ||
-            memcmp(key, "read-only", keylen) != 0)
-                RETURN(-EINVAL);
+        if (KEY_IS("read-only")) {
+                if (val == NULL || vallen < sizeof(__u32)) {
+                        DEBUG_REQ(D_HA, req, "no set_info val");
+                        RETURN(-EFAULT);
+                }
 
-        if (*val)
-                exp->exp_connect_flags |= OBD_CONNECT_RDONLY;
-        else
-                exp->exp_connect_flags &= ~OBD_CONNECT_RDONLY;
+                if (*(__u32 *)val)
+                        exp->exp_connect_flags |= OBD_CONNECT_RDONLY;
+                else
+                        exp->exp_connect_flags &= ~OBD_CONNECT_RDONLY;
+        } else {
+                RETURN(-EINVAL);
+        }
 
         RETURN(0);
 }
@@ -1357,10 +1349,8 @@ static int mds_handle_quotacheck(struct ptlrpc_request *req)
                 RETURN(-EPROTO);
 
         rc = lustre_pack_reply(req, 1, NULL, NULL);
-        if (rc) {
-                CERROR("mds: out of memory while packing quotacheck reply\n");
+        if (rc)
                 RETURN(rc);
-        }
 
         req->rq_status = obd_quotacheck(req->rq_export, oqctl);
         RETURN(0);
@@ -1388,7 +1378,7 @@ static int mds_handle_quotactl(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-static int mds_msg_check_version(struct lustre_msg *msg)
+int mds_msg_check_version(struct lustre_msg *msg)
 {
         int rc;
 
@@ -1396,6 +1386,9 @@ static int mds_msg_check_version(struct lustre_msg *msg)
         case MDS_CONNECT:
         case MDS_DISCONNECT:
         case OBD_PING:
+        case SEC_CTX_INIT:
+        case SEC_CTX_INIT_CONT:
+        case SEC_CTX_FINI:
                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
                 if (rc)
                         CERROR("bad opc %u version %08x, expecting %08x\n",
@@ -1408,6 +1401,8 @@ static int mds_msg_check_version(struct lustre_msg *msg)
         case MDS_GETATTR_NAME:
         case MDS_STATFS:
         case MDS_READPAGE:
+        case MDS_WRITEPAGE:
+        case MDS_IS_SUBDIR:
         case MDS_REINT:
         case MDS_CLOSE:
         case MDS_DONE_WRITING:
@@ -1420,6 +1415,8 @@ static int mds_msg_check_version(struct lustre_msg *msg)
         case MDS_QUOTACTL:
         case QUOTA_DQACQ:
         case QUOTA_DQREL:
+        case SEQ_QUERY:
+        case FLD_QUERY:
                 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
                 if (rc)
                         CERROR("bad opc %u version %08x, expecting %08x\n",
@@ -1459,16 +1456,18 @@ static int mds_msg_check_version(struct lustre_msg *msg)
         }
         return rc;
 }
+EXPORT_SYMBOL(mds_msg_check_version);
 
 int mds_handle(struct ptlrpc_request *req)
 {
         int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET;
-        int rc = 0;
+        int rc;
         struct mds_obd *mds = NULL; /* quell gcc overwarning */
         struct obd_device *obd = NULL;
         ENTRY;
 
-        OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
+        if (OBD_FAIL_CHECK_ORSET(OBD_FAIL_MDS_ALL_REQUEST_NET, OBD_FAIL_ONCE))
+                RETURN(0);
 
         LASSERT(current->journal_info == NULL);
 
@@ -1481,7 +1480,7 @@ int mds_handle(struct ptlrpc_request *req)
         /* XXX identical to OST */
         if (lustre_msg_get_opc(req->rq_reqmsg) != MDS_CONNECT) {
                 struct mds_export_data *med;
-                int recovering, abort_recovery;
+                int recovering;
 
                 if (req->rq_export == NULL) {
                         CERROR("operation %d on unconnected MDS from %s\n",
@@ -1493,7 +1492,7 @@ int mds_handle(struct ptlrpc_request *req)
 
                 med = &req->rq_export->exp_mds_data;
                 obd = req->rq_export->exp_obd;
-                mds = &obd->u.mds;
+                mds = mds_req2mds(req);
 
                 /* sanity check: if the xid matches, the request must
                  * be marked as a resent or replayed */
@@ -1516,26 +1515,35 @@ int mds_handle(struct ptlrpc_request *req)
 
                 /* Check for aborted recovery. */
                 spin_lock_bh(&obd->obd_processing_task_lock);
-                abort_recovery = obd->obd_abort_recovery;
                 recovering = obd->obd_recovering;
                 spin_unlock_bh(&obd->obd_processing_task_lock);
-                if (abort_recovery) {
-                        target_abort_recovery(obd);
-                } else if (recovering) {
+                if (recovering) {
                         rc = mds_filter_recovery_request(req, obd,
                                                          &should_process);
                         if (rc || !should_process)
                                 RETURN(rc);
+                        else if (should_process < 0) {
+                                req->rq_status = should_process;
+                                rc = ptlrpc_error(req);
+                                RETURN(rc);
+                        }
                 }
         }
 
         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
         case MDS_CONNECT:
                 DEBUG_REQ(D_INODE, req, "connect");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
-                rc = target_handle_connect(req, mds_handle);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CONNECT_NET))
+                        RETURN(0);
+                rc = target_handle_connect(req);
                 if (!rc) {
                         /* Now that we have an export, set mds. */
+                        /*
+                         * XXX nikita: these assignments are useless: mds is
+                         * never used below, and obd is only used for
+                         * MSG_LAST_REPLAY case, which never happens for
+                         * MDS_CONNECT.
+                         */
                         obd = req->rq_export->exp_obd;
                         mds = mds_req2mds(req);
                 }
@@ -1543,39 +1551,45 @@ int mds_handle(struct ptlrpc_request *req)
 
         case MDS_DISCONNECT:
                 DEBUG_REQ(D_INODE, req, "disconnect");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DISCONNECT_NET))
+                        RETURN(0);
                 rc = target_handle_disconnect(req);
                 req->rq_status = rc;            /* superfluous? */
                 break;
 
         case MDS_GETSTATUS:
                 DEBUG_REQ(D_INODE, req, "getstatus");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_NET))
+                        RETURN(0);
                 rc = mds_getstatus(req);
                 break;
 
         case MDS_GETATTR:
                 DEBUG_REQ(D_INODE, req, "getattr");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_NET))
+                        RETURN(0);
                 rc = mds_getattr(req, REQ_REC_OFF);
                 break;
 
         case MDS_SETXATTR:
                 DEBUG_REQ(D_INODE, req, "setxattr");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_SETXATTR_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SETXATTR_NET))
+                        RETURN(0);
                 rc = mds_setxattr(req);
                 break;
 
         case MDS_GETXATTR:
                 DEBUG_REQ(D_INODE, req, "getxattr");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_GETXATTR_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETXATTR_NET))
+                        RETURN(0);
                 rc = mds_getxattr(req);
                 break;
 
         case MDS_GETATTR_NAME: {
                 struct lustre_handle lockh = { 0 };
                 DEBUG_REQ(D_INODE, req, "getattr_name");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NAME_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_NAME_NET))
+                        RETURN(0);
 
                 /* If this request gets a reconstructed reply, we won't be
                  * acquiring any new locks in mds_getattr_lock, so we don't
@@ -1591,16 +1605,18 @@ int mds_handle(struct ptlrpc_request *req)
         }
         case MDS_STATFS:
                 DEBUG_REQ(D_INODE, req, "statfs");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_NET))
+                        RETURN(0);
                 rc = mds_statfs(req);
                 break;
 
         case MDS_READPAGE:
                 DEBUG_REQ(D_INODE, req, "readpage");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_NET))
+                        RETURN(0);
                 rc = mds_readpage(req, REQ_REC_OFF);
 
-                if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_SENDPAGE)) {
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
                         RETURN(0);
                 }
 
@@ -1610,6 +1626,7 @@ int mds_handle(struct ptlrpc_request *req)
                 __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
                                              sizeof(*opcp));
                 __u32  opc;
+                int op = 0;
                 int size[4] = { sizeof(struct ptlrpc_body),
                                 sizeof(struct mds_body),
                                 mds->mds_max_mdsize,
@@ -1630,8 +1647,36 @@ int mds_handle(struct ptlrpc_request *req)
                           (opc < sizeof(reint_names) / sizeof(reint_names[0]) ||
                            reint_names[opc] == NULL) ? reint_names[opc] :
                                                        "unknown opcode");
+                switch (opc) {
+                case REINT_CREATE:
+                        op = PTLRPC_LAST_CNTR + MDS_REINT_CREATE;
+                        break;
+                case REINT_LINK:
+                        op = PTLRPC_LAST_CNTR + MDS_REINT_LINK;
+                        break;
+                case REINT_OPEN:
+                        op = PTLRPC_LAST_CNTR + MDS_REINT_OPEN;
+                        break;
+                case REINT_SETATTR:
+                        op = PTLRPC_LAST_CNTR + MDS_REINT_SETATTR;
+                        break;
+                case REINT_RENAME:
+                        op = PTLRPC_LAST_CNTR + MDS_REINT_RENAME;
+                        break;
+                case REINT_UNLINK:
+                        op = PTLRPC_LAST_CNTR + MDS_REINT_UNLINK;
+                        break;
+                default:
+                        op = 0;
+                        break;
+                }
 
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
+                if (op && req->rq_rqbd->rqbd_service->srv_stats)
+                        lprocfs_counter_incr(
+                                req->rq_rqbd->rqbd_service->srv_stats, op);
+
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_NET))
+                        RETURN(0);
 
                 if (opc == REINT_UNLINK || opc == REINT_RENAME)
                         bufcount = 4;
@@ -1651,25 +1696,30 @@ int mds_handle(struct ptlrpc_request *req)
 
         case MDS_CLOSE:
                 DEBUG_REQ(D_INODE, req, "close");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_NET))
+                        RETURN(0);
                 rc = mds_close(req, REQ_REC_OFF);
+                fail = OBD_FAIL_MDS_CLOSE_NET_REP;
                 break;
 
         case MDS_DONE_WRITING:
                 DEBUG_REQ(D_INODE, req, "done_writing");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_DONE_WRITING_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DONE_WRITING_NET))
+                        RETURN(0);
                 rc = mds_done_writing(req, REQ_REC_OFF);
                 break;
 
         case MDS_PIN:
                 DEBUG_REQ(D_INODE, req, "pin");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_PIN_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_PIN_NET))
+                        RETURN(0);
                 rc = mds_pin(req, REQ_REC_OFF);
                 break;
 
         case MDS_SYNC:
                 DEBUG_REQ(D_INODE, req, "sync");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_SYNC_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_NET))
+                        RETURN(0);
                 rc = mds_sync(req, REQ_REC_OFF);
                 break;
 
@@ -1680,13 +1730,15 @@ int mds_handle(struct ptlrpc_request *req)
 
         case MDS_QUOTACHECK:
                 DEBUG_REQ(D_INODE, req, "quotacheck");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_QUOTACHECK_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_QUOTACHECK_NET))
+                        RETURN(0);
                 rc = mds_handle_quotacheck(req);
                 break;
 
         case MDS_QUOTACTL:
                 DEBUG_REQ(D_INODE, req, "quotactl");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_QUOTACTL_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_QUOTACTL_NET))
+                        RETURN(0);
                 rc = mds_handle_quotactl(req);
                 break;
 
@@ -1697,20 +1749,23 @@ int mds_handle(struct ptlrpc_request *req)
 
         case OBD_LOG_CANCEL:
                 CDEBUG(D_INODE, "log cancel\n");
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
+                        RETURN(0);
                 rc = -ENOTSUPP; /* la la la */
                 break;
 
         case LDLM_ENQUEUE:
                 DEBUG_REQ(D_INODE, req, "enqueue");
-                OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE))
+                        RETURN(0);
                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
                                          ldlm_server_blocking_ast, NULL);
                 fail = OBD_FAIL_LDLM_REPLY;
                 break;
         case LDLM_CONVERT:
                 DEBUG_REQ(D_INODE, req, "convert");
-                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CONVERT))
+                        RETURN(0);
                 rc = ldlm_handle_convert(req);
                 break;
         case LDLM_BL_CALLBACK:
@@ -1718,41 +1773,49 @@ int mds_handle(struct ptlrpc_request *req)
                 DEBUG_REQ(D_INODE, req, "callback");
                 CERROR("callbacks should not happen on MDS\n");
                 LBUG();
-                OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK))
+                        RETURN(0);
                 break;
         case LLOG_ORIGIN_HANDLE_CREATE:
                 DEBUG_REQ(D_INODE, req, "llog_init");
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
+                        RETURN(0);
                 rc = llog_origin_handle_create(req);
                 break;
         case LLOG_ORIGIN_HANDLE_DESTROY:
                 DEBUG_REQ(D_INODE, req, "llog_init");
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
+                        RETURN(0);
                 rc = llog_origin_handle_destroy(req);
                 break;
         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
                 DEBUG_REQ(D_INODE, req, "llog next block");
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
+                        RETURN(0);
                 rc = llog_origin_handle_next_block(req);
                 break;
         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
                 DEBUG_REQ(D_INODE, req, "llog prev block");
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
+                        RETURN(0);
                 rc = llog_origin_handle_prev_block(req);
                 break;
         case LLOG_ORIGIN_HANDLE_READ_HEADER:
                 DEBUG_REQ(D_INODE, req, "llog read header");
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
+                        RETURN(0);
                 rc = llog_origin_handle_read_header(req);
                 break;
         case LLOG_ORIGIN_HANDLE_CLOSE:
                 DEBUG_REQ(D_INODE, req, "llog close");
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
+                        RETURN(0);
                 rc = llog_origin_handle_close(req);
                 break;
         case LLOG_CATINFO:
                 DEBUG_REQ(D_INODE, req, "llog catinfo");
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
+                        RETURN(0);
                 rc = llog_catinfo(req);
                 break;
         default:
@@ -1766,7 +1829,7 @@ int mds_handle(struct ptlrpc_request *req)
         /* If we're DISCONNECTing, the mds_export_data is already freed */
         if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != MDS_DISCONNECT) {
                 struct mds_export_data *med = &req->rq_export->exp_mds_data;
-                
+
                 /* I don't think last_xid is used for anything anyway, so I'm
                    not sure if we need to care about last_close_xid here. */
                 lustre_msg_set_last_xid(req->rq_repmsg,
@@ -1778,15 +1841,6 @@ int mds_handle(struct ptlrpc_request *req)
         EXIT;
  out:
 
-        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
-                if (obd && obd->obd_recovering) {
-                        DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
-                        return target_queue_final_reply(req, rc);
-                }
-                /* Lost a race with recovery; let the error path DTRT. */
-                rc = req->rq_status = -ENOTCONN;
-        }
-
         target_send_reply(req, rc, fail);
         return 0;
 }
@@ -1820,7 +1874,6 @@ int mds_update_server_data(struct obd_device *obd, int force_sync)
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         if (rc)
                 CERROR("error writing MDS server data: rc = %d\n", rc);
-
         RETURN(rc);
 }
 
@@ -1865,6 +1918,26 @@ static void fsoptions_to_mds_flags(struct mds_obd *mds, char *options)
                 options = ++p;
         }
 }
+
+/*
+ * Fill in the LOV-related fields of @mds from @lcfg.
+ *
+ * If @lcfg carries at least four buffers and buffer 3 is non-empty, a
+ * random uuid is generated and unparsed into mds->mds_lov_uuid, and
+ * buffer 3 is copied into a newly allocated mds->mds_profile.  Otherwise
+ * nothing is touched.
+ *
+ * Returns 0 on success or -ENOMEM if the profile copy cannot be allocated.
+ */
+static int mds_lov_presetup(struct mds_obd *mds, struct lustre_cfg *lcfg)
+{
+        class_uuid_t uuid;
+        ENTRY;
+
+        /* nothing to do without a usable buffer 3 */
+        if (lcfg->lcfg_bufcount < 4 || !(LUSTRE_CFG_BUFLEN(lcfg, 3) > 0))
+                RETURN(0);
+
+        ll_generate_random_uuid(uuid);
+        class_uuid_unparse(uuid, &mds->mds_lov_uuid);
+
+        OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3));
+        if (mds->mds_profile == NULL)
+                RETURN(-ENOMEM);
+
+        /* strncpy() leaves the copy unterminated when the source fills the
+         * whole buffer, so terminate it explicitly. */
+        strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3),
+                LUSTRE_CFG_BUFLEN(lcfg, 3));
+        mds->mds_profile[LUSTRE_CFG_BUFLEN(lcfg, 3) - 1] = '\0';
+
+        RETURN(0);
+}
 
 /* mount the file system (secretly).  lustre_cfg parameters are:
  * 1 = device
@@ -1872,18 +1945,17 @@ static void fsoptions_to_mds_flags(struct mds_obd *mds, char *options)
  * 3 = config name
  * 4 = mount options
  */
-static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
+static int mds_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
 {
         struct lprocfs_static_vars lvars;
-        struct lustre_cfg* lcfg = buf;
         struct mds_obd *mds = &obd->u.mds;
         struct lustre_mount_info *lmi;
         struct vfsmount *mnt;
+        struct lustre_sb_info *lsi;
         struct obd_uuid uuid;
         __u8 *uuid_ptr;
-        char *options, *str, *label;
+        char *str, *label;
         char ns_name[48];
-        unsigned long page;
         int rc = 0;
         ENTRY;
 
@@ -1893,55 +1965,25 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
                  offsetof(struct obd_device, u.mds.mds_obt));
 
         if (lcfg->lcfg_bufcount < 3)
-                RETURN(rc = -EINVAL);
+                RETURN(-EINVAL);
 
         if (LUSTRE_CFG_BUFLEN(lcfg, 1) == 0 || LUSTRE_CFG_BUFLEN(lcfg, 2) == 0)
-                RETURN(rc = -EINVAL);
+                RETURN(-EINVAL);
 
         lmi = server_get_mount(obd->obd_name);
-        if (lmi) {
-                /* We already mounted in lustre_fill_super.
-                   lcfg bufs 1, 2, 4 (device, fstype, mount opts) are ignored.*/
-                struct lustre_sb_info *lsi = s2lsi(lmi->lmi_sb);
-                fsoptions_to_mds_flags(mds, lsi->lsi_ldd->ldd_mount_opts);
-                fsoptions_to_mds_flags(mds, lsi->lsi_lmd->lmd_opts);
-                mnt = lmi->lmi_mnt;
-                obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
-        } else {
-                /* old path - used by lctl */
-                CERROR("Using old MDS mount method\n");
-                page = __get_free_page(GFP_KERNEL);
-                if (!page)
-                        RETURN(-ENOMEM);
-
-                options = (char *)page;
-                memset(options, 0, CFS_PAGE_SIZE);
+        if (!lmi) {
+                CERROR("Not mounted in lustre_fill_super?\n");
+                RETURN(-EINVAL);
+        }
 
-                /* here we use "iopen_nopriv" hardcoded, because it affects
-                 * MDS utility and the rest of options are passed by mount
-                 * options. Probably this should be moved to somewhere else
-                 * like startup scripts or lconf. */
-                strcpy(options, "iopen_nopriv");
+        /* We mounted in lustre_fill_super.
+           lcfg bufs 1, 2, 4 (device, fstype, mount opts) are ignored.*/
 
-                if (LUSTRE_CFG_BUFLEN(lcfg, 4) > 0 && lustre_cfg_buf(lcfg, 4)) {
-                        sprintf(options + strlen(options), ",%s",
-                                lustre_cfg_string(lcfg, 4));
-                        fsoptions_to_mds_flags(mds, options);
-                }
-                
-                mnt = do_kern_mount(lustre_cfg_string(lcfg, 2), 0,
-                                    lustre_cfg_string(lcfg, 1), 
-                                    (void *)options);
-                free_page(page);
-                if (IS_ERR(mnt)) {
-                        rc = PTR_ERR(mnt);
-                        LCONSOLE_ERROR("Can't mount disk %s (%d)\n",
-                                       lustre_cfg_string(lcfg, 1), rc);
-                        RETURN(rc);
-                }
-
-                obd->obd_fsops = fsfilt_get_ops(lustre_cfg_string(lcfg, 2));
-        }
+        lsi = s2lsi(lmi->lmi_sb);
+        fsoptions_to_mds_flags(mds, lsi->lsi_ldd->ldd_mount_opts);
+        fsoptions_to_mds_flags(mds, lsi->lsi_lmd->lmd_opts);
+        mnt = lmi->lmi_mnt;
+        obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
         if (IS_ERR(obd->obd_fsops))
                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
 
@@ -1954,15 +1996,33 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
         mds->mds_max_cookiesize = sizeof(struct llog_cookie);
         mds->mds_atime_diff = MAX_ATIME_DIFF;
+        mds->mds_evict_ost_nids = 1;
 
         sprintf(ns_name, "mds-%s", obd->obd_uuid.uuid);
-        obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
+        obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER,
+                                                LDLM_NAMESPACE_GREEDY);
         if (obd->obd_namespace == NULL) {
                 mds_cleanup(obd);
                 GOTO(err_ops, rc = -ENOMEM);
         }
         ldlm_register_intent(obd->obd_namespace, mds_intent_policy);
 
+        lprocfs_mds_init_vars(&lvars);
+        if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 &&
+            lprocfs_alloc_obd_stats(obd, LPROC_MDS_LAST) == 0) {
+                /* Init private stats here */
+                mds_stats_counter_init(obd->obd_stats);
+                obd->obd_proc_exports_entry = lprocfs_register("exports",
+                                                         obd->obd_proc_entry,
+                                                         NULL, NULL);
+                if (IS_ERR(obd->obd_proc_exports_entry)) {
+                        rc = PTR_ERR(obd->obd_proc_exports_entry);
+                        CERROR("error %d setting up lprocfs for %s\n",
+                               rc, "exports");
+                        obd->obd_proc_exports_entry = NULL;
+                }
+        }
+
         rc = mds_fs_setup(obd, mnt);
         if (rc) {
                 CERROR("%s: MDS filesystem method init failed: rc = %d\n",
@@ -1970,24 +2030,15 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
                 GOTO(err_ns, rc);
         }
 
-        rc = llog_start_commit_thread();
+        if (obd->obd_proc_exports_entry)
+                lprocfs_add_simple(obd->obd_proc_exports_entry,
+                                   "clear", lprocfs_nid_stats_clear_read,
+                                   lprocfs_nid_stats_clear_write, obd);
+
+        rc = mds_lov_presetup(mds, lcfg);
         if (rc < 0)
                 GOTO(err_fs, rc);
 
-        if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
-                class_uuid_t uuid;
-
-                generate_random_uuid(uuid);
-                class_uuid_unparse(uuid, &mds->mds_lov_uuid);
-
-                OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3));
-                if (mds->mds_profile == NULL)
-                        GOTO(err_fs, rc = -ENOMEM);
-
-                strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3),
-                        LUSTRE_CFG_BUFLEN(lcfg, 3));
-        }
-
         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                            "mds_ldlm_client", &obd->obd_ldlm_client);
         obd->obd_replayable = 1;
@@ -1996,12 +2047,14 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
         if (rc)
                 GOTO(err_fs, rc);
 
+#if 0
         mds->mds_group_hash = upcall_cache_init(obd->obd_name);
         if (IS_ERR(mds->mds_group_hash)) {
                 rc = PTR_ERR(mds->mds_group_hash);
                 mds->mds_group_hash = NULL;
                 GOTO(err_qctxt, rc);
         }
+#endif
 
         /* Don't wait for mds_postrecov trying to clear orphans */
         obd->obd_async_recov = 1;
@@ -2013,15 +2066,6 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
         if (rc)
                 GOTO(err_qctxt, rc);
 
-        lprocfs_init_vars(mds, &lvars);
-        if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 &&
-            lprocfs_alloc_obd_stats(obd, LPROC_MDS_LAST) == 0) {
-                /* Init private stats here */
-                mds_stats_counter_init(obd->obd_stats);
-                obd->obd_proc_exports = proc_mkdir("exports",
-                                                   obd->obd_proc_entry);
-        }
-
         uuid_ptr = fsfilt_uuid(obd, obd->u.obt.obt_sb);
         if (uuid_ptr != NULL) {
                 class_uuid_unparse(uuid_ptr, &uuid);
@@ -2040,8 +2084,8 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
                               "/proc/fs/lustre/mds/%s/recovery_status.\n",
                               obd->obd_name, lustre_cfg_string(lcfg, 1),
                               label ?: "", label ? "/" : "", str,
-                              obd->obd_recoverable_clients,
-                              (obd->obd_recoverable_clients == 1) ?
+                              obd->obd_max_recoverable_clients,
+                              (obd->obd_max_recoverable_clients == 1) ?
                               "client" : "clients",
                               (int)(OBD_RECOVERY_TIMEOUT) / 60,
                               (int)(OBD_RECOVERY_TIMEOUT) % 60,
@@ -2053,7 +2097,8 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
                               obd->obd_replayable ? "enabled" : "disabled");
         }
 
-        ldlm_timeout = 6;
+        if (ldlm_timeout == LDLM_TIMEOUT_DEFAULT)
+                ldlm_timeout = 6;
 
         RETURN(0);
 
@@ -2062,9 +2107,13 @@ err_qctxt:
 err_fs:
         /* No extra cleanup needed for llog_init_commit_thread() */
         mds_fs_cleanup(obd);
+#if 0
         upcall_cache_cleanup(mds->mds_group_hash);
         mds->mds_group_hash = NULL;
+#endif
 err_ns:
+        lprocfs_free_obd_stats(obd);
+        lprocfs_obd_cleanup(obd);
         ldlm_namespace_free(obd->obd_namespace, 0);
         obd->obd_namespace = NULL;
 err_ops:
@@ -2113,29 +2162,29 @@ static int mds_postsetup(struct obd_device *obd)
         int rc = 0;
         ENTRY;
 
-        rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
+        rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
                         &llog_lvfs_ops);
         if (rc)
                 RETURN(rc);
 
-        rc = llog_setup(obd, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
+        rc = llog_setup(obd, &obd->obd_olg, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
                         &llog_lvfs_ops);
         if (rc)
                 RETURN(rc);
 
         if (mds->mds_profile) {
                 struct lustre_profile *lprof;
-                /* The profile defines which osc and mdc to connect to, for a 
+                /* The profile defines which osc and mdc to connect to, for a
                    client.  We reuse that here to figure out the name of the
-                   lov to use (and ignore lprof->lp_mdc).
-                   The profile was set in the config log with 
+                   lov to use (and ignore lprof->lp_md).
+                   The profile was set in the config log with
                    LCFG_MOUNTOPT profilenm oscnm mdcnm */
                 lprof = class_get_profile(mds->mds_profile);
                 if (lprof == NULL) {
                         CERROR("No profile found: %s\n", mds->mds_profile);
                         GOTO(err_cleanup, rc = -ENOENT);
                 }
-                rc = mds_lov_connect(obd, lprof->lp_osc);
+                rc = mds_lov_connect(obd, lprof->lp_dt);
                 if (rc)
                         GOTO(err_cleanup, rc);
         }
@@ -2151,34 +2200,26 @@ err_cleanup:
 
 int mds_postrecov(struct obd_device *obd)
 {
-        int rc;
+        int rc = 0;
         ENTRY;
 
         if (obd->obd_fail)
                 RETURN(0);
 
         LASSERT(!obd->obd_recovering);
-        LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
-
-        /* FIXME why not put this in the synchronize? */
-        /* set nextid first, so we are sure it happens */
-        rc = mds_lov_set_nextid(obd);
-        if (rc) {
-                CERROR("%s: mds_lov_set_nextid failed %d\n",
-                       obd->obd_name, rc);
-                GOTO(out, rc);
-        }
+        LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
 
         /* clean PENDING dir */
-        rc = mds_cleanup_pending(obd);
-        if (rc < 0)
-                GOTO(out, rc);
+        if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME))) {
+                /* skipped when obd_name starts with MDD_OBD_NAME */
+                rc = mds_cleanup_pending(obd);
+                if (rc < 0)
+                        GOTO(out, rc);
+        }
 
         /* FIXME Does target_finish_recovery really need this to block? */
         /* Notify the LOV, which will in turn call mds_notify for each tgt */
         /* This means that we have to hack obd_notify to think we're obd_set_up
            during mds_lov_connect. */
-        obd_notify(obd->u.mds.mds_osc_obd, NULL, 
+        obd_notify(obd->u.mds.mds_osc_obd, NULL,
                    obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
                    OBD_NOTIFY_SYNC, NULL);
 
@@ -2211,7 +2252,11 @@ static int mds_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
         case OBD_CLEANUP_EARLY:
                 break;
         case OBD_CLEANUP_EXPORTS:
-                target_cleanup_recovery(obd);
+                /* XXX This path is also used for MDD MDS cleanup, so skip
+                 * target_cleanup_recovery() for the temporary MDD MDS.
+                 * -- Wangdi */
+                if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
+                        target_cleanup_recovery(obd);
                 mds_lov_early_clean(obd);
                 break;
         case OBD_CLEANUP_SELF_EXP:
@@ -2242,18 +2287,23 @@ static int mds_cleanup(struct obd_device *obd)
                    we just need to drop our ref */
                 class_export_put(mds->mds_osc_exp);
 
-        lprocfs_obd_cleanup(obd);
+        lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry);
+        lprocfs_free_per_client_stats(obd);
         lprocfs_free_obd_stats(obd);
+        lprocfs_obd_cleanup(obd);
 
         lquota_cleanup(mds_quota_interface_ref, obd);
 
         mds_update_server_data(obd, 1);
-        if (mds->mds_lov_objids != NULL) 
-                OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size);
+        /* XXX
+        mds_lov_destroy_objids(obd);
+        */
         mds_fs_cleanup(obd);
 
+#if 0
         upcall_cache_cleanup(mds->mds_group_hash);
         mds->mds_group_hash = NULL;
+#endif
 
         server_put_mount(obd->obd_name, mds->mds_vfsmnt);
         obd->u.obt.obt_sb = NULL;
@@ -2282,7 +2332,7 @@ static void fixup_handle_for_resent_req(struct ptlrpc_request *req, int offset,
         struct obd_export *exp = req->rq_export;
         struct ldlm_request *dlmreq =
                 lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*dlmreq));
-        struct lustre_handle remote_hdl = dlmreq->lock_handle1;
+        struct lustre_handle remote_hdl = dlmreq->lock_handle[0];
         struct list_head *iter;
 
         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
@@ -2297,7 +2347,7 @@ static void fixup_handle_for_resent_req(struct ptlrpc_request *req, int offset,
                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
                         lockh->cookie = lock->l_handle.h_cookie;
                         LDLM_DEBUG(lock, "restoring lock cookie");
-                        DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
+                        DEBUG_REQ(D_DLMTRACE, req,"restoring lock cookie "LPX64,
                                   lockh->cookie);
                         if (old_lock)
                                 *old_lock = LDLM_LOCK_GET(lock);
@@ -2324,7 +2374,7 @@ static void fixup_handle_for_resent_req(struct ptlrpc_request *req, int offset,
 
         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
 
-        DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
+        DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle "LPX64,
                   remote_hdl.cookie);
 }
 
@@ -2366,7 +2416,8 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
         if (lustre_msg_bufcount(req->rq_reqmsg) <= DLM_INTENT_IT_OFF) {
                 /* No intent was provided */
                 rc = lustre_pack_reply(req, 2, repsize, NULL);
-                LASSERT(rc == 0);
+                if (rc)
+                        RETURN(rc);
                 RETURN(0);
         }
 
@@ -2520,7 +2571,7 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
         RETURN(ELDLM_LOCK_REPLACED);
 }
 
-static int mdt_setup(struct obd_device *obd, obd_count len, void *buf)
+static int mdt_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct lprocfs_static_vars lvars;
@@ -2529,7 +2580,7 @@ static int mdt_setup(struct obd_device *obd, obd_count len, void *buf)
         int rc = 0;
         ENTRY;
 
-        lprocfs_init_vars(mdt, &lvars);
+        lprocfs_mdt_init_vars(&lvars);
         lprocfs_obd_setup(obd, lvars.obd_vars);
 
         sema_init(&mds->mds_health_sem, 1);
@@ -2543,12 +2594,12 @@ static int mdt_setup(struct obd_device *obd, obd_count len, void *buf)
                 mds_max_threads = mds_min_threads = mds_num_threads;
         } else {
                 /* Base min threads on memory and cpus */
-                mds_min_threads = smp_num_cpus * num_physpages >> 
+                mds_min_threads = num_possible_cpus() * num_physpages >>
                         (27 - CFS_PAGE_SHIFT);
                 if (mds_min_threads < MDS_THREADS_MIN)
                         mds_min_threads = MDS_THREADS_MIN;
                 /* Largest auto threads start value */
-                if (mds_min_threads > 32) 
+                if (mds_min_threads > 32)
                         mds_min_threads = 32;
                 mds_max_threads = min(MDS_THREADS_MAX, mds_min_threads * 4);
         }
@@ -2558,8 +2609,8 @@ static int mdt_setup(struct obd_device *obd, obd_count len, void *buf)
                                 MDS_MAXREPSIZE, MDS_REQUEST_PORTAL,
                                 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT,
                                 mds_handle, LUSTRE_MDS_NAME,
-                                obd->obd_proc_entry, NULL, 
-                                mds_min_threads, mds_max_threads, "ll_mdt");
+                                obd->obd_proc_entry, NULL,
+                                mds_min_threads, mds_max_threads, "ll_mdt", 0);
 
         if (!mds->mds_service) {
                 CERROR("failed to start service\n");
@@ -2577,7 +2628,7 @@ static int mdt_setup(struct obd_device *obd, obd_count len, void *buf)
                                 mds_handle, "mds_setattr",
                                 obd->obd_proc_entry, NULL,
                                 mds_min_threads, mds_max_threads,
-                                "ll_mdt_attr");
+                                "ll_mdt_attr", 0);
         if (!mds->mds_setattr_service) {
                 CERROR("failed to start getattr service\n");
                 GOTO(err_thread, rc = -ENOMEM);
@@ -2592,9 +2643,9 @@ static int mdt_setup(struct obd_device *obd, obd_count len, void *buf)
                                 MDS_MAXREPSIZE, MDS_READPAGE_PORTAL,
                                 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT,
                                 mds_handle, "mds_readpage",
-                                obd->obd_proc_entry, NULL, 
+                                obd->obd_proc_entry, NULL,
                                 MDS_THREADS_MIN_READPAGE, mds_max_threads,
-                                "ll_mdt_rdpg");
+                                "ll_mdt_rdpg", 0);
         if (!mds->mds_readpage_service) {
                 CERROR("failed to start readpage service\n");
                 GOTO(err_thread2, rc = -ENOMEM);
@@ -2678,15 +2729,18 @@ static struct dentry *mds_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
+/*
+ * Health probe for the MDS device.
+ *
+ * Returns a non-zero value when the backing superblock is flagged
+ * MS_RDONLY.  When USE_HEALTH_CHECK_WRITE is defined, a non-zero result
+ * from lvfs_check_io_health() on mds_health_check_filp is OR-ed in as
+ * well.  Returns 0 when no problem was detected.
+ */
 static int mds_health_check(struct obd_device *obd)
 {
         struct obd_device_target *odt = &obd->u.obt;
+#ifdef USE_HEALTH_CHECK_WRITE
         struct mds_obd *mds = &obd->u.mds;
+#endif
         int rc = 0;
 
         if (odt->obt_sb->s_flags & MS_RDONLY)
                 rc = 1;
 
+#ifdef USE_HEALTH_CHECK_WRITE
         LASSERT(mds->mds_health_check_filp != NULL);
         rc |= !!lvfs_check_io_health(obd, mds->mds_health_check_filp);
-
+#endif
         return rc;
 }
 
@@ -2696,10 +2750,9 @@ static int mds_process_config(struct obd_device *obd, obd_count len, void *buf)
         struct lprocfs_static_vars lvars;
         int rc;
 
-        lprocfs_init_vars(mds, &lvars);
-        
+        lprocfs_mds_init_vars(&lvars);
+
         rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars, lcfg, obd);
-        
         return(rc);
 }
 
@@ -2740,7 +2793,7 @@ static struct obd_ops mdt_obd_ops = {
 quota_interface_t *mds_quota_interface_ref;
 extern quota_interface_t mds_quota_interface;
 
-static int __init mds_init(void)
+static __attribute__((unused)) int __init mds_init(void)
 {
         int rc;
         struct lprocfs_static_vars lvars;
@@ -2754,28 +2807,191 @@ static int __init mds_init(void)
                 return rc;
         }
         init_obd_quota_ops(mds_quota_interface_ref, &mds_obd_ops);
-        
-        lprocfs_init_vars(mds, &lvars);
-        class_register_type(&mds_obd_ops, lvars.module_vars, LUSTRE_MDS_NAME);
-        lprocfs_init_vars(mdt, &lvars);
-        class_register_type(&mdt_obd_ops, lvars.module_vars, LUSTRE_MDT_NAME);
+
+        lprocfs_mds_init_vars(&lvars);
+        class_register_type(&mds_obd_ops, NULL,
+                            lvars.module_vars, LUSTRE_MDS_NAME, NULL);
+        lprocfs_mds_init_vars(&lvars);
+        mdt_obd_ops = mdt_obd_ops; //make compiler happy
+//        class_register_type(&mdt_obd_ops, NULL,
+//                            lvars.module_vars, LUSTRE_MDT_NAME, NULL);
 
         return 0;
 }
 
-static void /*__exit*/ mds_exit(void)
+/*
+ * Legacy module-exit hook.  It is marked unused because module_exit()
+ * at the end of this file now names mds_cmd_exit() instead.
+ *
+ * Calls lquota_exit() on the quota interface reference, releases it with
+ * PORTAL_SYMBOL_PUT() when it is non-NULL, and unregisters the
+ * LUSTRE_MDS_NAME obd type.  The LUSTRE_MDT_NAME unregistration is
+ * commented out, matching the commented-out registration in mds_init().
+ */
+static __attribute__((unused)) void /*__exit*/ mds_exit(void)
 {
         lquota_exit(mds_quota_interface_ref);
         if (mds_quota_interface_ref)
                 PORTAL_SYMBOL_PUT(mds_quota_interface);
 
         class_unregister_type(LUSTRE_MDS_NAME);
-        class_unregister_type(LUSTRE_MDT_NAME);
+//        class_unregister_type(LUSTRE_MDT_NAME);
+}
+/*
+ * Setup hook for the "mds" obd type; the MDS still needs LOV set up here.
+ *
+ * Only devices whose name starts with MDD_OBD_NAME are handled; for any
+ * other name this returns 0 without touching the device.  The backing
+ * mount is looked up by the device string in config buffer 4 of \a lcfg.
+ * Inside the obd's lvfs context the "OBJECTS" directory is obtained via
+ * simple_mkdir(), "__iopen__" is looked up, and the LOV objid, LOV
+ * presetup and post-setup steps are run.
+ *
+ * \retval 0        success (or device name not handled here)
+ * \retval -EINVAL  fewer than 5 config buffers were supplied
+ * \retval -ENOENT  "__iopen__" has no usable inode
+ * \retval other    error propagated from the failing step
+ *
+ * On failure every dentry acquired here is released, the cached pointers
+ * in \a mds are cleared, and the fsfilt ops reference is dropped.
+ */
+static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct lvfs_run_ctxt saved;
+        const char     *dev;
+        struct vfsmount *mnt;
+        struct lustre_sb_info *lsi;
+        struct lustre_mount_info *lmi;
+        struct dentry  *dentry;
+        int rc = 0;
+        ENTRY;
+
+        CDEBUG(D_INFO, "obd %s setup \n", obd->obd_name);
+        if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
+                RETURN(0);
+
+        if (lcfg->lcfg_bufcount < 5) {
+                CERROR("invalid arg for setup %s\n", MDD_OBD_NAME);
+                RETURN(-EINVAL);
+        }
+        dev = lustre_cfg_string(lcfg, 4);
+        lmi = server_get_mount(dev);
+        LASSERT(lmi != NULL);
+
+        lsi = s2lsi(lmi->lmi_sb);
+        mnt = lmi->lmi_mnt;
+        /* FIXME: MDD LOV initialize objects.
+         * we need only lmi here but not get mount
+         * OSD did mount already, so put mount back
+         */
+        atomic_dec(&lsi->lsi_mounts);
+        mntput(mnt);
+
+        obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
+        mds_init_ctxt(obd, mnt);
+
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        dentry = simple_mkdir(current->fs->pwd, "OBJECTS", 0777, 1);
+        if (IS_ERR(dentry)) {
+                rc = PTR_ERR(dentry);
+                CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
+                GOTO(err_putfs, rc);
+        }
+        mds->mds_objects_dir = dentry;
+
+        dentry = lookup_one_len("__iopen__", current->fs->pwd,
+                                strlen("__iopen__"));
+        if (IS_ERR(dentry)) {
+                rc = PTR_ERR(dentry);
+                CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc);
+                GOTO(err_objects, rc);
+        }
+
+        mds->mds_fid_de = dentry;
+        if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) {
+                rc = -ENOENT;
+                CERROR("__iopen__ directory has no inode? rc = %d\n", rc);
+                GOTO(err_fid, rc);
+        }
+        rc = mds_lov_init_objids(obd);
+        if (rc != 0) {
+                CERROR("cannot init lov objid rc = %d\n", rc);
+                GOTO(err_fid, rc);
+        }
+
+        /* From here on mds_fid_de is held, so every failure must unwind
+         * through err_fid; jumping to err_objects would leak the
+         * __iopen__ dentry.
+         * NOTE(review): the objid state created by mds_lov_init_objids()
+         * is not torn down on these paths; mds_cmd_cleanup() uses
+         * mds_lov_destroy_objids() for that - confirm whether it should
+         * be called here too. */
+        rc = mds_lov_presetup(mds, lcfg);
+        if (rc < 0)
+                GOTO(err_fid, rc);
+
+        /* Don't wait for mds_postrecov trying to clear orphans */
+        obd->obd_async_recov = 1;
+        rc = mds_postsetup(obd);
+        /* Bug 11557 - allow async abort_recov start
+           FIXME can remove most of this obd_async_recov plumbing
+        obd->obd_async_recov = 0;
+        */
+
+        if (rc)
+                GOTO(err_fid, rc);
+
+        mds->mds_max_mdsize = sizeof(struct lov_mds_md);
+        mds->mds_max_cookiesize = sizeof(struct llog_cookie);
+
+        /* Success falls through: the lvfs context is popped on every path. */
+err_pop:
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        RETURN(rc);
+err_fid:
+        dput(mds->mds_fid_de);
+        /* Do not leave a stale pointer behind for later teardown code. */
+        mds->mds_fid_de = NULL;
+err_objects:
+        dput(mds->mds_objects_dir);
+        mds->mds_objects_dir = NULL;
+err_putfs:
+        fsfilt_put_ops(obd->obd_fsops);
+        goto err_pop;
+}
+
+/*
+ * Cleanup hook for the "mds" obd type: undoes what mds_cmd_setup() built.
+ *
+ * Inside the obd's lvfs context this destroys the LOV objid state,
+ * drops the OBJECTS and __iopen__ dentries if they are held, turns
+ * quota off on the target superblock and releases the fsfilt ops.
+ *
+ * Always returns 0.
+ *
+ * NOTE(review): mds_cmd_setup() returns 0 without initialising anything
+ * when the device name does not start with MDD_OBD_NAME; confirm that
+ * this hook is never reached for such a device.
+ */
+static int mds_cmd_cleanup(struct obd_device *obd)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct lvfs_run_ctxt saved;
+        int rc = 0;
+        ENTRY;
+
+        if (obd->obd_fail)
+                LCONSOLE_WARN("%s: shutting down for failover; client state "
+                              "will be preserved.\n", obd->obd_name);
+
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+
+        mds_lov_destroy_objids(obd);
+
+        if (mds->mds_objects_dir != NULL) {
+                l_dput(mds->mds_objects_dir);
+                mds->mds_objects_dir = NULL;
+        }
+
+        /* Guard and clear mds_fid_de the same way as mds_objects_dir:
+         * shrink_dcache_parent() must not be handed a NULL dentry, and
+         * the pointer must not dangle once the reference is dropped. */
+        if (mds->mds_fid_de != NULL) {
+                shrink_dcache_parent(mds->mds_fid_de);
+                dput(mds->mds_fid_de);
+                mds->mds_fid_de = NULL;
+        }
+        LL_DQUOT_OFF(obd->u.obt.obt_sb);
+        fsfilt_put_ops(obd->obd_fsops);
+
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        RETURN(rc);
+}
+
+#if 0
+/*
+ * Placeholder health check that always reports healthy (returns 0).
+ * Compiled out by the surrounding #if 0; the matching .o_health_check
+ * entry in mds_cmd_obd_ops is commented out as well.
+ */
+static int mds_cmd_health_check(struct obd_device *obd)
+{
+        return 0;
+}
+#endif
+/*
+ * Method table registered under LUSTRE_MDS_NAME by mds_cmd_init().
+ * Setup and cleanup use the mds_cmd_* hooks defined above; the remaining
+ * entries reuse existing mds_* handlers.  No health-check method is
+ * installed (the entry below is commented out).
+ */
+static struct obd_ops mds_cmd_obd_ops = {
+        .o_owner           = THIS_MODULE,
+        .o_setup           = mds_cmd_setup,
+        .o_cleanup         = mds_cmd_cleanup,
+        .o_precleanup      = mds_precleanup,
+        .o_create          = mds_obd_create,
+        .o_destroy         = mds_obd_destroy,
+        .o_llog_init       = mds_llog_init,
+        .o_llog_finish     = mds_llog_finish,
+        .o_notify          = mds_notify,
+        .o_postrecov       = mds_postrecov,
+        //   .o_health_check    = mds_cmd_health_check,
+};
+
+/*
+ * Module entry point: registers the mds_cmd_obd_ops method table as the
+ * LUSTRE_MDS_NAME obd type, using the module lprocfs variables filled in
+ * by lprocfs_mds_init_vars().
+ *
+ * Returns the result of class_register_type() so that a failed
+ * registration makes the module load fail instead of being silently
+ * reported as success.
+ */
+static int __init mds_cmd_init(void)
+{
+        struct lprocfs_static_vars lvars;
+
+        lprocfs_mds_init_vars(&lvars);
+        return class_register_type(&mds_cmd_obd_ops, NULL, lvars.module_vars,
+                                   LUSTRE_MDS_NAME, NULL);
+}
+
+/*
+ * Module exit point: unregisters the LUSTRE_MDS_NAME obd type that
+ * mds_cmd_init() registered.
+ */
+static void /*__exit*/ mds_cmd_exit(void)
+{
         class_unregister_type(LUSTRE_MDS_NAME);
 }
 
 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Metadata Server (MDS)");
 MODULE_LICENSE("GPL");
 
-module_init(mds_init);
-module_exit(mds_exit);
+module_init(mds_cmd_init);
+module_exit(mds_cmd_exit);