Whamcloud - gitweb
- zero out the request structure after allocation
[fs/lustre-release.git] / lustre / mds / handler.c
index 2380d69..19b9691 100644 (file)
@@ -1,14 +1,20 @@
 /*
- *  linux/fs/ext2_obd/ext2_obd.c
+ *  linux/mds/handler.c
+ *  
+ *  Lustre Metadata Server (mds) request handler
+ * 
+ *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
  *
- * Copyright (C) 2001  Cluster File Systems, Inc.
+ *  This code is issued under the GNU General Public License.
+ *  See the file COPYING in this distribution
  *
- * This code is issued under the GNU General Public License.
- * See the file COPYING in this distribution
- *
- * by Peter Braam <braam@clusterfs.com>
+ *  by Peter Braam <braam@clusterfs.com>
+ * 
+ *  This server is single threaded at present (but can easily be multi threaded). 
+ * 
  */
 
+
 #define EXPORT_SYMTAB
 
 #include <linux/version.h>
 #include <linux/ext2_fs.h>
 #include <linux/quotaops.h>
 #include <asm/unistd.h>
+#include <asm/uaccess.h>
 #include <linux/obd_support.h>
 #include <linux/obd.h>
 #include <linux/lustre_lib.h>
 #include <linux/lustre_idl.h>
 #include <linux/lustre_mds.h>
+#include <linux/lustre_net.h>
 #include <linux/obd_class.h>
 
+// XXX for testing
+static struct mds_obd *MDS;
 
-static struct dentry *mds_fid2dentry(struct mds_obd *mds, struct lustre_fid *fid)
+/* XXX make this networked!
+ *
+ * Queue a request for the local MDS (non-networked test path).
+ *
+ * Allocates a zeroed server-side ptlrpc_request, points it at the caller's
+ * request buffer and length, records the caller's request in
+ * rq_reply_handle, links it onto MDS->mds_reqs and wakes MDS->mds_waitq.
+ *
+ * Returns 0 on success, -1 when no MDS is registered (MDS == NULL), or
+ * -ENOMEM when the server-side request cannot be allocated.
+ *
+ * NOTE(review): this function is declared static but is also named in
+ * EXPORT_SYMBOL() at the bottom of the file -- confirm the intended linkage.
+ */
+static int mds_queue_req(struct ptlrpc_request *req)
 {
-       struct dentry *de;
-       struct inode *inode;
+       struct ptlrpc_request *srv_req;
+       
+       if (!MDS) { 
+               EXIT;
+               return -1;
+       }
 
-       inode = iget(mds->mds_sb, fid->id);
-       if (!inode) { 
+       srv_req = kmalloc(sizeof(*srv_req), GFP_KERNEL);
+       if (!srv_req) { 
                 EXIT;
+               return -ENOMEM;
         }
 
-       de = d_alloc_root(inode);
-       if (!de) { 
-               iput(inode);
+       printk("---> MDS at %d %p, incoming req %p, srv_req %p\n", 
+              __LINE__, MDS, req, srv_req);
+
+       memset(srv_req, 0, sizeof(*req)); /* req and srv_req have the same type, so this clears all of *srv_req */
+
+       /* move the request buffer: the pointer is shared, nothing is copied */
+       srv_req->rq_reqbuf = req->rq_reqbuf;
+       srv_req->rq_reqlen    = req->rq_reqlen;
+       srv_req->rq_obd = MDS;
+
+       /* remember where it came from; mds_reply() hands the reply back through this */
+       srv_req->rq_reply_handle = req;
+
+       list_add(&srv_req->rq_list, &MDS->mds_reqs); 
+       wake_up(&MDS->mds_waitq);
+       return 0;
+}
+
+/* XXX do this over the net
+ *
+ * Send one page of 'file', starting at byte 'offset', to the requester.
+ *
+ * When the request's peer NID is 0 the page is read directly into the
+ * address carried in dst->addr.  Otherwise the page is read into a
+ * temporary kernel buffer, handed to ptl_send_buf() on MDS_BULK_PORTAL, and
+ * this function sleeps on req->rq_wait_for_bulk until it is woken; the
+ * temporary buffer is then released.
+ *
+ * Returns 0 on success, -EIO when a full page could not be read, or
+ * -ENOMEM when the temporary buffer cannot be allocated.
+ */
+int mds_sendpage(struct ptlrpc_request *req, struct file *file, 
+                   __u64 offset, struct niobuf *dst)
+{
+       int rc; 
+       mm_segment_t oldfs = get_fs();
+
+       if (req->rq_peer.peer_nid == 0) {
+               /* dst->addr is a user address, but in a different task! */
+               set_fs(KERNEL_DS); 
+               rc = generic_file_read(file, (char *)(long)dst->addr, 
+                                      PAGE_SIZE, &offset); 
+               set_fs(oldfs);
+
+               if (rc != PAGE_SIZE) 
+                       return -EIO;
+       } else {
+               char *buf;
+
+               buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
+               if (!buf) {
+                       return -ENOMEM;
+               }
+
+               set_fs(KERNEL_DS); 
+               rc = generic_file_read(file, buf, PAGE_SIZE, &offset); 
+               set_fs(oldfs);
+
+               if (rc != PAGE_SIZE) {
+                       /* short read or error: do not leak the buffer */
+                       kfree(buf);
+                       return -EIO;
+               }
+
+               /* the wait queue has to be usable before the send is issued,
+                * otherwise a wakeup could hit an uninitialised (or freshly
+                * re-initialised) queue head */
+               init_waitqueue_head(&req->rq_wait_for_bulk);
+
+               req->rq_bulkbuf = buf;
+               req->rq_bulklen = PAGE_SIZE;
+               /* NOTE(review): the result of ptl_send_buf() is stored but
+                * never examined; if it can fail we would sleep with nothing
+                * in flight -- confirm its return convention and handle it. */
+               rc = ptl_send_buf(req, &req->rq_peer, MDS_BULK_PORTAL, 0);
+               sleep_on(&req->rq_wait_for_bulk);
+               kfree(buf);
+               req->rq_bulkbuf = NULL; /* buf is gone; leave no dangling pointer */
+               req->rq_bulklen = 0; /* FIXME: eek. */
+       }
+
+       return 0;
+}
+
+/* XXX replace with networking code
+ *
+ * Deliver the reply held in 'req' to whoever issued the request.
+ *
+ * When the owning MDS has a portals service, the reply is sent to the
+ * request's peer on MDS_REPLY_PORTAL.  Otherwise the request was queued by
+ * a local thread: the reply buffer and length are handed over to the
+ * original client request (req->rq_reply_handle), the request buffer is
+ * freed, and the client sleeping on rq_wait_for_rep is woken.
+ *
+ * Always returns 0.
+ */
+int mds_reply(struct ptlrpc_request *req)
+{
+       struct ptlrpc_request *client;
+
+       ENTRY;
+
+       if (req->rq_obd->mds_service == NULL) {
+               /* Local request handed over by another thread. */
+               client = req->rq_reply_handle;
+
+               /* the reply buffer now belongs to the client request */
+               client->rq_replen = req->rq_replen;
+               client->rq_repbuf = req->rq_repbuf;
+               req->rq_repbuf = NULL;
+               req->rq_replen = 0;
+
+               /* the request buffer has served its purpose */
+               kfree(req->rq_reqbuf);
+               req->rq_reqbuf = NULL;
+
+               /* let the waiting client pick up its reply */
+               wake_up_interruptible(&client->rq_wait_for_rep);
+       } else {
+               /* Request arrived from the network via portals. */
+
+               /* FIXME: we need to increment the count of handled events */
+               ptl_send_buf(req, &req->rq_peer, MDS_REPLY_PORTAL, 0);
+       }
+
+       EXIT;
+       return 0;
+}
+
+int mds_error(struct ptlrpc_request *req)
+{
+       struct ptlrep_hdr *hdr;
+       /*
+        * Build a header-only error reply for 'req' and send it.
+        *
+        * A zeroed ptlrep_hdr is allocated, given the sequence number of the
+        * request header and the status stored in req->rq_status, marked as
+        * MDS_TYPE_ERR and installed as the reply buffer.  The request is
+        * then passed to mds_reply().
+        *
+        * Returns -ENOMEM when the header cannot be allocated, otherwise
+        * whatever mds_reply() returns.
+        */
+       ENTRY;
+
+       hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
+       if (!hdr) { 
                 EXIT;
-               return NULL;
+               return -ENOMEM;
+       }
+
+       memset(hdr, 0, sizeof(*hdr));
+       
+       hdr->seqno = req->rq_reqhdr->seqno; /* reads the request header, so rq_reqhdr must already be set */
+       hdr->status = req->rq_status; 
+       hdr->type = MDS_TYPE_ERR;
+
+       req->rq_repbuf = (char *)hdr; /* the reply consists of the header alone */
+       req->rq_replen = sizeof(*hdr); 
+
+       EXIT;
+       return mds_reply(req);
+}
+
+/*
+ * Turn a fid into a referenced dentry on the filesystem in mds->mds_sb.
+ *
+ * The inode numbered fid->id is looked up with iget().  An existing alias
+ * that is not flagged DCACHE_NFSD_DISCONNECTED is preferred; failing that,
+ * a new dentry is allocated with d_alloc_root() and flagged
+ * DCACHE_NFSD_DISCONNECTED.
+ *
+ * If 'mnt' is non-NULL, *mnt receives a reference on mds->mds_vfsmnt when a
+ * dentry is returned.  On failure *mnt is set to NULL and no mount
+ * reference is taken, so error paths in callers have nothing to release.
+ *
+ * Returns the dentry, or ERR_PTR(-ESTALE) / ERR_PTR(-ENOMEM) on failure;
+ * callers must test the result with IS_ERR(), not against NULL.
+ */
+struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt)
+{
+       /* stolen from NFS */ 
+       struct super_block *sb = mds->mds_sb; 
+       unsigned long ino = fid->id;
+       //__u32 generation = fid->generation;
+       __u32 generation = 0; /* 0 disables the generation comparison below */
+       struct inode *inode;
+       struct list_head *lp;
+       struct dentry *result;
+
+       /* only hand out a mount reference together with a valid dentry */
+       if (mnt) { 
+               *mnt = NULL;
+       }
+
+       if (ino == 0)
+               return ERR_PTR(-ESTALE);
+
+       inode = iget(sb, ino);
+       if (inode == NULL)
+               return ERR_PTR(-ENOMEM);
+
+       printk("--> mds_fid2dentry: sb %p\n", inode->i_sb); 
+
+       if (is_bad_inode(inode)
+           || (generation && inode->i_generation != generation)
+               ) {
+               /* we didn't find the right inode.. */
+               printk(__FUNCTION__ 
+                      ": bad inode %lu, link: %d ct: %d or version  %u/%u\n",
+                       inode->i_ino,
+                       inode->i_nlink, atomic_read(&inode->i_count),
+                       inode->i_generation,
+                       generation);
+               iput(inode);
+               return ERR_PTR(-ESTALE);
         }
 
-       de->d_inode = inode;
-       return de;
+       /* now to find a dentry.
+        * If possible, get a well-connected one
+        */
+       spin_lock(&dcache_lock);
+       for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
+               result = list_entry(lp,struct dentry, d_alias);
+               if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
+                       dget_locked(result);
+                       result->d_vfs_flags |= DCACHE_REFERENCED;
+                       spin_unlock(&dcache_lock);
+                       /* the dentry holds its own inode reference */
+                       iput(inode);
+                       if (mnt)
+                               *mnt = mntget(mds->mds_vfsmnt);
+                       return result;
+               }
+       }
+       spin_unlock(&dcache_lock);
+       result = d_alloc_root(inode);
+       if (result == NULL) {
+               iput(inode);
+               return ERR_PTR(-ENOMEM);
+       }
+       result->d_flags |= DCACHE_NFSD_DISCONNECTED;
+       if (mnt)
+               *mnt = mntget(mds->mds_vfsmnt);
+       return result;
 }
 
-int mds_getattr(struct mds_request *req)
+static inline void mds_get_objid(struct inode *inode, __u64 *id)
 {
-       struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req->fid1);
+       memcpy(id, &inode->u.ext2_i.i_data, sizeof(*id)); /*
+        * Copies the first sizeof(*id) bytes of the ext2-private i_data area
+        * of 'inode' into *id.  NOTE(review): presumably the object id is
+        * stored there by the code that creates the inode -- confirm, and
+        * note this only makes sense for inodes on an ext2 filesystem.
+        */
+}
+
+int mds_getattr(struct ptlrpc_request *req)
+{
+       struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, 
+                                          NULL);
        struct inode *inode;
-       struct mds_rep *rep = req->rq_rep;
+       struct mds_rep *rep;
        int rc;
        
-       rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep, 
+       rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.mds
                          &req->rq_replen, &req->rq_repbuf);
        if (rc) { 
                EXIT;
                printk("mds: out of memory\n");
                req->rq_status = -ENOMEM;
-               return -ENOMEM;
+               return 0;
        }
 
        req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
+       rep = req->rq_rep.mds;
 
        if (!de) { 
                EXIT;
@@ -73,6 +266,7 @@ int mds_getattr(struct mds_request *req)
        }
 
        inode = de->d_inode;
+       rep->ino = inode->i_ino;
        rep->atime = inode->i_atime;
        rep->ctime = inode->i_ctime;
        rep->mtime = inode->i_mtime;
@@ -80,61 +274,100 @@ int mds_getattr(struct mds_request *req)
        rep->gid = inode->i_gid;
        rep->size = inode->i_size;
        rep->mode = inode->i_mode;
-
+       rep->nlink = inode->i_nlink;
+       rep->valid = ~0;
+       mds_get_objid(inode, &rep->objid);
+       dput(de); 
        return 0;
 }
 
-int mds_reply(struct mds_request *req)
+int mds_readpage(struct ptlrpc_request *req)
 {
-       ENTRY;
-       kfree(req->rq_reqbuf);
-       req->rq_reqbuf = NULL; 
-       wake_up_interruptible(&req->rq_wait_for_mds_rep); 
-       EXIT;
-       return 0;
-}
+       struct vfsmount *mnt;
+       struct dentry *de = mds_fid2dentry(req->rq_obd, &req->rq_req.mds->fid1, 
+                                          &mnt);
+       struct file *file; 
+       struct niobuf *niobuf; 
+       struct mds_rep *rep;
+       int rc;
+       
+       printk("mds_readpage: ino %ld\n", de->d_inode->i_ino);
+       rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep.mds, 
+                         &req->rq_replen, &req->rq_repbuf);
+       if (rc) { 
+               EXIT;
+               printk("mds: out of memory\n");
+               req->rq_status = -ENOMEM;
+               return 0;
+       }
 
-int mds_error(struct mds_request *req)
-{
-       struct mds_rep_hdr *hdr;
+       req->rq_rephdr->seqno = req->rq_reqhdr->seqno;
+       rep = req->rq_rep.mds;
 
-       ENTRY;
-       hdr = kmalloc(sizeof(*hdr), GFP_KERNEL);
-       if (!hdr) { 
+       if (IS_ERR(de)) { 
                EXIT;
-               return -ENOMEM;
+               req->rq_rephdr->status = PTR_ERR(de); 
+               return 0;
        }
 
-       memset(hdr, 0, sizeof(*hdr));
-       
-       hdr->seqno = req->rq_reqhdr->seqno;
-       hdr->status = req->rq_status; 
-       hdr->type = MDS_TYPE_ERR;
-       req->rq_repbuf = (char *)hdr;
+       file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); 
+       /* note: in case of an error, dentry_open puts dentry */
+       if (IS_ERR(file)) { 
+               EXIT;
+               req->rq_rephdr->status = PTR_ERR(file);
+               return 0;
+       }
+               
+       niobuf = mds_req_tgt(req->rq_req.mds);
+
+       /* to make this asynchronous make sure that the handling function 
+          doesn't send a reply when this function completes. Instead a 
+          callback function would send the reply */ 
+       rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf); 
 
+       filp_close(file, 0);
+       req->rq_rephdr->status = rc;
        EXIT;
-       return mds_reply(req);
+       return 0;
+}
+
+/*
+ * Handle a reint request: unpack the update record carried in the
+ * request's target buffer and pass it to mds_reint_rec().
+ *
+ * A record that fails to unpack is reported by setting req->rq_status to
+ * -EINVAL.  The function itself always returns 0.
+ */
+int mds_reint(struct ptlrpc_request *req)
+{
+       struct mds_update_record record;
+       char *payload = mds_req_tgt(req->rq_req.mds);
+       int payload_len = req->rq_req.mds->tgtlen;
+       int rc;
+
+       rc = mds_update_unpack(payload, payload_len, &record);
+       if (rc == 0) {
+               /* rc will be used to interrupt a for loop over multiple records */
+               rc = mds_reint_rec(&record, req);
+               return 0;
+       }
+
+       printk(__FUNCTION__ ": invalid record\n");
+       req->rq_status = -EINVAL;
+       return 0;
 }
 
 //int mds_handle(struct mds_conn *conn, int len, char *buf)
-int mds_handle(struct mds_request *req)
+int mds_handle(struct ptlrpc_request *req)
 {
        int rc;
-       struct mds_req_hdr *hdr;
+       struct ptlreq_hdr *hdr;
 
        ENTRY;
 
-       hdr = (struct mds_req_hdr *)req->rq_reqbuf;
+       hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
 
-       if (NTOH_u32(hdr->type) != MDS_TYPE_REQ) {
+       if (NTOH__u32(hdr->type) != MDS_TYPE_REQ) {
                printk("lustre_mds: wrong packet type sent %d\n",
-                      NTOH_u32(hdr->type));
+                      NTOH__u32(hdr->type));
                rc = -EINVAL;
                goto out;
        }
 
        rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen, 
-                           &req->rq_reqhdr, &req->rq_req);
+                           &req->rq_reqhdr, &req->rq_req.mds);
        if (rc) { 
                printk("lustre_mds: Invalid request\n");
                EXIT; 
@@ -148,35 +381,15 @@ int mds_handle(struct mds_request *req)
                rc = mds_getattr(req);
                break;
 
-       case MDS_OPEN:
-               return mds_getattr(req);
-
-       case MDS_SETATTR:
-               return mds_getattr(req);
-
-       case MDS_CREATE:
-               return mds_getattr(req);
-
-       case MDS_MKDIR:
-               return mds_getattr(req);
-
-       case MDS_RMDIR:
-               return mds_getattr(req);
-
-       case MDS_SYMLINK:
-               return mds_getattr(req);
-       case MDS_LINK:
-               return mds_getattr(req);
-  
-       case MDS_MKNOD:
-               return mds_getattr(req);
-
-       case MDS_UNLINK:
-               return mds_getattr(req);
+       case MDS_READPAGE:
+               CDEBUG(D_INODE, "readpage\n");
+               rc = mds_readpage(req);
+               break;
 
-       case MDS_RENAME:
-               return mds_getattr(req);
+       case MDS_REINT:
+               CDEBUG(D_INODE, "reint\n");
+               rc = mds_reint(req);
+               break;
 
        default:
                return mds_error(req);
@@ -184,7 +397,11 @@ int mds_handle(struct mds_request *req)
 
 out:
        if (rc) { 
-               printk("mds: processing error %d\n", rc);
+               printk(__FUNCTION__ ": no header\n");
+               return 0;
+       }
+
+       if( req->rq_status) { 
                mds_error(req);
        } else { 
                CDEBUG(D_INODE, "sending reply\n"); 
@@ -227,25 +444,62 @@ int mds_main(void *arg)
        mds->mds_thread = current;
        wake_up(&mds->mds_done_waitq); 
 
-       printk(KERN_INFO "lustre_mds starting.  Commit interval %ld seconds\n",
+       printk(KERN_INFO "lustre_mds starting.  Commit interval %d seconds\n",
                        mds->mds_interval / HZ);
 
        /* XXX maintain a list of all managed devices: insert here */
 
        /* And now, wait forever for commit wakeup events. */
        while (1) {
+               int rc;
+
                if (mds->mds_flags & MDS_UNMOUNT)
                        break;
 
-
                wake_up(&mds->mds_done_waitq);
                interruptible_sleep_on(&mds->mds_waitq);
 
                CDEBUG(D_INODE, "lustre_mds wakes\n");
                CDEBUG(D_INODE, "pick up req here and continue\n"); 
 
-               if (list_empty(&mds->mds_reqs)) { 
-                       CDEBUG(D_INODE, "woke because of timer\n"); 
+               if (mds->mds_service != NULL) {
+                       ptl_event_t ev;
+
+                       while (1) {
+                               struct ptlrpc_request request;
+
+                               rc = PtlEQGet(mds->mds_service->srv_eq, &ev);
+                               if (rc != PTL_OK && rc != PTL_EQ_DROPPED)
+                                       break;
+                               /* FIXME: If we move to an event-driven model,
+                                * we should put the request on the stack of
+                                * mds_handle instead. */
+                               memset(&request, 0, sizeof(request));
+                               request.rq_reqbuf = ev.mem_desc.start +
+                                       ev.offset;
+                               request.rq_reqlen = ev.mem_desc.length;
+                               request.rq_obd = MDS;
+                               request.rq_xid = ev.match_bits;
+
+                               request.rq_peer.peer_nid = ev.initiator.nid;
+                               /* FIXME: this NI should be the incoming NI.
+                                * We don't know how to find that from here. */
+                               request.rq_peer.peer_ni =
+                                       mds->mds_service->srv_self.peer_ni;
+                               rc = mds_handle(&request);
+                       }
+               } else {
+                       struct ptlrpc_request *request;
+
+                       if (list_empty(&mds->mds_reqs)) {
+                               CDEBUG(D_INODE, "woke because of timer\n");
+                       } else {
+                               request = list_entry(mds->mds_reqs.next,
+                                                    struct ptlrpc_request,
+                                                    rq_list);
+                               list_del(&request->rq_list);
+                               rc = mds_handle(request);
+                       }
                }
        }
 
@@ -287,11 +541,12 @@ static int mds_setup(struct obd_device *obddev, obd_count len,
        struct obd_ioctl_data* data = buf;
        struct mds_obd *mds = &obddev->u.mds;
        struct vfsmount *mnt;
+       struct lustre_peer peer;
        int err; 
         ENTRY;
-       
-       mnt = do_kern_mount(data->ioc_inlbuf2, 0, 
-                           data->ioc_inlbuf1, NULL); 
+
+
+       mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL); 
        err = PTR_ERR(mnt);
        if (IS_ERR(mnt)) { 
                EXIT;
@@ -299,23 +554,40 @@ static int mds_setup(struct obd_device *obddev, obd_count len,
        }
 
        mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
-       if (!obddev->u.mds.mds_sb) {
-               EXIT;
-               return -ENODEV;
-       }
+       if (!obddev->u.mds.mds_sb) {
+               EXIT;
+               return -ENODEV;
+       }
 
-       INIT_LIST_HEAD(&mds->mds_reqs);
-       mds->mds_thread = NULL;
-       mds->mds_flags = 0;
-       mds->mds_interval = 3 * HZ;
        mds->mds_vfsmnt = mnt;
        obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
 
        mds->mds_ctxt.pwdmnt = mnt;
        mds->mds_ctxt.pwd = mnt->mnt_root;
        mds->mds_ctxt.fs = KERNEL_DS;
+       mds->mds_remote_nid = 0;
 
-       spin_lock_init(&obddev->u.mds.fo_lock);
+       INIT_LIST_HEAD(&mds->mds_reqs);
+       mds->mds_thread = NULL;
+       mds->mds_flags = 0;
+       mds->mds_interval = 3 * HZ;
+       MDS = mds;
+
+       spin_lock_init(&obddev->u.mds.mds_lock);
+
+       err = kportal_uuid_to_peer("self", &peer);
+       if (err == 0) {
+               mds->mds_service = kmalloc(sizeof(*mds->mds_service),
+                                                 GFP_KERNEL);
+               if (mds->mds_service == NULL)
+                       return -ENOMEM;
+               mds->mds_service->srv_buf_size = 64 * 1024;
+               mds->mds_service->srv_portal = MDS_REQUEST_PORTAL;
+               memcpy(&mds->mds_service->srv_self, &peer, sizeof(peer));
+               mds->mds_service->srv_wait_queue = &mds->mds_waitq;
+
+               rpc_register_service(mds->mds_service, "self");
+       }
 
        mds_start_srv_thread(mds);
 
@@ -342,6 +614,7 @@ static int mds_cleanup(struct obd_device * obddev)
                 return -EBUSY;
         }
 
+       MDS = NULL;
        mds_stop_srv_thread(mds);
         sb = mds->mds_sb;
         if (!mds->mds_sb){
@@ -374,8 +647,8 @@ static struct obd_ops mds_obd_ops = {
 
 static int __init mds_init(void)
 {
-        printk(KERN_INFO "Lustre MDS v0.01, braam@clusterfs.com\n");
         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
+       return 0; /* Module init: registers mds_obd_ops under LUSTRE_MDS_NAME
+                  * and always reports success.  NOTE(review): the result of
+                  * obd_register_type() above is discarded -- confirm it
+                  * cannot fail, or propagate it. */
 }
 
 static void __exit mds_exit(void)
@@ -384,8 +657,12 @@ static void __exit mds_exit(void)
 }
 
 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
-MODULE_DESCRIPTION("Lustre Metadata server module");
+MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
 MODULE_LICENSE("GPL");
 
+
+// for testing (maybe this stays)
+EXPORT_SYMBOL(mds_queue_req);
+
 module_init(mds_init);
 module_exit(mds_exit);