Whamcloud - gitweb
- make HEAD from b_post_cmd3
[fs/lustre-release.git] / lustre / mds / handler.c
index 3a99194..6d1a6e7 100644 (file)
@@ -50,6 +50,7 @@
 # include <linux/locks.h>
 #endif
 
+#include <linux/lustre_acl.h>
 #include <obd_class.h>
 #include <lustre_dlm.h>
 #include <obd_lov.h>
@@ -142,7 +143,7 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
                 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
         }
 
-        DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
+        DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s",
                   (rc == -ETIMEDOUT) ? "timeout" : "network error",
                   desc->bd_nob_transferred, count,
                   req->rq_export->exp_client_uuid.uuid,
@@ -169,13 +170,13 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
                                      struct vfsmount **mnt, int lock_mode,
                                      struct lustre_handle *lockh,
-                                     char *name, int namelen, __u64 lockpart)
+                                     __u64 lockpart)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
         struct ldlm_res_id res_id = { .name = {0} };
         int flags = LDLM_FL_ATOMIC_CB, rc;
-        ldlm_policy_data_t policy = { .l_inodebits = { lockpart} }; 
+        ldlm_policy_data_t policy = { .l_inodebits = { lockpart} };
         ENTRY;
 
         if (IS_ERR(de))
@@ -183,8 +184,8 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
 
         res_id.name[0] = de->d_inode->i_ino;
         res_id.name[1] = de->d_inode->i_generation;
-        rc = ldlm_cli_enqueue_local(obd->obd_namespace, res_id, 
-                                    LDLM_IBITS, &policy, lock_mode, &flags, 
+        rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,
+                                    LDLM_IBITS, &policy, lock_mode, &flags,
                                     ldlm_blocking_ast, ldlm_completion_ast,
                                     NULL, NULL, 0, NULL, lockh);
         if (rc != ELDLM_OK) {
@@ -254,12 +255,12 @@ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
         RETURN(result);
 }
 
-static int mds_connect_internal(struct obd_export *exp, 
+static int mds_connect_internal(struct obd_export *exp,
                                 struct obd_connect_data *data)
 {
         struct obd_device *obd = exp->exp_obd;
         if (data != NULL) {
-                data->ocd_connect_flags &= MDS_CONNECT_SUPPORTED;
+                data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
 
                 /* If no known bits (which should not happen, probably,
@@ -310,25 +311,19 @@ static int mds_reconnect(struct obd_export *exp, struct obd_device *obd,
  * about that client, like open files, the last operation number it did
  * on the server, etc.
  */
-static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
+static int mds_connect(const struct lu_env *env,
+                       struct lustre_handle *conn, struct obd_device *obd,
                        struct obd_uuid *cluuid, struct obd_connect_data *data)
 {
         struct obd_export *exp;
         struct mds_export_data *med;
         struct mds_client_data *mcd = NULL;
-        int rc, abort_recovery;
+        int rc;
         ENTRY;
 
         if (!conn || !obd || !cluuid)
                 RETURN(-EINVAL);
 
-        /* Check for aborted recovery. */
-        spin_lock_bh(&obd->obd_processing_task_lock);
-        abort_recovery = obd->obd_abort_recovery;
-        spin_unlock_bh(&obd->obd_processing_task_lock);
-        if (abort_recovery)
-                target_abort_recovery(obd);
-
         /* XXX There is a small race between checking the list and adding a
          * new connection for the same UUID, but the real threat (list
          * corruption when multiple different clients connect) is solved.
@@ -380,7 +375,7 @@ int mds_init_export(struct obd_export *exp)
 
         INIT_LIST_HEAD(&med->med_open_head);
         spin_lock_init(&med->med_open_lock);
-       
+        
         spin_lock(&exp->exp_lock);
         exp->exp_connecting = 1;
         spin_unlock(&exp->exp_lock);
@@ -420,7 +415,7 @@ static int mds_destroy_export(struct obd_export *export)
                 CWARN("%s: allocation failure during cleanup; can not force "
                       "close file handles on this service.\n", obd->obd_name);
                 OBD_FREE(lmm, mds->mds_max_mdsize);
-                GOTO(out, rc = -ENOMEM);
+                GOTO(out_lmm, rc = -ENOMEM);
         }
 
         spin_lock(&med->med_open_lock);
@@ -444,7 +439,7 @@ static int mds_destroy_export(struct obd_export *export)
                        mfd->mfd_dentry->d_name.len,mfd->mfd_dentry->d_name.name,
                        mfd->mfd_dentry->d_inode->i_ino);
 
-                rc = mds_get_md(obd, mfd->mfd_dentry->d_inode, lmm,&lmm_size,1);
+                rc = mds_get_md(obd, mfd->mfd_dentry->d_inode, lmm, &lmm_size, 1);
                 if (rc < 0)
                         CWARN("mds_get_md failure, rc=%d\n", rc);
                 else
@@ -453,7 +448,6 @@ static int mds_destroy_export(struct obd_export *export)
                 /* child orphan sem protects orphan_dec_test and
                  * is_orphan race, mds_mfd_close drops it */
                 MDS_DOWN_WRITE_ORPHAN_SEM(mfd->mfd_dentry->d_inode);
-
                 rc = mds_mfd_close(NULL, REQ_REC_OFF, obd, mfd,
                                    !(export->exp_flags & OBD_OPT_FAILOVER),
                                    lmm, lmm_size, logcookies,
@@ -476,16 +470,14 @@ static int mds_destroy_export(struct obd_export *export)
 
                 spin_lock(&med->med_open_lock);
         }
+        spin_unlock(&med->med_open_lock);
 
         OBD_FREE(logcookies, mds->mds_max_cookiesize);
+out_lmm:
         OBD_FREE(lmm, mds->mds_max_mdsize);
-
-        spin_unlock(&med->med_open_lock);
-
+out:
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         mds_client_free(export);
-
- out:
         RETURN(rc);
 }
 
@@ -546,7 +538,7 @@ static int mds_getstatus(struct ptlrpc_request *req)
 
 /* get the LOV EA from @inode and store it into @md.  It can be at most
  * @size bytes, and @size is updated with the actual EA size.
- * The EA size is also returned on success, and -ve errno on failure. 
+ * The EA size is also returned on success, and -ve errno on failure.
  * If there is no EA then 0 is returned. */
 int mds_get_md(struct obd_device *obd, struct inode *inode, void *md,
                int *size, int lock)
@@ -864,7 +856,7 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
         struct lvfs_run_ctxt saved;
         struct mds_body *body;
         struct dentry *dparent = NULL, *dchild = NULL;
-        struct lvfs_ucred uc = {NULL,};
+        struct lvfs_ucred uc = {0,};
         struct lustre_handle parent_lockh;
         int namesize;
         int rc = 0, cleanup_phase = 0, resent_req = 0;
@@ -939,10 +931,10 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
 
         if (resent_req == 0) {
                 if (name) {
-                        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2);
-                        rc = mds_get_parent_child_locked(obd, &obd->u.mds, 
+                        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2);
+                        rc = mds_get_parent_child_locked(obd, &obd->u.mds,
                                                          &body->fid1,
-                                                         &parent_lockh, 
+                                                         &parent_lockh,
                                                          &dparent, LCK_CR,
                                                          MDS_INODELOCK_UPDATE,
                                                          name, namesize,
@@ -952,11 +944,11 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
                         /* For revalidate by fid we always take UPDATE lock */
                         dchild = mds_fid2locked_dentry(obd, &body->fid2, NULL,
                                                        LCK_CR, child_lockh,
-                                                       NULL, 0, child_part);
+                                                       child_part);
                         LASSERT(dchild);
                         if (IS_ERR(dchild))
                                 rc = PTR_ERR(dchild);
-                } 
+                }
                 if (rc)
                         GOTO(cleanup, rc);
         } else {
@@ -1031,7 +1023,7 @@ static int mds_getattr(struct ptlrpc_request *req, int offset)
         struct lvfs_run_ctxt saved;
         struct dentry *de;
         struct mds_body *body;
-        struct lvfs_ucred uc = { NULL, };
+        struct lvfs_ucred uc = {0,};
         int rc = 0;
         ENTRY;
 
@@ -1059,7 +1051,8 @@ static int mds_getattr(struct ptlrpc_request *req, int offset)
                 GOTO(out_pop, rc);
         }
 
-        req->rq_status = mds_getattr_internal(obd, de, req, body,REPLY_REC_OFF);
+        req->rq_status = mds_getattr_internal(obd, de, req, body,
+                                              REPLY_REC_OFF);
 
         l_dput(de);
         GOTO(out_pop, rc);
@@ -1184,7 +1177,7 @@ static int mds_readpage(struct ptlrpc_request *req, int offset)
         struct mds_body *body, *repbody;
         struct lvfs_run_ctxt saved;
         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
-        struct lvfs_ucred uc = {NULL,};
+        struct lvfs_ucred uc = {0,};
         ENTRY;
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
@@ -1275,8 +1268,8 @@ int mds_reint(struct ptlrpc_request *req, int offset,
         return rc;
 }
 
-static int mds_filter_recovery_request(struct ptlrpc_request *req,
-                                       struct obd_device *obd, int *process)
+int mds_filter_recovery_request(struct ptlrpc_request *req,
+                                struct obd_device *obd, int *process)
 {
         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
         case MDS_CONNECT: /* This will never get here, but for completeness. */
@@ -1287,21 +1280,23 @@ static int mds_filter_recovery_request(struct ptlrpc_request *req,
                RETURN(0);
 
         case MDS_CLOSE:
+        case MDS_DONE_WRITING:
         case MDS_SYNC: /* used in unmounting */
         case OBD_PING:
         case MDS_REINT:
+        case SEQ_QUERY:
+        case FLD_QUERY:
         case LDLM_ENQUEUE:
                 *process = target_queue_recovery_request(req, obd);
                 RETURN(0);
 
         default:
                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
-                *process = 0;
-                /* XXX what should we set rq_status to here? */
-                req->rq_status = -EAGAIN;
-                RETURN(ptlrpc_error(req));
+                *process = -EAGAIN;
+                RETURN(0);
         }
 }
+EXPORT_SYMBOL(mds_filter_recovery_request);
 
 static char *reint_names[] = {
         [REINT_SETATTR] "setattr",
@@ -1392,7 +1387,7 @@ static int mds_handle_quotactl(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-static int mds_msg_check_version(struct lustre_msg *msg)
+int mds_msg_check_version(struct lustre_msg *msg)
 {
         int rc;
 
@@ -1400,6 +1395,9 @@ static int mds_msg_check_version(struct lustre_msg *msg)
         case MDS_CONNECT:
         case MDS_DISCONNECT:
         case OBD_PING:
+        case SEC_CTX_INIT:
+        case SEC_CTX_INIT_CONT:
+        case SEC_CTX_FINI:
                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
                 if (rc)
                         CERROR("bad opc %u version %08x, expecting %08x\n",
@@ -1412,6 +1410,8 @@ static int mds_msg_check_version(struct lustre_msg *msg)
         case MDS_GETATTR_NAME:
         case MDS_STATFS:
         case MDS_READPAGE:
+        case MDS_WRITEPAGE:
+        case MDS_IS_SUBDIR:
         case MDS_REINT:
         case MDS_CLOSE:
         case MDS_DONE_WRITING:
@@ -1424,6 +1424,8 @@ static int mds_msg_check_version(struct lustre_msg *msg)
         case MDS_QUOTACTL:
         case QUOTA_DQACQ:
         case QUOTA_DQREL:
+        case SEQ_QUERY:
+        case FLD_QUERY:
                 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
                 if (rc)
                         CERROR("bad opc %u version %08x, expecting %08x\n",
@@ -1463,11 +1465,12 @@ static int mds_msg_check_version(struct lustre_msg *msg)
         }
         return rc;
 }
+EXPORT_SYMBOL(mds_msg_check_version);
 
 int mds_handle(struct ptlrpc_request *req)
 {
         int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET;
-        int rc = 0;
+        int rc;
         struct mds_obd *mds = NULL; /* quell gcc overwarning */
         struct obd_device *obd = NULL;
         ENTRY;
@@ -1485,7 +1488,7 @@ int mds_handle(struct ptlrpc_request *req)
         /* XXX identical to OST */
         if (lustre_msg_get_opc(req->rq_reqmsg) != MDS_CONNECT) {
                 struct mds_export_data *med;
-                int recovering, abort_recovery;
+                int recovering;
 
                 if (req->rq_export == NULL) {
                         CERROR("operation %d on unconnected MDS from %s\n",
@@ -1497,7 +1500,7 @@ int mds_handle(struct ptlrpc_request *req)
 
                 med = &req->rq_export->exp_mds_data;
                 obd = req->rq_export->exp_obd;
-                mds = &obd->u.mds;
+                mds = mds_req2mds(req);
 
                 /* sanity check: if the xid matches, the request must
                  * be marked as a resent or replayed */
@@ -1520,16 +1523,18 @@ int mds_handle(struct ptlrpc_request *req)
 
                 /* Check for aborted recovery. */
                 spin_lock_bh(&obd->obd_processing_task_lock);
-                abort_recovery = obd->obd_abort_recovery;
                 recovering = obd->obd_recovering;
                 spin_unlock_bh(&obd->obd_processing_task_lock);
-                if (abort_recovery) {
-                        target_abort_recovery(obd);
-                } else if (recovering) {
+                if (recovering) {
                         rc = mds_filter_recovery_request(req, obd,
                                                          &should_process);
                         if (rc || !should_process)
                                 RETURN(rc);
+                        else if (should_process < 0) {
+                                req->rq_status = should_process;
+                                rc = ptlrpc_error(req);
+                                RETURN(rc);
+                        }
                 }
         }
 
@@ -1537,9 +1542,15 @@ int mds_handle(struct ptlrpc_request *req)
         case MDS_CONNECT:
                 DEBUG_REQ(D_INODE, req, "connect");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
-                rc = target_handle_connect(req, mds_handle);
+                rc = target_handle_connect(req);
                 if (!rc) {
                         /* Now that we have an export, set mds. */
+                        /*
+                         * XXX nikita: these assignments are useless: mds is
+                         * never used below, and obd is only used for
+                         * MSG_LAST_REPLAY case, which never happens for
+                         * MDS_CONNECT.
+                         */
                         obd = req->rq_export->exp_obd;
                         mds = mds_req2mds(req);
                 }
@@ -1770,7 +1781,7 @@ int mds_handle(struct ptlrpc_request *req)
         /* If we're DISCONNECTing, the mds_export_data is already freed */
         if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != MDS_DISCONNECT) {
                 struct mds_export_data *med = &req->rq_export->exp_mds_data;
-                
+
                 /* I don't think last_xid is used for anyway, so I'm not sure
                    if we need to care about last_close_xid here.*/
                 lustre_msg_set_last_xid(req->rq_repmsg,
@@ -1782,15 +1793,6 @@ int mds_handle(struct ptlrpc_request *req)
         EXIT;
  out:
 
-        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
-                if (obd && obd->obd_recovering) {
-                        DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
-                        return target_queue_final_reply(req, rc);
-                }
-                /* Lost a race with recovery; let the error path DTRT. */
-                rc = req->rq_status = -ENOTCONN;
-        }
-
         target_send_reply(req, rc, fail);
         return 0;
 }
@@ -1824,7 +1826,6 @@ int mds_update_server_data(struct obd_device *obd, int force_sync)
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         if (rc)
                 CERROR("error writing MDS server data: rc = %d\n", rc);
-
         RETURN(rc);
 }
 
@@ -1869,6 +1870,30 @@ static void fsoptions_to_mds_flags(struct mds_obd *mds, char *options)
                 options = ++p;
         }
 }
+static int mds_lov_presetup (struct mds_obd *mds, struct lustre_cfg *lcfg)
+{
+        int rc;
+        ENTRY;
+        /* Pre-LOV setup helper: start the llog commit thread and, when
+         * buffer 3 of @lcfg is present and non-empty, generate a random
+         * UUID into mds->mds_lov_uuid and copy buffer 3 into a newly
+         * allocated mds->mds_profile.  Returns the negative value from
+         * llog_start_commit_thread() or -ENOMEM on failure; otherwise
+         * returns the rc of llog_start_commit_thread(). */
+        rc = llog_start_commit_thread();
+        if (rc < 0)
+                RETURN(rc);
+        /* Only touch the profile when lcfg carries at least 4 buffers and
+         * buffer 3 has a non-zero length. */
+        if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
+                class_uuid_t uuid;
+                /* Random UUID, unparsed into mds->mds_lov_uuid. */
+                ll_generate_random_uuid(uuid);
+                class_uuid_unparse(uuid, &mds->mds_lov_uuid);
+                /* Profile buffer is sized to the length of buffer 3. */
+                OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3));
+                if (mds->mds_profile == NULL)
+                        RETURN(-ENOMEM);
+                /* NOTE(review): strncpy() copies at most
+                 * LUSTRE_CFG_BUFLEN(lcfg, 3) bytes and adds no terminator
+                 * of its own; presumably that length already includes the
+                 * trailing NUL -- confirm. */
+                strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3),
+                        LUSTRE_CFG_BUFLEN(lcfg, 3));
+        }
+        RETURN(rc);
+}
 
 /* mount the file system (secretly).  lustre_cfg parameters are:
  * 1 = device
@@ -1876,14 +1901,13 @@ static void fsoptions_to_mds_flags(struct mds_obd *mds, char *options)
  * 3 = config name
  * 4 = mount options
  */
-static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
+static int mds_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
 {
         struct lprocfs_static_vars lvars;
-        struct lustre_cfg* lcfg = buf;
         struct mds_obd *mds = &obd->u.mds;
-        struct lustre_sb_info *lsi;
         struct lustre_mount_info *lmi;
         struct vfsmount *mnt;
+        struct lustre_sb_info *lsi;
         struct obd_uuid uuid;
         __u8 *uuid_ptr;
         char *str, *label;
@@ -1910,6 +1934,7 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
 
         /* We mounted in lustre_fill_super.
            lcfg bufs 1, 2, 4 (device, fstype, mount opts) are ignored.*/
+                
         lsi = s2lsi(lmi->lmi_sb);
         fsoptions_to_mds_flags(mds, lsi->lsi_ldd->ldd_mount_opts);
         fsoptions_to_mds_flags(mds, lsi->lsi_lmd->lmd_opts);
@@ -1936,6 +1961,15 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
         }
         ldlm_register_intent(obd->obd_namespace, mds_intent_policy);
 
+        lprocfs_init_vars(mds, &lvars);
+        if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 &&
+            lprocfs_alloc_obd_stats(obd, LPROC_MDS_LAST) == 0) {
+                /* Init private stats here */
+                mds_stats_counter_init(obd->obd_stats);
+                obd->obd_proc_exports = proc_mkdir("exports",
+                                                   obd->obd_proc_entry);
+        }
+
         rc = mds_fs_setup(obd, mnt);
         if (rc) {
                 CERROR("%s: MDS filesystem method init failed: rc = %d\n",
@@ -1943,24 +1977,10 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
                 GOTO(err_ns, rc);
         }
 
-        rc = llog_start_commit_thread();
+        rc = mds_lov_presetup(mds, lcfg);
         if (rc < 0)
                 GOTO(err_fs, rc);
 
-        if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
-                class_uuid_t uuid;
-
-                ll_generate_random_uuid(uuid);
-                class_uuid_unparse(uuid, &mds->mds_lov_uuid);
-
-                OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3));
-                if (mds->mds_profile == NULL)
-                        GOTO(err_fs, rc = -ENOMEM);
-
-                strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3),
-                        LUSTRE_CFG_BUFLEN(lcfg, 3));
-        }
-
         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                            "mds_ldlm_client", &obd->obd_ldlm_client);
         obd->obd_replayable = 1;
@@ -1969,12 +1989,14 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
         if (rc)
                 GOTO(err_fs, rc);
 
+#if 0
         mds->mds_group_hash = upcall_cache_init(obd->obd_name);
         if (IS_ERR(mds->mds_group_hash)) {
                 rc = PTR_ERR(mds->mds_group_hash);
                 mds->mds_group_hash = NULL;
                 GOTO(err_qctxt, rc);
         }
+#endif
 
         /* Don't wait for mds_postrecov trying to clear orphans */
         obd->obd_async_recov = 1;
@@ -1986,15 +2008,6 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
         if (rc)
                 GOTO(err_qctxt, rc);
 
-        lprocfs_init_vars(mds, &lvars);
-        if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 &&
-            lprocfs_alloc_obd_stats(obd, LPROC_MDS_LAST) == 0) {
-                /* Init private stats here */
-                mds_stats_counter_init(obd->obd_stats);
-                obd->obd_proc_exports = proc_mkdir("exports",
-                                                   obd->obd_proc_entry);
-        }
-
         uuid_ptr = fsfilt_uuid(obd, obd->u.obt.obt_sb);
         if (uuid_ptr != NULL) {
                 class_uuid_unparse(uuid_ptr, &uuid);
@@ -2013,8 +2026,8 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
                               "/proc/fs/lustre/mds/%s/recovery_status.\n",
                               obd->obd_name, lustre_cfg_string(lcfg, 1),
                               label ?: "", label ? "/" : "", str,
-                              obd->obd_recoverable_clients,
-                              (obd->obd_recoverable_clients == 1) ?
+                              obd->obd_max_recoverable_clients,
+                              (obd->obd_max_recoverable_clients == 1) ?
                               "client" : "clients",
                               (int)(OBD_RECOVERY_TIMEOUT) / 60,
                               (int)(OBD_RECOVERY_TIMEOUT) % 60,
@@ -2036,9 +2049,13 @@ err_qctxt:
 err_fs:
         /* No extra cleanup needed for llog_init_commit_thread() */
         mds_fs_cleanup(obd);
+#if 0
         upcall_cache_cleanup(mds->mds_group_hash);
         mds->mds_group_hash = NULL;
+#endif
 err_ns:
+        lprocfs_obd_cleanup(obd);
+        lprocfs_free_obd_stats(obd);
         ldlm_namespace_free(obd->obd_namespace, 0);
         obd->obd_namespace = NULL;
 err_ops:
@@ -2087,29 +2104,29 @@ static int mds_postsetup(struct obd_device *obd)
         int rc = 0;
         ENTRY;
 
-        rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
+        rc = llog_setup(obd, NULL, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
                         &llog_lvfs_ops);
         if (rc)
                 RETURN(rc);
 
-        rc = llog_setup(obd, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
+        rc = llog_setup(obd, NULL, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
                         &llog_lvfs_ops);
         if (rc)
                 RETURN(rc);
 
         if (mds->mds_profile) {
                 struct lustre_profile *lprof;
-                /* The profile defines which osc and mdc to connect to, for a 
+                /* The profile defines which osc and mdc to connect to, for a
                    client.  We reuse that here to figure out the name of the
-                   lov to use (and ignore lprof->lp_mdc).
-                   The profile was set in the config log with 
+                   lov to use (and ignore lprof->lp_md).
+                   The profile was set in the config log with
                    LCFG_MOUNTOPT profilenm oscnm mdcnm */
                 lprof = class_get_profile(mds->mds_profile);
                 if (lprof == NULL) {
                         CERROR("No profile found: %s\n", mds->mds_profile);
                         GOTO(err_cleanup, rc = -ENOENT);
                 }
-                rc = mds_lov_connect(obd, lprof->lp_osc);
+                rc = mds_lov_connect(obd, lprof->lp_dt);
                 if (rc)
                         GOTO(err_cleanup, rc);
         }
@@ -2144,15 +2161,16 @@ int mds_postrecov(struct obd_device *obd)
         }
 
         /* clean PENDING dir */
-        rc = mds_cleanup_pending(obd);
-        if (rc < 0)
-                GOTO(out, rc);
+        if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
+                rc = mds_cleanup_pending(obd);
+                if (rc < 0)
+                        GOTO(out, rc);
 
         /* FIXME Does target_finish_recovery really need this to block? */
         /* Notify the LOV, which will in turn call mds_notify for each tgt */
         /* This means that we have to hack obd_notify to think we're obd_set_up
            during mds_lov_connect. */
-        obd_notify(obd->u.mds.mds_osc_obd, NULL, 
+        obd_notify(obd->u.mds.mds_osc_obd, NULL,
                    obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
                    OBD_NOTIFY_SYNC, NULL);
 
@@ -2185,7 +2203,11 @@ static int mds_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
         case OBD_CLEANUP_EARLY:
                 break;
         case OBD_CLEANUP_EXPORTS:
-                target_cleanup_recovery(obd);
+                /* XXX: this path is also used for MDD MDS cleanup, so skip
+                 * target_cleanup_recovery() for the temporary MDD MDS
+                 * (obd_name prefixed with MDD_OBD_NAME). -- Wangdi */
+                if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
+                        target_cleanup_recovery(obd);
                 mds_lov_early_clean(obd);
                 break;
         case OBD_CLEANUP_SELF_EXP:
@@ -2222,12 +2244,14 @@ static int mds_cleanup(struct obd_device *obd)
         lquota_cleanup(mds_quota_interface_ref, obd);
 
         mds_update_server_data(obd, 1);
-        if (mds->mds_lov_objids != NULL) 
+        if (mds->mds_lov_objids != NULL)
                 OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size);
         mds_fs_cleanup(obd);
 
+#if 0
         upcall_cache_cleanup(mds->mds_group_hash);
         mds->mds_group_hash = NULL;
+#endif
 
         server_put_mount(obd->obd_name, mds->mds_vfsmnt);
         obd->u.obt.obt_sb = NULL;
@@ -2494,7 +2518,7 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
         RETURN(ELDLM_LOCK_REPLACED);
 }
 
-static int mdt_setup(struct obd_device *obd, obd_count len, void *buf)
+static int mdt_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct lprocfs_static_vars lvars;
@@ -2533,7 +2557,7 @@ static int mdt_setup(struct obd_device *obd, obd_count len, void *buf)
                                 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT,
                                 mds_handle, LUSTRE_MDS_NAME,
                                 obd->obd_proc_entry, NULL, 
-                                mds_min_threads, mds_max_threads, "ll_mdt");
+                                mds_min_threads, mds_max_threads, "ll_mdt", 0);
 
         if (!mds->mds_service) {
                 CERROR("failed to start service\n");
@@ -2551,7 +2575,7 @@ static int mdt_setup(struct obd_device *obd, obd_count len, void *buf)
                                 mds_handle, "mds_setattr",
                                 obd->obd_proc_entry, NULL,
                                 mds_min_threads, mds_max_threads,
-                                "ll_mdt_attr");
+                                "ll_mdt_attr", 0);
         if (!mds->mds_setattr_service) {
                 CERROR("failed to start getattr service\n");
                 GOTO(err_thread, rc = -ENOMEM);
@@ -2568,7 +2592,7 @@ static int mdt_setup(struct obd_device *obd, obd_count len, void *buf)
                                 mds_handle, "mds_readpage",
                                 obd->obd_proc_entry, NULL, 
                                 MDS_THREADS_MIN_READPAGE, mds_max_threads,
-                                "ll_mdt_rdpg");
+                                "ll_mdt_rdpg", 0);
         if (!mds->mds_readpage_service) {
                 CERROR("failed to start readpage service\n");
                 GOTO(err_thread2, rc = -ENOMEM);
@@ -2580,7 +2604,7 @@ static int mdt_setup(struct obd_device *obd, obd_count len, void *buf)
                 GOTO(err_thread3, rc);
 
         ping_evictor_start();
-
+        
         RETURN(0);
 
 err_thread3:
@@ -2664,7 +2688,6 @@ static int mds_health_check(struct obd_device *obd)
         LASSERT(mds->mds_health_check_filp != NULL);
         rc |= !!lvfs_check_io_health(obd, mds->mds_health_check_filp);
 #endif
-
         return rc;
 }
 
@@ -2675,9 +2698,8 @@ static int mds_process_config(struct obd_device *obd, obd_count len, void *buf)
         int rc;
 
         lprocfs_init_vars(mds, &lvars);
-        
+
         rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars, lcfg, obd);
-        
         return(rc);
 }
 
@@ -2718,7 +2740,7 @@ static struct obd_ops mdt_obd_ops = {
 quota_interface_t *mds_quota_interface_ref;
 extern quota_interface_t mds_quota_interface;
 
-static int __init mds_init(void)
+static __attribute__((unused)) int __init mds_init(void)
 {
         int rc;
         struct lprocfs_static_vars lvars;
@@ -2732,28 +2754,211 @@ static int __init mds_init(void)
                 return rc;
         }
         init_obd_quota_ops(mds_quota_interface_ref, &mds_obd_ops);
-        
+
         lprocfs_init_vars(mds, &lvars);
-        class_register_type(&mds_obd_ops, lvars.module_vars, LUSTRE_MDS_NAME);
+        class_register_type(&mds_obd_ops, NULL,
+                            lvars.module_vars, LUSTRE_MDS_NAME, NULL);
         lprocfs_init_vars(mdt, &lvars);
-        class_register_type(&mdt_obd_ops, lvars.module_vars, LUSTRE_MDT_NAME);
+        mdt_obd_ops = mdt_obd_ops; //make compiler happy
+//        class_register_type(&mdt_obd_ops, NULL,
+//                            lvars.module_vars, LUSTRE_MDT_NAME, NULL);
 
         return 0;
 }
 
-static void /*__exit*/ mds_exit(void)
+static __attribute__((unused)) void /*__exit*/ mds_exit(void)
 {
         lquota_exit(mds_quota_interface_ref);
         if (mds_quota_interface_ref)
                 PORTAL_SYMBOL_PUT(mds_quota_interface);
 
         class_unregister_type(LUSTRE_MDS_NAME);
-        class_unregister_type(LUSTRE_MDT_NAME);
+//        class_unregister_type(LUSTRE_MDT_NAME);
+}
+/* MDS still needs LOV setup here. Returns 0 on success, else an error code. */
+static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct lvfs_run_ctxt saved;
+        const char     *dev;
+        struct vfsmount *mnt;
+        struct lustre_sb_info *lsi;
+        struct lustre_mount_info *lmi;
+        struct dentry  *dentry;
+        struct file *file;
+        int rc = 0;
+        ENTRY;
+        /* Only devices named with the MDD_OBD_NAME prefix are set up here. */
+        CDEBUG(D_INFO, "obd %s setup \n", obd->obd_name);
+        if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
+                RETURN(0);
+        /* Buffer 4 is passed to server_get_mount(), so 5 buffers are needed. */
+        if (lcfg->lcfg_bufcount < 5) {
+                CERROR("invalid arg for setup %s\n", MDD_OBD_NAME);
+                RETURN(-EINVAL);
+        }
+        dev = lustre_cfg_string(lcfg, 4);
+        lmi = server_get_mount(dev);
+        LASSERT(lmi != NULL);
+
+        lsi = s2lsi(lmi->lmi_sb);
+        mnt = lmi->lmi_mnt;
+        /* FIXME: MDD LOV initialize objects.
+         * We need only the lmi here, not a mount reference:
+         * OSD did the mount already, so put the mount back.
+         */
+        atomic_dec(&lsi->lsi_mounts);
+        mntput(mnt);
+
+        obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
+        mds_init_ctxt(obd, mnt);
+
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        dentry = simple_mkdir(current->fs->pwd, "OBJECTS", 0777, 1);
+        if (IS_ERR(dentry)) {
+                rc = PTR_ERR(dentry);
+                CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
+                GOTO(err_putfs, rc);
+        }
+        mds->mds_objects_dir = dentry;
+
+        dentry = lookup_one_len("__iopen__", current->fs->pwd,
+                                strlen("__iopen__"));
+        if (IS_ERR(dentry)) {
+                rc = PTR_ERR(dentry);
+                CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc);
+                GOTO(err_objects, rc);
+        }
+
+        mds->mds_fid_de = dentry;
+        if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) {
+                rc = -ENOENT;
+                CERROR("__iopen__ directory has no inode? rc = %d\n", rc);
+                GOTO(err_fid, rc);
+        }
+
+        /* open and test the lov objid file */
+        file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
+        if (IS_ERR(file)) {
+                rc = PTR_ERR(file);
+                CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
+                GOTO(err_fid, rc);
+        }
+        mds->mds_lov_objid_filp = file;
+        if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
+                CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
+                       file->f_dentry->d_inode->i_mode);
+                GOTO(err_lov_objid, rc = -ENOENT);
+        }
+        /* From here on failures must also release LOV_OBJID and __iopen__. */
+        rc = mds_lov_presetup(mds, lcfg);
+        if (rc < 0)
+                GOTO(err_lov_objid, rc);
+
+        /* Don't wait for mds_postrecov trying to clear orphans */
+        obd->obd_async_recov = 1;
+        rc = mds_postsetup(obd);
+        /* Bug 11557 - allow async abort_recov start
+           FIXME can remove most of this obd_async_recov plumbing
+        obd->obd_async_recov = 0;
+        */
+
+        if (rc)
+                GOTO(err_lov_objid, rc);
+
+        mds->mds_max_mdsize = sizeof(struct lov_mds_md);
+        mds->mds_max_cookiesize = sizeof(struct llog_cookie);
+        /* Success falls through: err_pop restores the context and returns rc. */
+err_pop:
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        RETURN(rc);
+err_lov_objid:
+        if (mds->mds_lov_objid_filp &&
+                filp_close((struct file *)mds->mds_lov_objid_filp, 0))
+                CERROR("can't close %s after error\n", LOV_OBJID);
+err_fid:
+        dput(mds->mds_fid_de);
+err_objects:
+        dput(mds->mds_objects_dir);
+err_putfs:
+        fsfilt_put_ops(obd->obd_fsops);
+        goto err_pop;
+}
+/* Undo mds_cmd_setup(); returns the filp_close() result for LOV_OBJID or 0. */
+static int mds_cmd_cleanup(struct obd_device *obd)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct lvfs_run_ctxt saved;
+        int rc = 0;
+        ENTRY;
+
+        if (obd->obd_fail)
+                LCONSOLE_WARN("%s: shutting down for failover; client state "
+                              "will be preserved.\n", obd->obd_name);
+        /* Release everything from within the device's lvfs context. */
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        if (mds->mds_lov_objid_filp) {
+                rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
+                mds->mds_lov_objid_filp = NULL;
+                if (rc)
+                        CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
+        }
+        if (mds->mds_objects_dir != NULL) {
+                l_dput(mds->mds_objects_dir);
+                mds->mds_objects_dir = NULL;
+        }
+        /* NOTE(review): mds_lov_objids is freed but not reset to NULL. */
+        if (mds->mds_lov_objids != NULL)
+                OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size);
+        /* NOTE(review): mds_fid_de is used without a NULL check -- confirm. */
+        shrink_dcache_parent(mds->mds_fid_de);
+        dput(mds->mds_fid_de);
+        LL_DQUOT_OFF(obd->u.obt.obt_sb);
+        fsfilt_put_ops(obd->obd_fsops);
+
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        RETURN(rc);
+}
+
+#if 0
+static int mds_cmd_health_check(struct obd_device *obd)
+{
+        return 0;
+}
+#endif
+static struct obd_ops mds_cmd_obd_ops = { /* registered in mds_cmd_init() */
+        .o_owner           = THIS_MODULE,
+        .o_setup           = mds_cmd_setup,
+        .o_cleanup         = mds_cmd_cleanup,
+        .o_precleanup      = mds_precleanup,
+        .o_create          = mds_obd_create,
+        .o_destroy         = mds_obd_destroy,
+        .o_llog_init       = mds_llog_init,
+        .o_llog_finish     = mds_llog_finish,
+        .o_notify          = mds_notify,
+        .o_postrecov       = mds_postrecov,
+        /* .o_health_check = mds_cmd_health_check,  (stub is under #if 0) */
+};
+
+static int __init mds_cmd_init(void)
+{
+        struct lprocfs_static_vars lvars;
+
+        lprocfs_init_vars(mds, &lvars);
+        /* Module init: register the MDS obd type under LUSTRE_MDS_NAME. */
+        /* Propagate a registration failure instead of always returning 0. */
+        return class_register_type(&mds_cmd_obd_ops, NULL, lvars.module_vars,
+                                   LUSTRE_MDS_NAME, NULL);
+}
+
+static void /*__exit*/ mds_cmd_exit(void)
+{
+        class_unregister_type(LUSTRE_MDS_NAME); /* undo mds_cmd_init() */
 }
 
 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Metadata Server (MDS)");
 MODULE_LICENSE("GPL");
 
-module_init(mds_init);
-module_exit(mds_exit);
+module_init(mds_cmd_init);
+module_exit(mds_cmd_exit);