Whamcloud - gitweb
Open/create the last_rcvd file for MDS at filesystem mount time and close it
[fs/lustre-release.git] / lustre / mds / handler.c
index fe58fbc..a2cb3b6 100644 (file)
@@ -1,20 +1,21 @@
-/*
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
  *  linux/mds/handler.c
- *  
+ *
  *  Lustre Metadata Server (mds) request handler
- * 
- *  Copyright (C) 2001  Cluster File Systems, Inc.
+ *
+ *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
  *
  *  This code is issued under the GNU General Public License.
  *  See the file COPYING in this distribution
  *
  *  by Peter Braam <braam@clusterfs.com>
- * 
- *  This server is single threaded at present (but can easily be multi threaded)
- * 
+ *
+ *  This server is single threaded at present (but can easily be multi threaded)
+ *
  */
 
-
 #define EXPORT_SYMTAB
 
 #include <linux/version.h>
 #include <linux/fs.h>
 #include <linux/stat.h>
 #include <linux/locks.h>
-#include <linux/ext2_fs.h>
 #include <linux/quotaops.h>
 #include <asm/unistd.h>
-#include <linux/obd_support.h>
-#include <linux/obd.h>
-#include <linux/lustre_lib.h>
-#include <linux/lustre_idl.h>
-#include <linux/lustre_mds.h>
-#include <linux/obd_class.h>
+#include <asm/uaccess.h>
 
-// for testing
-static struct mds_obd *MDS;
+#define DEBUG_SUBSYSTEM S_MDS
 
-// for testing
-static int mds_queue_req(struct mds_request *req)
-{
-       
-       if (!MDS) { 
-               EXIT;
-               return -1;
-       }
-
-       list_add(&req->rq_list, &MDS->mds_reqs); 
-       init_waitqueue_head(&req->rq_wait_for_mds_rep);
-       req->rq_obd = MDS;
-       wake_up(&MDS->mds_waitq);
-       printk("-- sleeping\n");
-       interruptible_sleep_on(&req->rq_wait_for_mds_rep);
-       printk("-- done\n");
-       return 0;
-}
+#include <linux/lustre_mds.h>
+#include <linux/lustre_lib.h>
+#include <linux/lustre_net.h>
 
-static struct dentry *mds_fid2dentry(struct mds_obd *mds, struct lustre_fid *fid)
+int mds_sendpage(struct ptlrpc_request *req, struct file *file,
+                 __u64 offset, struct niobuf *dst) /*
+ * Read PAGE_SIZE bytes of `file` at `offset` through mds_fs_readpage() into
+ * a temporary buffer and push that buffer to the peer on MDS_BULK_PORTAL,
+ * tagging the bulk descriptor with the xid of the request message.
+ *
+ * Returns -ENOMEM if the descriptor or the buffer cannot be allocated,
+ * -EIO if the read does not return exactly PAGE_SIZE, -EINTR if the
+ * descriptor ends up flagged PTL_RPC_INTR, and otherwise whatever
+ * ptlrpc_send_bulk() returned.  The buffer and the descriptor are freed
+ * before returning on every path that allocated them.  `dst` is not
+ * referenced by this body.
+ *
+ * NOTE(review): rc from ptlrpc_send_bulk() is only acted on inside the
+ * OBD_FAIL_MDS_SENDPAGE branch, and the result of
+ * wait_event_interruptible() is discarded -- confirm that is intended.
+ */
 {
-       struct dentry *de;
-       struct inode *inode;
-
-       inode = iget(mds->mds_sb, fid->id);
-       if (!inode) { 
-               EXIT;
-       }
-
-       de = d_alloc_root(inode);
-       if (!de) { 
-               iput(inode);
-               EXIT;
-               return NULL;
-       }
-
-       de->d_inode = inode;
-       return de;
-}
+        int rc = 0;
+        mm_segment_t oldfs = get_fs(); /* put back right after the read */
+        struct ptlrpc_bulk_desc *bulk;
+        char *buf;
+
+        bulk = ptlrpc_prep_bulk(req->rq_connection);
+        if (bulk == NULL) {
+                rc = -ENOMEM;
+                GOTO(out, rc);
+        }
 
-int mds_getattr(struct mds_request *req)
-{
-       struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req->fid1);
-       struct inode *inode;
-       struct mds_rep *rep;
-       int rc;
-       
-       rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
-                         &req->rq_replen, &req->rq_repbuf);
-       if (rc) { 
-               EXIT;
-               printk("mds: out of memory\n");
-               req->rq_status = -ENOMEM;
-               return -ENOMEM;
-       }
-
-       req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
-       rep = req->rq_rep;
-
-       if (!de) { 
-               EXIT;
-               req->rq_rephdr->status = -ENOENT;
-               return 0;
-       }
-
-       inode = de->d_inode;
-       rep->atime = inode->i_atime;
-       rep->ctime = inode->i_ctime;
-       rep->mtime = inode->i_mtime;
-       rep->uid = inode->i_uid;
-       rep->gid = inode->i_gid;
-       rep->size = inode->i_size;
-       rep->mode = inode->i_mode;
-
-       dput(de); 
-       return 0;
-}
+        bulk->b_xid = req->rq_reqmsg->xid;
 
-int mds_reply(struct mds_request *req)
-{
-       ENTRY;
-       kfree(req->rq_reqbuf);
-       req->rq_reqbuf = NULL; 
-       wake_up_interruptible(&req->rq_wait_for_mds_rep); 
-       EXIT;
-       return 0;
-}
+        OBD_ALLOC(buf, PAGE_SIZE);
+        if (!buf) {
+                rc = -ENOMEM;
+                GOTO(cleanup_bulk, rc);
+        }
 
-int mds_error(struct mds_request *req)
-{
-       struct mds_rep_hdr *hdr;
-
-       ENTRY;
-       hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
-       if (!hdr) { 
-               EXIT;
-               return -ENOMEM;
-       }
-
-       memset(hdr, 0, sizeof(*hdr));
-       
-       hdr->seqno = req->rq_reqhdr->seqno;
-       hdr->status = req->rq_status; 
-       hdr->type = MDS_TYPE_ERR;
-       req->rq_repbuf = (char *)hdr;
-
-       EXIT;
-       return mds_reply(req);
-}
+        set_fs(KERNEL_DS); /* presumably so the read may target the kernel
+                            * buffer `buf` -- TODO confirm */
+        rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
+                             &offset);
+        set_fs(oldfs);
 
-//int mds_handle(struct mds_conn *conn, int len, char *buf)
-int mds_handle(struct mds_request *req)
-{
-       int rc;
-       struct mds_req_hdr *hdr;
+        if (rc != PAGE_SIZE) { /* short reads are reported as -EIO */
+                rc = -EIO;
+                GOTO(cleanup_buf, rc);
+        }
 
-       ENTRY;
+        bulk->b_buf = buf;
+        bulk->b_buflen = PAGE_SIZE;
 
-       hdr = (struct mds_req_hdr *)req->rq_reqbuf;
+        rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
+                CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
+                       OBD_FAIL_MDS_SENDPAGE, rc);
+                PtlMDUnlink(bulk->b_md_h);
+                GOTO(cleanup_buf, rc);
+        }
+        wait_event_interruptible(bulk->b_waitq,
+                                 ptlrpc_check_bulk_sent(bulk));
 
-       if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
-               printk("lustre_mds: wrong packet type sent %d\n",
-                      NTOH__u32(hdr->type));
-               rc = -EINVAL;
-               goto out;
-       }
+        if (bulk->b_flags == PTL_RPC_INTR) {
+                rc = -EINTR;
+                GOTO(cleanup_buf, rc);
+        }
 
-       rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
-                           &req->rq_reqhdr, &req->rq_req);
-       if (rc) { 
-               printk("lustre_mds: Invalid request\n");
-               EXIT; 
-               goto out;
-       }
+        EXIT; /* the success path falls through the cleanup labels too */
+ cleanup_buf:
+        OBD_FREE(buf, PAGE_SIZE);
+ cleanup_bulk:
+        OBD_FREE(bulk, sizeof(*bulk));
+ out:
+        return rc;
+}
 
-       switch (req->rq_reqhdr->opc) { 
+struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
+                              struct vfsmount **mnt) /*
+ * Turn a fid (inode number + generation) into a referenced dentry on the
+ * MDS superblock.
+ *
+ * Returns ERR_PTR(-ESTALE) when fid->id is 0, when the inode is bad, or
+ * when fid->generation is non-zero and differs from the inode's
+ * generation; ERR_PTR(-ENOMEM) when iget() or d_alloc_root() returns
+ * NULL.  Otherwise returns an existing alias of the inode that is not
+ * flagged DCACHE_NFSD_DISCONNECTED (with an extra dget_locked() reference),
+ * or a freshly allocated dentry flagged DCACHE_NFSD_DISCONNECTED.
+ *
+ * When `mnt` is non-NULL, *mnt is set to mds->mds_vfsmnt and mntget() is
+ * called on it on both success paths.  Note that *mnt is written before
+ * d_alloc_root() is attempted, so it is also set (without mntget()) when
+ * that allocation fails.
+ */
+{
+        /* stolen from NFS */
+        struct super_block *sb = mds->mds_sb;
+        unsigned long ino = fid->id;
+        __u32 generation = fid->generation;
+        struct inode *inode;
+        struct list_head *lp;
+        struct dentry *result;
+
+        if (ino == 0)
+                return ERR_PTR(-ESTALE);
+
+        inode = iget(sb, ino);
+        if (inode == NULL)
+                return ERR_PTR(-ENOMEM);
+
+        CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
+
+        if (is_bad_inode(inode) ||
+            (generation && inode->i_generation != generation)) {
+                /* we didn't find the right inode: log it, drop the
+                 * reference taken by iget() and report a stale fid */
+                CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
+                        inode->i_ino,
+                        inode->i_nlink, atomic_read(&inode->i_count),
+                        inode->i_generation,
+                        generation);
+                LBUG();
+                iput(inode);
+                return ERR_PTR(-ESTALE);
+        }
 
-       case MDS_GETATTR:
-               CDEBUG(D_INODE, "getattr\n");
-               rc = mds_getattr(req);
-               break;
+        /* Now find a dentry for the inode.
+         * If possible, return a well-connected one, i.e. an existing alias
+         * that is not marked DCACHE_NFSD_DISCONNECTED.
+         */
+        if (mnt)
+                *mnt = mds->mds_vfsmnt;
+        spin_lock(&dcache_lock);
+        for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
+                result = list_entry(lp,struct dentry, d_alias);
+                if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
+                        dget_locked(result);
+                        result->d_vfs_flags |= DCACHE_REFERENCED;
+                        spin_unlock(&dcache_lock);
+                        iput(inode); /* the iget() reference is dropped once
+                                      * the alias itself is referenced */
+                        if (mnt)
+                                mntget(*mnt);
+                        return result;
+                }
+        }
+        spin_unlock(&dcache_lock);
+        result = d_alloc_root(inode); /* no usable alias: make a new dentry */
+        if (result == NULL) {
+                iput(inode);
+                return ERR_PTR(-ENOMEM);
+        }
+        if (mnt)
+                mntget(*mnt);
+        result->d_flags |= DCACHE_NFSD_DISCONNECTED;
+        return result;
+}
 
-       case MDS_OPEN:
-               return mds_getattr(req);
+int mds_getattr(struct ptlrpc_request *req) /*
+ * MDS_GETATTR handler: look up the inode named by fid1 in the request body
+ * and copy its attributes into a one-buffer reply sized for a
+ * struct mds_body.
+ *
+ * Always returns 0.  Failures are reported through req->rq_status:
+ * -ENOMEM when the reply cannot be packed (or the pack fail-point fires),
+ * -ENOENT for any error returned by mds_fid2dentry().
+ */
+{
+        struct dentry *de;
+        struct inode *inode;
+        struct mds_body *body; /* first the request body, later the reply */
+        struct mds_obd *mds = &req->rq_obd->u.mds;
+        int rc, size = sizeof(*body);
+        ENTRY;
 
-       case MDS_SETATTR:
-               return mds_getattr(req);
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
+                CERROR("mds: out of memory\n");
+                req->rq_status = -ENOMEM;
+                RETURN(0);
+        }
 
-       case MDS_CREATE:
-               return mds_getattr(req);
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        de = mds_fid2dentry(mds, &body->fid1, NULL);
+        if (IS_ERR(de)) {
+                req->rq_status = -ENOENT;
+                RETURN(0);
+        }
 
-       case MDS_MKDIR:
-               return mds_getattr(req);
+        body = lustre_msg_buf(req->rq_repmsg, 0); /* from here on: reply */
+        inode = de->d_inode;
+        body->ino = inode->i_ino;
+        body->generation = inode->i_generation;
+        body->atime = inode->i_atime;
+        body->ctime = inode->i_ctime;
+        body->mtime = inode->i_mtime;
+        body->uid = inode->i_uid;
+        body->gid = inode->i_gid;
+        body->size = inode->i_size;
+        body->mode = inode->i_mode;
+        body->nlink = inode->i_nlink;
+        body->valid = ~0; /* every bit set; presumably "all fields valid"
+                           * -- TODO confirm against the client */
+        mds_fs_get_objid(mds, inode, &body->objid);
+        l_dput(de);
+        RETURN(0);
+}
 
-       case MDS_RMDIR:
-               return mds_getattr(req);
+int mds_open(struct ptlrpc_request *req) /*
+ * MDS_OPEN handler: resolve fid1 from the request body, open the resulting
+ * dentry with the flags given in the request and hand the open file back
+ * to the client as the objid of the reply body.
+ *
+ * Always returns 0.  Failures are reported through req->rq_status:
+ * -ENOMEM when the reply cannot be packed (or the pack fail-point fires),
+ * -ENOENT for any mds_fid2dentry() error, -EINVAL when dentry_open()
+ * yields NULL or an error pointer.
+ *
+ * NOTE(review): the objid placed in the reply is the kernel address of the
+ * struct file; mds_close() later casts the client-supplied objid back to a
+ * pointer.
+ */
+{
+        struct dentry *de;
+        struct mds_body *body; /* first the request body, later the reply */
+        struct file *file;
+        struct vfsmount *mnt;
+        __u32 flags;
+        int rc, size = sizeof(*body);
+        ENTRY;
 
-       case MDS_SYMLINK:
-               return mds_getattr(req);
-       case MDS_LINK:
-               return mds_getattr(req);
-  
-       case MDS_MKNOD:
-               return mds_getattr(req);
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
+                CERROR("mds: out of memory\n");
+                req->rq_status = -ENOMEM;
+                RETURN(0);
+        }
 
-       case MDS_UNLINK:
-               return mds_getattr(req);
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
+        if (IS_ERR(de)) {
+                req->rq_status = -ENOENT;
+                RETURN(0);
+        }
+        flags = body->flags;
+        file = dentry_open(de, mnt, flags);
+        if (!file || IS_ERR(file)) {
+                req->rq_status = -EINVAL;
+                RETURN(0);
+        }
 
-       case MDS_RENAME:
-               return mds_getattr(req);
+        body = lustre_msg_buf(req->rq_repmsg, 0);
+        body->objid = (__u64) (unsigned long)file; /* file pointer as handle */
+        RETURN(0);
+}
 
-       default:
-               return mds_error(req);
-       }
+int mds_close(struct ptlrpc_request *req) /*
+ * MDS_CLOSE handler: close the file whose handle the client sends back in
+ * the objid of the request body.  The reply carries no buffers.
+ *
+ * Always returns 0.  req->rq_status is -ENOMEM when the reply cannot be
+ * packed (or the pack fail-point fires), -ENOENT for any mds_fid2dentry()
+ * error, and otherwise the return value of filp_close().
+ *
+ * NOTE(review): body->objid comes straight from the request message and is
+ * cast to a struct file pointer without validation.
+ */
+{
+        struct dentry *de;
+        struct mds_body *body;
+        struct file *file;
+        struct vfsmount *mnt;
+        int rc;
+        ENTRY;
 
-out:
-       if (rc) { 
-               printk("mds: processing error %d\n", rc);
-               mds_error(req);
-       } else { 
-               CDEBUG(D_INODE, "sending reply\n"); 
-               mds_reply(req); 
-       }
-
-       return 0;
-}
+        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
+                CERROR("mds: out of memory\n");
+                req->rq_status = -ENOMEM;
+                RETURN(0);
+        }
 
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
+        if (IS_ERR(de)) {
+                req->rq_status = -ENOENT;
+                RETURN(0);
+        }
 
-static void mds_timer_run(unsigned long __data)
-{
-       struct task_struct * p = (struct task_struct *) __data;
+        file = (struct file *)(unsigned long)body->objid;
+        req->rq_status = filp_close(file, 0);
+        l_dput(de);  /* drop the references mds_fid2dentry() took above */
+        mntput(mnt);
 
-       wake_up_process(p);
+        RETURN(0);
 }
 
-int mds_main(void *arg)
+/*
+ * MDS_READPAGE handler: open the inode named by fid1 in the request body
+ * read-only and send one page of it to the client with mds_sendpage(),
+ * using the `size` field of the request body as the file offset.
+ *
+ * Always returns 0.  Failures are reported through req->rq_status:
+ * -ENOMEM when the reply cannot be packed (or the pack fail-point fires),
+ * the error from mds_fid2dentry() or dentry_open(), -EINVAL when the
+ * request carries no second buffer, otherwise the result of
+ * mds_sendpage().  The file opened here is closed again on every path.
+ */
+int mds_readpage(struct ptlrpc_request *req)
 {
-       struct mds_obd *mds = (struct mds_obd *) arg;
-       struct timer_list timer;
-
-       lock_kernel();
-       daemonize();
-       spin_lock_irq(&current->sigmask_lock);
-       sigfillset(&current->blocked);
-       recalc_sigpending(current);
-       spin_unlock_irq(&current->sigmask_lock);
+        struct vfsmount *mnt;
+        struct dentry *de;
+        struct file *file;
+        struct niobuf *niobuf;
+        struct mds_body *body;
+        int rc, size = sizeof(*body);
+        ENTRY;
 
-       sprintf(current->comm, "lustre_mds");
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
+                CERROR("mds: out of memory\n");
+                req->rq_status = -ENOMEM;
+                RETURN(0);
+        }
 
-       /* Set up an interval timer which can be used to trigger a
-           wakeup after the interval expires */
-       init_timer(&timer);
-       timer.data = (unsigned long) current;
-       timer.function = mds_timer_run;
-       mds->mds_timer = &timer;
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
+        if (IS_ERR(de)) {
+                req->rq_status = PTR_ERR(de);
+                RETURN(0);
+        }
 
-       /* Record that the  thread is running */
-       mds->mds_thread = current;
-       wake_up(&mds->mds_done_waitq); 
+        CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
 
-       printk(KERN_INFO "lustre_mds starting.  Commit interval %d seconds\n",
-                       mds->mds_interval / HZ);
+        file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
+        /* note: in case of an error, dentry_open puts dentry */
+        if (IS_ERR(file)) {
+                req->rq_status = PTR_ERR(file);
+                RETURN(0);
+        }
 
-       /* XXX maintain a list of all managed devices: insert here */
+        niobuf = lustre_msg_buf(req->rq_reqmsg, 1);
+        if (!niobuf) {
+                req->rq_status = -EINVAL;
+                LBUG();
+                /* the file was opened above; do not leak it (and the
+                 * dentry/mount references it holds) on this error path */
+                filp_close(file, 0);
+                RETURN(0);
+        }
 
-       /* And now, wait forever for commit wakeup events. */
-       while (1) {
-               struct mds_request *request;
-               int rc; 
+        /* to make this asynchronous make sure that the handling function
+           doesn't send a reply when this function completes. Instead a
+           callback function would send the reply */
+        rc = mds_sendpage(req, file, body->size, niobuf);
 
-               if (mds->mds_flags & MDS_UNMOUNT)
-                       break;
+        filp_close(file, 0);
+        req->rq_status = rc;
+        RETURN(0);
+}
 
+int mds_reint(struct ptlrpc_request *req) /*
+ * MDS_REINT handler: unpack an update record from the request with
+ * mds_update_unpack() and pass it to mds_reint_rec().
+ *
+ * Always returns 0.  req->rq_status is set to -EINVAL here when unpacking
+ * fails (or the unpack fail-point fires); the value returned by
+ * mds_reint_rec() is stored in rc but not otherwise used by this body.
+ *
+ * NOTE(review): unlike the other handlers in this file, RETURN() is used
+ * here without a matching ENTRY.
+ */
+{
+        int rc;
+        struct mds_update_record rec;
+
+        rc = mds_update_unpack(req, &rec);
+        if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
+                CERROR("invalid record\n");
+                req->rq_status = -EINVAL;
+                RETURN(0);
+        }
+        /* rc will be used to interrupt a for loop over multiple records */
+        rc = mds_reint_rec(&rec, req);
+        return 0;
+}
 
-               wake_up(&mds->mds_done_waitq);
-               interruptible_sleep_on(&mds->mds_waitq);
+int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
+               struct ptlrpc_request *req) /*
+ * Service callback for the MDS request portal (it is the handler passed to
+ * ptlrpc_init_svc() in mds_prep()).  Unpacks the request message, checks
+ * that it is a PTL_RPC_REQUEST and dispatches on the opcode to
+ * mds_getattr(), mds_readpage(), mds_reint(), mds_open() or mds_close().
+ *
+ * After dispatch, a non-zero rc sends an error via ptlrpc_error() and a
+ * zero rc sends the reply via ptlrpc_reply().  The handlers visible in
+ * this file all return 0 and report failures in req->rq_status instead.
+ * An unknown opcode returns the result of ptlrpc_error() directly; all
+ * other paths return 0.  `dev` is not referenced by this body.
+ *
+ * NOTE(review): if OBD_FAIL_MDS_HANDLE_UNPACK fires while rc is 0, control
+ * reaches `out` with rc == 0 and a normal reply is sent.
+ */
+{
+        int rc;
+        ENTRY;
 
-               CDEBUG(D_INODE, "lustre_mds wakes\n");
-               CDEBUG(D_INODE, "pick up req here and continue\n"); 
+        rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
+        if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
+                CERROR("lustre_mds: Invalid request\n");
+                GOTO(out, rc);
+        }
 
-               if (list_empty(&mds->mds_reqs)) { 
-                       CDEBUG(D_INODE, "woke because of timer\n"); 
-               } else { 
-                       request = list_entry(mds->mds_reqs.next, 
-                                            struct mds_request, rq_list);
-                       list_del(&request->rq_list);
-                       rc = mds_handle(request); 
-               }
-       }
+        if (req->rq_reqmsg->type != PTL_RPC_REQUEST) {
+                CERROR("lustre_mds: wrong packet type sent %d\n",
+                       req->rq_reqmsg->type);
+                GOTO(out, rc = -EINVAL);
+        }
 
-       del_timer_sync(mds->mds_timer);
+        switch (req->rq_reqmsg->opc) {
+        case MDS_GETATTR:
+                CDEBUG(D_INODE, "getattr\n");
+                OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
+                rc = mds_getattr(req);
+                break;
+
+        case MDS_READPAGE:
+                CDEBUG(D_INODE, "readpage\n");
+                OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
+                rc = mds_readpage(req);
+
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
+                        return 0; /* no reply is sent on this fail-point */
+                break;
+
+        case MDS_REINT:
+                CDEBUG(D_INODE, "reint\n");
+                OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
+                rc = mds_reint(req);
+                break;
+
+        case MDS_OPEN:
+                CDEBUG(D_INODE, "open\n");
+                OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
+                rc = mds_open(req);
+                break;
+
+        case MDS_CLOSE:
+                CDEBUG(D_INODE, "close\n");
+                OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
+                rc = mds_close(req);
+                break;
+
+        default:
+                rc = ptlrpc_error(svc, req);
+                RETURN(rc);
+        }
 
-       /* XXX maintain a list of all managed devices: cleanup here */
+        EXIT;
+out:
+        if (rc) {
+                ptlrpc_error(svc, req);
+        } else {
+                CDEBUG(D_NET, "sending reply\n");
+                ptlrpc_reply(svc, req);
+        }
 
-       mds->mds_thread = NULL;
-       wake_up(&mds->mds_done_waitq);
-       printk("lustre_mds: exiting\n");
-       return 0;
+        return 0;
 }
 
-static void mds_stop_srv_thread(struct mds_obd *mds)
+static int mds_prep(struct obd_device *obddev)
 {
-       mds->mds_flags |= MDS_UNMOUNT;
+        struct obd_run_ctxt saved;
+        struct mds_obd *mds = &obddev->u.mds;
+        struct super_operations *s_ops;
+        struct file *f;
+        int err;
+
+        mds->mds_service = ptlrpc_init_svc(128 * 1024,
+                                           MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
+                                           "self", mds_handle);
+
+        if (!mds->mds_service) {
+                CERROR("failed to start service\n");
+                RETURN(-EINVAL);
+        }
 
-       while (mds->mds_thread) {
-               wake_up(&mds->mds_waitq);
-               sleep_on(&mds->mds_done_waitq);
-       }
-}
+        err = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
+        if (err) {
+                CERROR("cannot start thread\n");
+                GOTO(err_svc, err);
+        }
 
-static void mds_start_srv_thread(struct mds_obd *mds)
-{
-       init_waitqueue_head(&mds->mds_waitq);
-       init_waitqueue_head(&mds->mds_done_waitq);
-       kernel_thread(mds_main, (void *)mds, 
-                     CLONE_VM | CLONE_FS | CLONE_FILES);
-       while (!mds->mds_thread) 
-               sleep_on(&mds->mds_done_waitq);
+        push_ctxt(&saved, &mds->mds_ctxt);
+        err = simple_mkdir(current->fs->pwd, "ROOT", 0700);
+        if (err && err != -EEXIST) {
+                CERROR("cannot create ROOT directory\n");
+                GOTO(err_svc, err);
+        }
+        err = simple_mkdir(current->fs->pwd, "FH", 0700);
+        if (err && err != -EEXIST) {
+                CERROR("cannot create FH directory\n");
+                GOTO(err_svc, err);
+        }
+        f = filp_open("last_rcvd", O_RDWR | O_CREAT, 0644);
+        if (IS_ERR(f)) {
+                CERROR("cannot open/create last_rcvd file\n");
+                GOTO(err_svc, err = PTR_ERR(f));
+        }
+        mds->last_rcvd = f;
+        pop_ctxt(&saved);
+
+        /*
+         * Replace the client filesystem delete_inode method with our own,
+         * so that we can clear the object ID before the inode is deleted.
+         * The fs_delete_inode method will call cl_delete_inode for us.
+         *
+         * We need to do this for the MDS superblock only, hence we install
+         * a modified copy of the original superblock method table.
+         *
+         * We still assume that there is only a single MDS client filesystem
+         * type, as we don't have access to the mds struct in * delete_inode.
+         */
+        OBD_ALLOC(s_ops, sizeof(*s_ops));
+        memcpy(s_ops, mds->mds_sb->s_op, sizeof(*s_ops));
+        mds->mds_fsops->cl_delete_inode = s_ops->delete_inode;
+        s_ops->delete_inode = mds->mds_fsops->fs_delete_inode;
+        mds->mds_sb->s_op = s_ops;
+
+        RETURN(0);
+
+err_svc:
+        rpc_unregister_service(mds->mds_service);
+        OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
+
+        return(err);
 }
 
 /* mount the file system (secretly) */
-static int mds_setup(struct obd_device *obddev, obd_count len,
-                       void *buf)
-                       
+static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
 {
-       struct obd_ioctl_data* data = buf;
-       struct mds_obd *mds = &obddev->u.mds;
-       struct vfsmount *mnt;
-       int err; 
+        struct obd_ioctl_data* data = buf;
+        struct mds_obd *mds = &obddev->u.mds;
+        struct vfsmount *mnt;
+        int err = 0;
         ENTRY;
-       
-       mnt = do_kern_mount(data->ioc_inlbuf2, 0, 
-                           data->ioc_inlbuf1, NULL); 
-       err = PTR_ERR(mnt);
-       if (IS_ERR(mnt)) { 
-               EXIT;
-               return err;
-       }
-
-       mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
-       if (!obddev->u.mds.mds_sb) {
-               EXIT;
-               return -ENODEV;
-       }
-
-       INIT_LIST_HEAD(&mds->mds_reqs);
-       mds->mds_thread = NULL;
-       mds->mds_flags = 0;
-       mds->mds_interval = 3 * HZ;
-       mds->mds_vfsmnt = mnt;
-       obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
-
-       mds->mds_ctxt.pwdmnt = mnt;
-       mds->mds_ctxt.pwd = mnt->mnt_root;
-       mds->mds_ctxt.fs = KERNEL_DS;
-       MDS = mds;
-
-       spin_lock_init(&obddev->u.mds.fo_lock);
-
-       mds_start_srv_thread(mds);
+
+#ifdef CONFIG_DEV_RDONLY
+        dev_clear_rdonly(2);
+#endif
+        mds->mds_fstype = strdup(data->ioc_inlbuf2);
+
+        if (!strcmp(mds->mds_fstype, "ext3"))
+                mds->mds_fsops = &mds_ext3_fs_ops;
+        else if (!strcmp(mds->mds_fstype, "ext2"))
+                mds->mds_fsops = &mds_ext2_fs_ops;
+        else {
+                CERROR("unsupported MDS filesystem type %s\n", mds->mds_fstype);
+                GOTO(err_kfree, (err = -EPERM));
+        }
 
         MOD_INC_USE_COUNT;
-        EXIT; 
-        return 0;
-} 
+        mnt = do_kern_mount(mds->mds_fstype, 0, data->ioc_inlbuf1, NULL);
+        if (IS_ERR(mnt)) {
+                CERROR("do_kern_mount failed: %d\n", err);
+                GOTO(err_dec, err = PTR_ERR(mnt));
+        }
+
+        mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
+        if (!mds->mds_sb)
+                GOTO(err_put, (err = -ENODEV));
+
+        mds->mds_vfsmnt = mnt;
+        mds->mds_ctxt.pwdmnt = mnt;
+        mds->mds_ctxt.pwd = mnt->mnt_root;
+        mds->mds_ctxt.fs = KERNEL_DS;
+
+        err = mds_prep(obddev);
+        if (err)
+                GOTO(err_put, err);
+
+        RETURN(0);
+
+err_put:
+        unlock_kernel();
+        mntput(mds->mds_vfsmnt);
+        mds->mds_sb = 0;
+        lock_kernel();
+err_dec:
+        MOD_DEC_USE_COUNT;
+err_kfree:
+        kfree(mds->mds_fstype);
+        return err;
+}
 
 static int mds_cleanup(struct obd_device * obddev)
 {
+        struct super_operations *s_ops = NULL; /*
+         * mds_cleanup: tear down what mds_setup()/mds_prep() built.
+         *
+         * Refuses with -EBUSY while obd_gen_clients is non-empty.
+         * Otherwise stops the service thread, unregisters and frees the
+         * service, closes last_rcvd, drops the mount, frees the
+         * filesystem-type string and the superblock-operations copy that
+         * mds_prep() installed in sb->s_op, and decrements the module use
+         * count.  Returns 0 in all of those cases.
+         *
+         * NOTE(review): the early RETURN(0) taken when mds_sb is NULL
+         * skips MOD_DEC_USE_COUNT -- confirm that is intended.
+         */
         struct super_block *sb;
-       struct mds_obd *mds = &obddev->u.mds;
+        struct mds_obd *mds = &obddev->u.mds;
 
         ENTRY;
 
-        if ( !(obddev->obd_flags & OBD_SET_UP) ) {
-                EXIT;
-                return 0;
+        if ( !list_empty(&obddev->obd_gen_clients) ) {
+                CERROR("still has clients!\n");
+                RETURN(-EBUSY);
         }
 
-        if ( !list_empty(&obddev->obd_gen_clients) ) {
-                printk(KERN_WARNING __FUNCTION__ ": still has clients!\n");
-                EXIT;
-                return -EBUSY;
+        ptlrpc_stop_thread(mds->mds_service);
+        rpc_unregister_service(mds->mds_service);
+        if (!list_empty(&mds->mds_service->srv_reqs)) {
+                // XXX reply with errors and clean up
+                CERROR("Request list not empty!\n");
         }
+        OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
 
-       MDS = NULL;
-       mds_stop_srv_thread(mds);
         sb = mds->mds_sb;
-        if (!mds->mds_sb){
-                EXIT;
-                return 0;
-        }
+        if (!mds->mds_sb)
+                RETURN(0);
+
+        if (mds->last_rcvd) {
+                int rc = filp_close(mds->last_rcvd, 0);
+                mds->last_rcvd = NULL;
 
-       if (!list_empty(&mds->mds_reqs)) {
-               // XXX reply with errors and clean up
-               CDEBUG(D_INODE, "Request list not empty!\n");
-       }
+                if (rc)
+                        CERROR("last_rcvd file won't close, rc=%d\n", rc);
+        }
+        s_ops = sb->s_op; /* remembered now, freed after the mntput() */
 
-       unlock_kernel();
-       mntput(mds->mds_vfsmnt); 
+        unlock_kernel();
+        mntput(mds->mds_vfsmnt);
         mds->mds_sb = 0;
-       kfree(mds->mds_fstype);
-       lock_kernel();
-       
+        kfree(mds->mds_fstype);
+        lock_kernel();
+#ifdef CONFIG_DEV_RDONLY
+        dev_clear_rdonly(2);
+#endif
+        OBD_FREE(s_ops, sizeof(*s_ops));
 
         MOD_DEC_USE_COUNT;
-        EXIT;
-        return 0;
+        RETURN(0);
 }
 
 /* use obd ops to offer management infrastructure */
@@ -414,21 +579,17 @@ static struct obd_ops mds_obd_ops = {
 static int __init mds_init(void)
 {
         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
-       return 0;
+        return 0; /* module init registers mds_obd_ops under
+                   * LUSTRE_MDS_NAME; the result of obd_register_type()
+                   * above is not examined, so 0 is always returned */
 }
 
 static void __exit mds_exit(void)
 {
-       obd_unregister_type(LUSTRE_MDS_NAME);
+        obd_unregister_type(LUSTRE_MDS_NAME); /* module exit: drop the type
+                                                * registered in mds_init() */
 }
 
 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
 MODULE_LICENSE("GPL");
 
-
-// for testing (maybe this stays)
-EXPORT_SYMBOL(mds_queue_req);
-
 module_init(mds_init);
 module_exit(mds_exit);