Whamcloud - gitweb
- new RPC infrastructure:
authorbraam <braam>
Thu, 31 Jan 2002 21:27:21 +0000 (21:27 +0000)
committerbraam <braam>
Thu, 31 Jan 2002 21:27:21 +0000 (21:27 +0000)
 - struct service with callback functions, buffers and Portals support
 - services are Portal aware, but the old null transport may still work.
 - lustre peer moved to Portals, as was all connection management
 -
- fixed ptl entry constants for incoming REQUESTS, REPLIES for MDS, OST etc.
- all rpc's now have a peer destination (which can be NULL)
- replies are now pre-allocated
- the XID's are used to number transactions _and_ as match entries
- packet movement differentiates Portals and non-portals case.
- reception of packets uses the new Portals callbacks
- module initialization finds peer's by UUID.  For now "mds" is a UUID.
- callbacks are used for (1) incoming requests (2) freeing outgoing packets
(3) incomding reply.  Code reuse was possible.
- adapted all the test scripts (mdc and llmount)

14 files changed:
lustre/include/linux/lustre_light.h
lustre/include/linux/lustre_mds.h
lustre/include/linux/lustre_net.h
lustre/llite/namei.c
lustre/llite/rw.c
lustre/llite/super.c
lustre/mdc/mdc_reint.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/ost/ost_handler.c
lustre/ptlrpc/rpc.c
lustre/tests/llmount.sh
lustre/tests/mdcreq.sh
lustre/tests/mdcreqcleanup.sh [new file with mode: 0755]

index a8d835e..e40877d 100644 (file)
@@ -14,6 +14,7 @@
 #include <linux/obd_class.h>
 #include <linux/obdo.h>
 #include <linux/list.h>
+#include <linux/lustre_net.h>
 
 #define LL_SUPER_MAGIC 0x0BD00BD0;
 
@@ -34,6 +35,8 @@ struct ll_sb_info {
         struct list_head         ll_inodes;    /* list of dirty inodes */
         unsigned long            ll_cache_count;
         struct semaphore         ll_list_mutex;
+       struct lustre_peer       ll_peer;
+       struct lustre_peer      *ll_peer_ptr;
 };
 
 
index a880433..cfb58a7 100644 (file)
@@ -58,6 +58,8 @@ struct mds_obd {
        struct file_operations *mds_fop; 
        struct inode_operations *mds_iop;
        struct address_space_operations *mds_aops;
+
+        struct ptlrpc_service *mds_service;
 };
 
 
index b7b4187..f403154 100644 (file)
 #include <portals/p30.h>
 #include <linux/lustre_idl.h>
 
-#define OSC_PORTAL 1
-#define MDS_PORTAL 2
-#define OST_PORTAL 3
-
-struct lustre_peer {
-        ptl_handle_ni_t peer_ni;
-        __u32 peer_nid;
-};
+#define OSC_REQUEST_PORTAL 1
+#define OSC_REPLY_PORTAL   2
+#define MDS_REQUEST_PORTAL 3
+#define MDS_REPLY_PORTAL   4
+#define OST_REQUEST_PORTAL 5
+#define OST_REPLY_PORTAL   6
 
 struct ptlrpc_service {
+        char *srv_buf;
         __u32 srv_buf_size;
-        void (* srv_callback)(ptl_event_t *, void *data);
         __u32 srv_ring_length;
+        __u32 srv_portal;
+
+        struct lustre_peer srv_self;
+
+        /* FIXME: perhaps a list of EQs, if multiple NIs are used? */
+        ptl_handle_eq_t srv_eq;
+
+        ptl_handle_me_t srv_me;
+        ptl_process_id_t srv_id;
+        ptl_md_t srv_md;
+        ptl_handle_md_t srv_md_h;
+        wait_queue_head_t *srv_wait_queue;
 };
 
 
@@ -53,8 +63,9 @@ struct ptlrpc_request {
        int rq_reqlen;
        struct ptlreq_hdr *rq_reqhdr;
        union ptl_req rq_req;
+        __u32 rq_xid;
 
-       char *rq_repbuf;
+       char *rq_repbuf;
        int rq_replen;
        struct ptlrep_hdr *rq_rephdr;
        union ptl_rep rq_rep;
@@ -66,10 +77,15 @@ struct ptlrpc_request {
         ptl_md_t rq_req_md;
         __u32 rq_reply_portal;
         __u32 rq_req_portal;
+
+        struct lustre_peer rq_peer;
 };
 
 /* rpc/rpc.c */
+int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
+                 int portal, int is_request);
 int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer);
+int rpc_register_service(struct ptlrpc_service *service, char *uuid);
 
 /* FIXME */
 #if 1
index fb7531f..afb5b14 100644 (file)
@@ -82,6 +82,7 @@ static struct dentry *ll_lookup(struct inode * dir, struct dentry *dentry)
        struct mds_rep *rep; 
        struct ptlrep_hdr *hdr = NULL; 
        struct inode * inode = NULL;
+        struct ll_sb_info *sbi;
        int err;
        int type;
        ino_t ino;
@@ -94,8 +95,10 @@ static struct dentry *ll_lookup(struct inode * dir, struct dentry *dentry)
        if (!ino)
                goto negative;
 
-       err = mdc_getattr(ino, type, OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, 
-                        &rep, &hdr);
+        sbi = (struct ll_sb_info *)(&dentry->d_inode->i_sb->u.generic_sbp);
+
+       err = mdc_getattr(sbi->ll_peer_ptr, ino, type,
+                         OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, &rep, &hdr);
         if ( err ) {
                 printk(__FUNCTION__ ": obdo_fromid failed\n");
                 EXIT;
@@ -137,10 +140,12 @@ static struct inode *ll_create_node(struct inode *dir, const char *name,
        struct mds_rep *rep;
        struct ptlrep_hdr *hdr;
         int err;
+        struct ll_sb_info *sbi =
+               (struct ll_sb_info *)(&dir->i_sb->u.generic_sbp);
 
         ENTRY;
 
-       err = mdc_create(dir, name, namelen, mode, id,
+       err = mdc_create(sbi->ll_peer_ptr, dir, name, namelen, mode, id,
                         current->uid, current->gid, CURRENT_TIME, 
                         &rep, &hdr); 
        if (err) { 
index 1a04d52..a109efd 100644 (file)
@@ -244,6 +244,8 @@ int ll_readpage(struct file *file, struct page *page)
 int ll_dir_readpage(struct file *file, struct page *page)
 {
        struct inode *inode = page->mapping->host;
+        struct ll_sb_info *sbi =
+               (struct ll_sb_info *)(&inode->i_sb->u.generic_sbp);
        char *buf;
        __u64 offset;
         int rc = 0;
@@ -265,7 +267,8 @@ int ll_dir_readpage(struct file *file, struct page *page)
 
        offset = page->index << PAGE_SHIFT; 
        buf = kmap(page);
-        rc = mdc_readpage(inode->i_ino, S_IFDIR, offset, buf, NULL, &hdr);
+        rc = mdc_readpage(sbi->ll_peer_ptr, inode->i_ino, S_IFDIR, offset, buf,
+                         NULL, &hdr);
        kunmap(buff); 
         if ( rc ) {
                EXIT; 
index a214135..337d1e0 100644 (file)
@@ -126,6 +126,10 @@ static struct super_block * ll_read_super(struct super_block *sb,
         }
        connected = 1;
 
+       err = kportal_uuid_to_peer("mds", &sbi->ll_peer);
+       if (err == 0)
+               sbi->ll_peer_ptr = &sbi->ll_peer;
+
         sbi->ll_super = sb;
        sbi->ll_rootino = 2;
 
@@ -136,7 +140,7 @@ static struct super_block * ll_read_super(struct super_block *sb,
         sb->s_op = &ll_super_operations;
 
         /* make root inode */
-       err = mdc_getattr(sbi->ll_rootino, S_IFDIR, 
+       err = mdc_getattr(sbi->ll_peer_ptr, sbi->ll_rootino, S_IFDIR, 
                          OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, 
                          &rep, &hdr);
         if (err) {
@@ -238,6 +242,8 @@ int ll_setattr(struct dentry *de, struct iattr *attr)
 {
         struct inode *inode = de->d_inode;
        struct ptlrep_hdr *hdr = NULL;
+        struct ll_sb_info *sbi =
+               (struct ll_sb_info *)(&inode->i_sb->u.generic_sbp);
        int err;
 
         ENTRY;
@@ -245,7 +251,7 @@ int ll_setattr(struct dentry *de, struct iattr *attr)
        /* change incore inode */
        ll_attr2inode(inode, attr);
 
-       err = mdc_setattr(inode, attr, NULL, &hdr); 
+       err = mdc_setattr(sbi->ll_peer_ptr, inode, attr, NULL, &hdr); 
         if ( err )
                 printk(__FUNCTION__ ": ll_setattr fails (%d)\n", err);
 
index afa78f6..3c9b939 100644 (file)
@@ -51,8 +51,12 @@ int mdc_setattr(struct lustre_peer *peer,
        rec = mds_req_tgt(request->rq_req.mds);
        mds_setattr_pack(rec, inode, iattr); 
        request->rq_req.mds->opcode = HTON__u32(REINT_SETATTR);
+       request->rq_replen = 
+               sizeof(struct ptlrep_hdr) + sizeof(struct mds_rep);
 
        rc = mdc_reint(peer, request);
+       if (rc)
+               return rc;
 
        if (rep) { 
                *rep = request->rq_rep.mds;
@@ -61,8 +65,7 @@ int mdc_setattr(struct lustre_peer *peer,
                *hdr = request->rq_rephdr;
        }
 
-       kfree(request); 
-       return rc;
+       return 0;
 }
 
 int mdc_create(struct lustre_peer *peer, 
@@ -82,6 +85,9 @@ int mdc_create(struct lustre_peer *peer,
                return -ENOMEM;
        }
 
+       request->rq_replen = 
+               sizeof(struct ptlrep_hdr) + sizeof(struct mds_rep);
+
        rec = mds_req_tgt(request->rq_req.mds);
        mds_create_pack(rec, dir, name, namelen, mode, id, uid, gid, time); 
 
index a373f6c..b59305f 100644 (file)
@@ -35,6 +35,9 @@
 
 extern int mds_queue_req(struct ptlrpc_request *);
 
+/* FIXME: this belongs in some sort of service struct */
+static int mdc_xid = 0;
+
 struct ptlrpc_request *mds_prep_req(int opcode, int namelen, char *name, int tgtlen, char *tgt)
 {
        struct ptlrpc_request *request;
@@ -47,6 +50,8 @@ struct ptlrpc_request *mds_prep_req(int opcode, int namelen, char *name, int tgt
                return NULL;
        }
 
+       request->rq_xid = mdc_xid++;
+
        rc = mds_pack_req(name, namelen, tgt, tgtlen,
                          &request->rq_reqhdr, &(request->rq_req.mds),
                          &request->rq_reqlen, &request->rq_reqbuf);
@@ -71,10 +76,17 @@ static int mds_queue_wait(struct ptlrpc_request *req, struct lustre_peer *peer)
        int rc;
 
        /* XXX fix the race here (wait_for_event?)*/
-       /* hand the packet over to the server */
-       rc = ptl_send_rpc(req, peer);
+       if (peer == NULL) {
+               /* Local delivery */
+               rc = mds_queue_req(req); 
+       } else {
+               /* Remote delivery via portals. */
+               req->rq_req_portal = MDS_REQUEST_PORTAL;
+               req->rq_reply_portal = MDS_REPLY_PORTAL;
+               rc = ptl_send_rpc(req, peer);
+       }
        if (rc) { 
-               printk("mdc_queue_wait: error %d, opcode %d\n", rc, 
+               printk(__FUNCTION__ ": error %d, opcode %d\n", rc, 
                       req->rq_reqhdr->opc); 
                return -rc;
        }
@@ -84,15 +96,20 @@ static int mds_queue_wait(struct ptlrpc_request *req, struct lustre_peer *peer)
        interruptible_sleep_on(&req->rq_wait_for_rep);
        printk("-- done\n");
 
-       mds_unpack_rep(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, 
-                      &req->rq_rep.mds); 
+       rc = mds_unpack_rep(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, 
+                           &req->rq_rep.mds);
+       if (rc) {
+               printk(__FUNCTION__ ": mds_unpack_rep failed: %d\n", rc);
+               return rc;
+       }
+
        if ( req->rq_rephdr->status == 0 )
                printk("-->mdc_queue_wait: buf %p len %d status %d\n", 
                       req->rq_repbuf, req->rq_replen, 
                       req->rq_rephdr->status); 
 
        EXIT;
-       return req->rq_rephdr->status;
+       return 0;
 }
 
 void mds_free_req(struct ptlrpc_request *request)
@@ -161,6 +178,9 @@ int mdc_readpage(struct lustre_peer *peer, ino_t ino, int type, __u64 offset,
        request->rq_req.mds->size = offset;
        request->rq_req.mds->tgtlen = sizeof(niobuf); 
 
+       request->rq_replen = 
+               sizeof(struct ptlrep_hdr) + sizeof(struct mds_rep);
+
        rc = mds_queue_wait(request, peer);
        if (rc) { 
                printk("mdc request: error in handling %d\n", rc); 
@@ -198,7 +218,7 @@ static int request_ioctl(struct inode *inode, struct file *file,
                       unsigned int cmd, unsigned long arg)
 {
        int err;
-       struct lustre_peer peer
+       struct lustre_peer peer, *peer_ptr = NULL;
 
        ENTRY;
 
@@ -216,13 +236,15 @@ static int request_ioctl(struct inode *inode, struct file *file,
                 return -EINVAL;
         }
 
-       //rc = ptl_peer("mds", peer); 
+       err = kportal_uuid_to_peer("mds", &peer);
+       if (err == 0)
+               peer_ptr = &peer;
        
        switch (cmd) {
        case IOC_REQUEST_GETATTR: { 
                struct ptlrep_hdr *hdr = NULL;
                printk("-- getting attr for ino 2\n"); 
-               err = mdc_getattr(&peer, 2, S_IFDIR, ~0, NULL, &hdr);
+               err = mdc_getattr(peer_ptr, 2, S_IFDIR, ~0, NULL, &hdr);
                if (hdr)
                        kfree(hdr);
                printk("-- done err %d\n", err);
@@ -238,7 +260,7 @@ static int request_ioctl(struct inode *inode, struct file *file,
                        break;
                }
                printk("-- readpage 0 for ino 2\n"); 
-               err = mdc_readpage(&peer, 2, S_IFDIR, 0, buf, NULL, &hdr);
+               err = mdc_readpage(peer_ptr, 2, S_IFDIR, 0, buf, NULL, &hdr);
                printk("-- done err %d\n", err);
                if (!err) { 
                        printk("-- status: %d\n", hdr->status); 
@@ -260,13 +282,14 @@ static int request_ioctl(struct inode *inode, struct file *file,
                iattr.ia_atime = 0;
                iattr.ia_valid = ATTR_MODE | ATTR_ATIME;
 
-               err = mdc_setattr(&peer, &inode, &iattr, NULL, &hdr);
+               err = mdc_setattr(peer_ptr, &inode, &iattr, NULL, &hdr);
                printk("-- done err %d\n", err);
                if (!err) { 
                        printk("-- status: %d\n", hdr->status); 
                        err = hdr->status;
+               } else {
+                       kfree(hdr); 
                }
-               kfree(hdr); 
                break;
        }
 
@@ -280,8 +303,8 @@ static int request_ioctl(struct inode *inode, struct file *file,
                iattr.ia_atime = 0;
                iattr.ia_valid = ATTR_MODE | ATTR_ATIME;
 
-               err = mdc_create(&peer, &inode, "foofile", strlen("foofile"), 
-                                0100707, 47114711, 
+               err = mdc_create(peer_ptr, &inode, "foofile",
+                                strlen("foofile"), 0100707, 47114711, 
                                 11, 47, 0, NULL, &hdr);
                printk("-- done err %d\n", err);
                if (!err) { 
index da72b64..72bfa8b 100644 (file)
@@ -95,18 +95,28 @@ int mds_reply(struct ptlrpc_request *req)
 
        ENTRY;
        
-       /* move the reply to the client */ 
-       clnt_req->rq_replen = req->rq_replen;
-       clnt_req->rq_repbuf = req->rq_repbuf;
-       req->rq_repbuf = NULL;
-       req->rq_replen = 0;
-
-       /* free the request buffer */
-       kfree(req->rq_reqbuf);
-       req->rq_reqbuf = NULL; 
-
-       /* wake up the client */ 
-       wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
+       if (req->rq_obd->mds_service != NULL) {
+               /* This is a request that came from the network via portals. */
+
+               /* FIXME: we need to increment the count of handled events */
+               ptl_send_buf(req, &req->rq_peer, MDS_REPLY_PORTAL, 0);
+       } else {
+               /* This is a local request that came from another thread. */
+
+               /* move the reply to the client */ 
+               clnt_req->rq_replen = req->rq_replen;
+               clnt_req->rq_repbuf = req->rq_repbuf;
+               req->rq_repbuf = NULL;
+               req->rq_replen = 0;
+
+               /* free the request buffer */
+               kfree(req->rq_reqbuf);
+               req->rq_reqbuf = NULL;
+
+               /* wake up the client */ 
+               wake_up_interruptible(&clnt_req->rq_wait_for_rep); 
+       }
+
        EXIT;
        return 0;
 }
@@ -415,26 +425,55 @@ int mds_main(void *arg)
 
        /* And now, wait forever for commit wakeup events. */
        while (1) {
-               struct ptlrpc_request *request;
-               int rc; 
+               int rc;
 
                if (mds->mds_flags & MDS_UNMOUNT)
                        break;
 
-
                wake_up(&mds->mds_done_waitq);
                interruptible_sleep_on(&mds->mds_waitq);
 
                CDEBUG(D_INODE, "lustre_mds wakes\n");
                CDEBUG(D_INODE, "pick up req here and continue\n"); 
 
-               if (list_empty(&mds->mds_reqs)) { 
-                       CDEBUG(D_INODE, "woke because of timer\n"); 
-               } else { 
-                       request = list_entry(mds->mds_reqs.next, 
-                                            struct ptlrpc_request, rq_list);
-                       list_del(&request->rq_list);
-                       rc = mds_handle(request); 
+               if (mds->mds_service != NULL) {
+                       ptl_event_t ev;
+
+                       while (1) {
+                               struct ptlrpc_request request;
+
+                               rc = PtlEQGet(mds->mds_service->srv_eq, &ev);
+                               if (rc != PTL_OK && rc != PTL_EQ_DROPPED)
+                                       break;
+                               /* FIXME: If we move to an event-driven model,
+                                * we should put the request on the stack of
+                                * mds_handle instead. */
+                               memset(&request, 0, sizeof(request));
+                               request.rq_reqbuf = ev.mem_desc.start +
+                                       ev.offset;
+                               request.rq_reqlen = ev.mem_desc.length;
+                               request.rq_obd = MDS;
+                               request.rq_xid = ev.match_bits;
+
+                               request.rq_peer.peer_nid = ev.initiator.nid;
+                               /* FIXME: this NI should be the incoming NI.
+                                * We don't know how to find that from here. */
+                               request.rq_peer.peer_ni =
+                                       mds->mds_service->srv_self.peer_ni;
+                               rc = mds_handle(&request);
+                       }
+               } else {
+                       struct ptlrpc_request *request;
+
+                       if (list_empty(&mds->mds_reqs)) {
+                               CDEBUG(D_INODE, "woke because of timer\n");
+                       } else {
+                               request = list_entry(mds->mds_reqs.next,
+                                                    struct ptlrpc_request,
+                                                    rq_list);
+                               list_del(&request->rq_list);
+                               rc = mds_handle(request);
+                       }
                }
        }
 
@@ -475,50 +514,32 @@ static int mds_setup(struct obd_device *obddev, obd_count len,
 {
        struct obd_ioctl_data* data = buf;
        struct mds_obd *mds = &obddev->u.mds;
+       struct vfsmount *mnt;
+       struct lustre_peer peer;
+       int err; 
         ENTRY;
 
-        /* If the ioctl data contains two inline buffers, this is a request to
-         * mount a local filesystem for metadata storage.  If the ioctl data
-         * contains one buffer, however, it is the UUID of the remote node that
-         * we will contact for metadata. */
-        if (data->ioc_inllen2 == 0) {
-                __u32 nid;
-
-                nid = kportal_uuid_to_nid(data->ioc_inlbuf1);
-                if (nid == 0) {
-                        printk("Lustre: uuid_to_nid failed; use ptlctl to "
-                               "associate this uuid with a NID\n");
-                        EXIT;
-                        return -EINVAL;
-                }
-                printk("Lustre MDS: remote nid is %u\n", nid);
-                mds->mds_remote_nid = nid;
-        } else {
-                struct vfsmount *mnt;
-                int err; 
-
-                mnt = do_kern_mount(data->ioc_inlbuf2, 0, 
-                                    data->ioc_inlbuf1, NULL); 
-                err = PTR_ERR(mnt);
-                if (IS_ERR(mnt)) { 
-                        EXIT;
-                        return err;
-                }
-
-                mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
-                if (!obddev->u.mds.mds_sb) {
-                        EXIT;
-                        return -ENODEV;
-                }
-
-                mds->mds_vfsmnt = mnt;
-                obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
-
-                mds->mds_ctxt.pwdmnt = mnt;
-                mds->mds_ctxt.pwd = mnt->mnt_root;
-                mds->mds_ctxt.fs = KERNEL_DS;
-                mds->mds_remote_nid = 0;
-        }
+
+       mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL); 
+       err = PTR_ERR(mnt);
+       if (IS_ERR(mnt)) { 
+               EXIT;
+               return err;
+       }
+
+       mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
+       if (!obddev->u.mds.mds_sb) {
+               EXIT;
+               return -ENODEV;
+       }
+
+       mds->mds_vfsmnt = mnt;
+       obddev->u.mds.mds_fstype = strdup(data->ioc_inlbuf2);
+
+       mds->mds_ctxt.pwdmnt = mnt;
+       mds->mds_ctxt.pwd = mnt->mnt_root;
+       mds->mds_ctxt.fs = KERNEL_DS;
+       mds->mds_remote_nid = 0;
 
        INIT_LIST_HEAD(&mds->mds_reqs);
        mds->mds_thread = NULL;
@@ -528,6 +549,20 @@ static int mds_setup(struct obd_device *obddev, obd_count len,
 
        spin_lock_init(&obddev->u.mds.mds_lock);
 
+       err = kportal_uuid_to_peer("self", &peer);
+       if (err == 0) {
+               mds->mds_service = kmalloc(sizeof(*mds->mds_service),
+                                                 GFP_KERNEL);
+               if (mds->mds_service == NULL)
+                       return -ENOMEM;
+               mds->mds_service->srv_buf_size = 64 * 1024;
+               mds->mds_service->srv_portal = MDS_REQUEST_PORTAL;
+               memcpy(&mds->mds_service->srv_self, &peer, sizeof(peer));
+               mds->mds_service->srv_wait_queue = &mds->mds_waitq;
+
+               rpc_register_service(mds->mds_service, "self");
+       }
+
        mds_start_srv_thread(mds);
 
         MOD_INC_USE_COUNT;
index ba337ae..da3eed9 100644 (file)
@@ -310,6 +310,7 @@ static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
 }
 
 
+#if 0
 static struct page * ext2_get_page(struct inode *dir, unsigned long n)
 {
        struct address_space *mapping = dir->i_mapping;
@@ -332,8 +333,6 @@ fail:
        return ERR_PTR(-EIO);
 }
 
-#if 0
-
 static inline void ext2_put_page(struct page *page)
 {
        kunmap(page);
index 19455a1..f529207 100644 (file)
 #include <linux/obd_support.h>
 #include <linux/lustre_net.h>
 
-ptl_handle_ni_t LUSTRE_NI;
 static ptl_handle_eq_t req_eq;
-static int req_initialized = 0;
 
-/* This seems silly now, but some day we'll have more than one NI */
-int req_get_peer(__u32 nid, struct lustre_peer *peer)
-{
-        peer->peer_ni = req_ni;
-        peer->peer_nid = nid;
-
-        return 0;
-}
 static int request_callback(ptl_event_t *ev, void *data)
 {
         struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;
@@ -48,32 +38,50 @@ static int request_callback(ptl_event_t *ev, void *data)
         ENTRY;
 
         if (ev->type == PTL_EVENT_SENT) {
-                kfree(rpc->rq_reqbuf);
+                kfree(ev->mem_desc.start);
         } else if (ev->type == PTL_EVENT_PUT) {
-                struct ptlrpc_request *clnt_rpc = rpc->rq_reply_handle;
-
                 rpc->rq_repbuf = ev->mem_desc.start + ev->offset;
-
-                wake_up_interruptible(&clnt_rpc->rq_wait_for_rep);
+                wake_up_interruptible(&rpc->rq_wait_for_rep);
         }
 
         EXIT;
         return 1;
 }
 
+static int incoming_callback(ptl_event_t *ev, void *data)
+{
+        struct ptlrpc_service *service = data;
+
+        ENTRY;
+
+        if (ev->type == PTL_EVENT_PUT) {
+                wake_up(service->srv_wait_queue);
+        } else {
+                printk("Unexpected event type: %d\n", ev->type);
+        }
+
+        EXIT;
+        return 0;
+}
+
 int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
-                 int portal)
+                 int portal, int is_request)
 {
         int rc;
         ptl_process_id_t remote_id;
         ptl_handle_md_t md_h;
 
-        request->rq_req_md.start = request->rq_reqbuf;
-        request->rq_req_md.length = request->rq_reqlen;
+        if (is_request) {
+                request->rq_req_md.start = request->rq_reqbuf;
+                request->rq_req_md.length = request->rq_reqlen;
+        } else {
+                request->rq_req_md.start = request->rq_repbuf;
+                request->rq_req_md.length = request->rq_replen;
+        }
         request->rq_req_md.threshold = PTL_MD_THRESH_INF;
         request->rq_req_md.options = PTL_MD_OP_PUT;
         request->rq_req_md.user_ptr = request;
-        request->rq_req_md.eventq = PTL_EQ_NONE;
+        request->rq_req_md.eventq = req_eq;
 
         rc = PtlMDBind(peer->peer_ni, request->rq_req_md, &md_h);
         if (rc != 0) {
@@ -85,7 +93,8 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
         remote_id.nid = peer->peer_nid;
         remote_id.pid = 0;
 
-        rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0, 0, 0, 0);
+        rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0, request->rq_xid,
+                    0, 0);
         if (rc != PTL_OK) {
                 printk(__FUNCTION__ ": PtlPut failed: %d\n", rc);
                 /* FIXME: tear down md */
@@ -114,7 +123,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
         local_id.rid = PTL_ID_ANY;
 
         rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id,
-                         0, ~0, PTL_RETAIN, &me_h);
+                         request->rq_xid, 0, PTL_RETAIN, &me_h);
         if (rc != PTL_OK) {
                 EXIT;
                 /* FIXME: tear down EQ, free reqbuf */
@@ -134,20 +143,81 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
                 return rc;
         }
 
-        return ptl_send_buf(request, peer, request->rq_req_portal);
+        return ptl_send_buf(request, peer, request->rq_req_portal, 1);
 }
 
+int rpc_register_service(struct ptlrpc_service *service, char *uuid)
+{
+        struct lustre_peer peer;
+        int rc;
+
+        rc = kportal_uuid_to_peer(uuid, &peer);
+        if (rc != 0) {
+                printk("Invalid uuid \"%s\"\n", uuid);
+                return -EINVAL;
+        }
 
-//int req_init_event_queue(struct lustre_peer *peer);
+        service->srv_buf = kmalloc(service->srv_buf_size, GFP_KERNEL);
+        if (service->srv_buf == NULL) {
+                printk(__FUNCTION__ ": no memory\n");
+                return -ENOMEM;
+        }
+
+        service->srv_id.addr_kind = PTL_ADDR_GID;
+        service->srv_id.gid = PTL_ID_ANY;
+        service->srv_id.rid = PTL_ID_ANY;
+
+       rc = PtlMEAttach(peer.peer_ni, service->srv_portal, service->srv_id,
+                         0, ~0, PTL_RETAIN, &service->srv_me);
+        if (rc != PTL_OK) {
+                printk("PtlMEAttach failed: %d\n", rc);
+                return rc;
+        }
+
+        rc = PtlEQAlloc(peer.peer_ni, 128, incoming_callback, service,
+                        &service->srv_eq);
+        if (rc != PTL_OK) {
+                printk("PtlEQAlloc failed: %d\n", rc);
+                return rc;
+        }
+
+        /* FIXME: Build an auto-unlinking MD and build a ring. */
+        /* FIXME: Make sure that these are reachable by DMA on well-known
+         * addresses. */
+       service->srv_md.start           = service->srv_buf;
+       service->srv_md.length          = service->srv_buf_size;
+       service->srv_md.threshold       = PTL_MD_THRESH_INF;
+       service->srv_md.options         = PTL_MD_OP_PUT;
+       service->srv_md.user_ptr        = service;
+       service->srv_md.eventq          = service->srv_eq;
+
+       rc = PtlMDAttach(service->srv_me, service->srv_md,
+                         PTL_RETAIN, &service->srv_md_h);
+        if (rc != PTL_OK) {
+                printk("PtlMDAttach failed: %d\n", rc);
+                /* FIXME: wow, we need to clean up. */
+                return rc;
+        }
+
+        return 0;
+}
 
 static int req_init_portals(void)
 {
         int rc;
-        rc = PtlEQAlloc(req_ni, 128, request_callback, NULL, &req_eq);
-        if (rc != PTL_OK) {
-                EXIT;
-                return rc; /* FIXME: does this portals rc make sense? */
+        const ptl_handle_ni_t *nip;
+        ptl_handle_ni_t ni;
+
+        nip = inter_module_get_request(LUSTRE_NAL "_ni", LUSTRE_NAL);
+        if (nip == NULL) {
+                printk("get_ni failed: is the NAL module loaded?\n");
+                return -EIO;
         }
+        ni = *nip;
+
+        rc = PtlEQAlloc(ni, 128, request_callback, NULL, &req_eq);
+        if (rc != PTL_OK)
+                printk("PtlEQAlloc failed: %d\n", rc);
 
         return rc;
 }
@@ -159,10 +229,11 @@ static int __init req_init(void)
 
 static void __exit req_exit(void)
 {
-        if (req_initialized) {
-                PtlNIFini(req_ni);
-                inter_module_put(LUSTRE_NAL "_init");
-        }
+        PtlEQFree(req_eq);
+
+        inter_module_put(LUSTRE_NAL "_ni");
+
+        return;
 }
 
 MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
index bed10ae..9499617 100755 (executable)
@@ -2,9 +2,20 @@
 
 R=/r
 
-insmod /lib/modules/2.4.17/kernel/drivers/block/loop.o
+mknod /dev/portals c 10 240
+
 insmod $R/usr/src/portals/linux/oslib/portals.o
 insmod $R/usr/src/portals/linux/socknal/ksocknal.o
+
+$R/usr/src/portals/linux/utils/acceptor 1234 &
+
+$R/usr/src/portals/linux/utils/ptlctl <<EOF
+mynid
+setup tcp localhost 1234
+connect self
+connect mds
+EOF
+
 insmod $R/usr/src/obd/rpc/ptlrpc.o
 insmod $R/usr/src/obd/class/obdclass.o 
 insmod $R/usr/src/obd/ext2obd/obdext2.o
index 411a147..56b5bc6 100644 (file)
@@ -2,6 +2,21 @@
 
 R=/r
 
+mknod /dev/portals c 10 240
+
+insmod $R/usr/src/portals/linux/oslib/portals.o
+insmod $R/usr/src/portals/linux/socknal/ksocknal.o
+
+$R/usr/src/portals/linux/utils/acceptor 1234 &
+
+$R/usr/src/portals/linux/utils/ptlctl <<EOF
+mynid
+setup tcp localhost 1234
+connect self
+connect mds
+EOF
+
+insmod $R/usr/src/obd/rpc/ptlrpc.o
 insmod $R/usr/src/obd/class/obdclass.o 
 insmod $R/usr/src/obd/ext2obd/obdext2.o
 insmod $R/usr/src/obd/ost/ost.o
@@ -14,7 +29,6 @@ dd if=/dev/zero of=/tmp/fs bs=1024 count=10000
 mke2fs -b 4096 -F /tmp/fs
 losetup /dev/loop/0 /tmp/fs
 
-
 mknod /dev/obd c 10 241
 
 $R/usr/src/obd/utils/obdctl <<EOF
@@ -25,6 +39,6 @@ quit
 EOF
 
 mknod /dev/request c 10 244
-# $R/usr/src/obd/utils/testreq
+# $R/usr/src/obd/tests/testreq
 
 
diff --git a/lustre/tests/mdcreqcleanup.sh b/lustre/tests/mdcreqcleanup.sh
new file mode 100755 (executable)
index 0000000..8096a03
--- /dev/null
@@ -0,0 +1,22 @@
+rmmod llight
+rmmod mdc
+/usr/src/obd/utils/obdctl <<EOF
+device 0
+cleanup
+detach
+quit
+EOF
+rmmod mds
+rmmod osc
+rmmod ost
+rmmod obext2
+rmmod obdclass
+rmmod ptlrpc
+/usr/src/portals/linux/utils/ptlctl <<EOF
+setup tcp localhost 1234
+disconnect self
+disconnect mds
+EOF
+rmmod ksocknal
+killall acceptor
+rmmod portals