Whamcloud - gitweb
new files - restructure RPC further
author braam <braam>
Thu, 28 Feb 2002 08:49:03 +0000 (08:49 +0000)
committer braam <braam>
Thu, 28 Feb 2002 08:49:03 +0000 (08:49 +0000)
lustre/ptlrpc/client.c [new file with mode: 0644]
lustre/ptlrpc/events.c [new file with mode: 0644]
lustre/ptlrpc/niobuf.c [new file with mode: 0644]
lustre/ptlrpc/service.c [new file with mode: 0644]

diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
new file mode 100644 (file)
index 0000000..b758162
--- /dev/null
@@ -0,0 +1,196 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (C) 2002 Cluster File Systems, Inc.
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define EXPORT_SYMTAB
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/kernel.h>
+
+#define DEBUG_SUBSYSTEM S_RPC
+
+#include <linux/obd_support.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_net.h>
+
+/*
+ * Local (non-networked) delivery of a request.
+ *
+ * Allocates a server-side request that borrows the caller's request buffer,
+ * links it onto the target OBD device's request list and wakes the waiters
+ * on that device's request wait queue.
+ *
+ * Returns 0 on success, -1 when the client has no local OBD device and
+ * -ENOMEM when the server-side request cannot be allocated.
+ */
+int ptlrpc_enqueue(struct ptlrpc_client *peer,
+                   struct ptlrpc_request *req)
+{
+        struct ptlrpc_request *queued;
+        struct obd_device *target = peer->cli_obd;
+
+        if (target == NULL) {
+                EXIT;
+                return -1;
+        }
+
+        OBD_ALLOC(queued, sizeof(*queued));
+        if (queued == NULL) {
+                EXIT;
+                return -ENOMEM;
+        }
+
+        CDEBUG(0, "peer obd minor %d, incoming req %p, srv_req %p\n",
+               target->obd_minor, req, queued);
+
+        memset(queued, 0, sizeof(*queued));
+
+        /* the server-side request shares the client's request buffer */
+        queued->rq_reqbuf = req->rq_reqbuf;
+        queued->rq_reqlen = req->rq_reqlen;
+        queued->rq_obd = target;
+
+        /* keep a handle on the originating request for the reply */
+        queued->rq_reply_handle = req;
+
+        list_add(&queued->rq_list, &target->obd_req_list);
+        wake_up(&target->obd_req_waitq);
+        return 0;
+}
+
+/*
+ * Initialise a client handle for either a local OBD device or a remote peer.
+ *
+ * dev:  OBD device index; a value in [0, MAX_OBD_DEVICES) selects local
+ *       (non-networked) delivery to that device.
+ * uuid: peer identifier, used only when dev does not name a local device.
+ * cl:   client structure to fill in; it is zeroed first.
+ *
+ * Returns 0 on success, -EINVAL when the local device is not both attached
+ * and set up, or the error from kportal_uuid_to_peer() for a networked
+ * client.
+ */
+int ptlrpc_connect_client(int dev, char *uuid,
+                          struct ptlrpc_client *cl)
+{
+        int err;
+
+        memset(cl, 0, sizeof(*cl));
+        cl->cli_xid = 0;
+        cl->cli_obd = NULL;
+
+        /* non networked client */
+        if (dev >= 0 && dev < MAX_OBD_DEVICES) {
+                struct obd_device *obd = &obd_dev[dev];
+
+                /* '!' binds tighter than '&', so each flag test must be
+                 * parenthesised before it is negated. */
+                if (!(obd->obd_flags & OBD_ATTACHED) ||
+                    !(obd->obd_flags & OBD_SET_UP)) {
+                        CERROR("target device %d not att or setup\n", dev);
+                        return -EINVAL;
+                }
+
+                cl->cli_obd = obd;
+                return 0;
+        }
+
+        /* networked */
+        err = kportal_uuid_to_peer(uuid, &cl->cli_server);
+        if (err != 0) {
+                /* non-zero means the lookup failed */
+                CERROR("cannot find peer!\n");
+        }
+
+        return err;
+}
+
+/*
+ * Allocate a request and pack it using the client's cli_req_pack method.
+ *
+ * cl:            client whose transaction id counter and pack method are used
+ * opcode:        operation code stored in the packed request header
+ * namelen/name:  passed through to cli_req_pack
+ * tgtlen/tgt:    passed through to cli_req_pack
+ *
+ * Returns the new request, or NULL when allocation or packing fails.  The
+ * request structure itself is released with ptlrpc_free_req().
+ */
+struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
+                                       int opcode, int namelen, char *name,
+                                       int tgtlen, char *tgt)
+{
+        struct ptlrpc_request *request;
+        int rc;
+        ENTRY;
+
+        OBD_ALLOC(request, sizeof(*request));
+        if (!request) {
+                CERROR("request allocation out of memory\n");
+                EXIT;
+                return NULL;
+        }
+
+        memset(request, 0, sizeof(*request));
+        request->rq_xid = cl->cli_xid++;
+
+        rc = cl->cli_req_pack(name, namelen, tgt, tgtlen,
+                              &request->rq_reqhdr, &request->rq_req,
+                              &request->rq_reqlen, &request->rq_reqbuf);
+        if (rc) {
+                CERROR("cannot pack request %d\n", rc);
+                /* do not leak the request when packing fails */
+                OBD_FREE(request, sizeof(*request));
+                EXIT;
+                return NULL;
+        }
+        request->rq_reqhdr->opc = opcode;
+        request->rq_reqhdr->seqno = request->rq_xid;
+
+        EXIT;
+        return request;
+}
+
+/*
+ * Release the request structure itself.  Only the struct is freed here;
+ * buffers it points at are not touched.
+ */
+void ptlrpc_free_req(struct ptlrpc_request *request)
+{
+        OBD_FREE(request, sizeof(struct ptlrpc_request));
+}
+
+/*
+ * Send a request and sleep until its reply buffer has been filled in.
+ *
+ * The request is delivered locally through ptlrpc_enqueue() when the client
+ * has an OBD device, otherwise it is sent over portals with ptl_send_rpc().
+ * The caller then sleeps interruptibly on req->rq_wait_for_rep until
+ * req->rq_repbuf becomes non-NULL or SIGKILL/SIGINT is pending, and finally
+ * unpacks the reply with the client's cli_rep_unpack method.
+ *
+ * Returns 0 on success, -EINTR when interrupted by a signal, the unpack
+ * error, or the negated delivery error.
+ */
+int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
+{
+        int rc;
+        DECLARE_WAITQUEUE(wait, current);
+
+        ENTRY;
+
+        init_waitqueue_head(&req->rq_wait_for_rep);
+
+        if (cl->cli_obd) {
+                /* Local delivery */
+                rc = ptlrpc_enqueue(cl, req);
+        } else {
+                /* Remote delivery via portals. */
+                req->rq_req_portal = cl->cli_request_portal;
+                req->rq_reply_portal = cl->cli_reply_portal;
+                rc = ptl_send_rpc(req, &cl->cli_server);
+        }
+        if (rc) {
+                CERROR("error %d, opcode %d\n", rc,
+                       req->rq_reqhdr->opc);
+                /* NOTE(review): the senders return a mix of negative errno
+                 * values and portals codes, so negating turns an errno
+                 * positive -- confirm what callers expect. */
+                EXIT;
+                return -rc;
+        }
+
+        CDEBUG(0, "-- sleeping\n");
+        add_wait_queue(&req->rq_wait_for_rep, &wait);
+        for (;;) {
+                /* Change state BEFORE testing the condition: a wake-up that
+                 * arrives between the test and schedule() then puts us back
+                 * to TASK_RUNNING instead of being lost. */
+                set_current_state(TASK_INTERRUPTIBLE);
+
+                if (req->rq_repbuf != NULL)
+                        break;
+
+                /* if this process really wants to die, let it go */
+                if (sigismember(&(current->pending.signal), SIGKILL) ||
+                    sigismember(&(current->pending.signal), SIGINT))
+                        break;
+
+                schedule();
+        }
+        remove_wait_queue(&req->rq_wait_for_rep, &wait);
+        set_current_state(TASK_RUNNING);
+        CDEBUG(0, "-- done\n");
+
+        if (req->rq_repbuf == NULL) {
+                /* We broke out because of a signal */
+                EXIT;
+                return -EINTR;
+        }
+
+        rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen,
+                                &req->rq_rephdr, &req->rq_rep);
+        if (rc) {
+                CERROR("unpack_rep failed: %d\n", rc);
+                EXIT;
+                return rc;
+        }
+        CERROR("got rep %lld\n", req->rq_rephdr->seqno);
+        if (req->rq_rephdr->status == 0)
+                CDEBUG(0, "--> buf %p len %d status %d\n",
+                       req->rq_repbuf, req->rq_replen,
+                       req->rq_rephdr->status);
+
+        EXIT;
+        return 0;
+}
diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c
new file mode 100644 (file)
index 0000000..f6299f0
--- /dev/null
@@ -0,0 +1,211 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (C) 2002 Cluster File Systems, Inc.
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define EXPORT_SYMTAB
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/kernel.h>
+
+#define DEBUG_SUBSYSTEM S_RPC
+
+#include <linux/obd_support.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_net.h>
+
+/* Event queue handles: allocated in ptlrpc_init_portals(), released in
+ * ptlrpc_exit_portals(), one per callback defined in this file. */
+ptl_handle_eq_t sent_pkt_eq, rcvd_rep_eq, bulk_source_eq, bulk_sink_eq;
+
+/*
+ *  Event handler for outgoing packets: once the SENT event arrives the
+ *  memory described by the event's memory descriptor is released.  Any
+ *  other event type is logged and treated as a bug.  Always returns 1.
+ */
+static int sent_packet_callback(ptl_event_t *ev, void *data)
+{
+        ENTRY;
+
+        if (ev->type != PTL_EVENT_SENT) {
+                // XXX make sure we understand all events, including ACK's
+                CERROR("Unknown event %d\n", ev->type);
+                BUG();
+        } else {
+                OBD_FREE(ev->mem_desc.start, ev->mem_desc.length);
+        }
+
+        EXIT;
+        return 1;
+}
+
+/*
+ * Event handler for incoming replies: on a PUT event, publish the reply
+ * buffer address in the request (taken from the MD's user pointer) and wake
+ * the thread sleeping on rq_wait_for_rep.  Any other event type is logged
+ * and treated as a bug.  Always returns 1.
+ */
+static int rcvd_reply_callback(ptl_event_t *ev, void *data)
+{
+        struct ptlrpc_request *waiting = ev->mem_desc.user_ptr;
+        ENTRY;
+
+        if (ev->type != PTL_EVENT_PUT) {
+                // XXX make sure we understand all events, including ACK's
+                CERROR("Unknown event %d\n", ev->type);
+                BUG();
+        } else {
+                waiting->rq_repbuf = ev->mem_desc.start + ev->offset;
+                /* the buffer pointer is stored before the sleeper is woken */
+                barrier();
+                wake_up_interruptible(&waiting->rq_wait_for_rep);
+        }
+
+        EXIT;
+        return 1;
+}
+
+/*
+ * Event queue callback for incoming service requests; `data` is the
+ * ptlrpc_service the event queue was allocated for.
+ *
+ * Takes a reference on the currently active MD, rotates to the next ME in
+ * the ring when the active buffer is nearly full, and wakes the service
+ * thread on PUT events.
+ *
+ * Returns 0, or the PtlMEUnlink() error code when unlinking fails.
+ */
+int server_request_callback(ptl_event_t *ev, void *data)
+{
+        struct ptlrpc_service *service = data;
+        int rc;
+
+        if (ev->rlength != ev->mlength)
+                CERROR("Warning: Possibly truncated rpc (%d/%d)\n",
+                       ev->mlength, ev->rlength);
+
+        /* The ME is unlinked when there is less than 1024 bytes free
+         * on its MD.  This ensures we are always able to handle the rpc, 
+         * although the 1024 value is a guess as to the size of a
+         * large rpc (the known safe margin should be determined).
+         *
+         * NOTE: The portals API by default unlinks all MD's associated
+         *       with an ME when it's unlinked.  For now, this behavior
+         *       has been commented out of the portals library so the
+         *       MD can be unlinked when its ref count drops to zero.
+         *       A new MD and ME will then be created that use the same
+         *       kmalloc()'ed memory and inserted at the ring tail.
+         */
+
+        /* one more outstanding request on the active MD; dropped again in
+         * ptl_received_rpc() */
+        service->srv_ref_count[service->srv_md_active]++;
+
+        if (ev->offset >= (service->srv_buf_size - 1024)) {
+                CDEBUG(D_INODE, "Unlinking ME %d\n", service->srv_me_active);
+
+                rc = PtlMEUnlink(service->srv_me_h[service->srv_me_active]);
+                /* a zero handle marks this ring slot as unlinked; note the
+                 * handle is cleared before rc is examined */
+                service->srv_me_h[service->srv_me_active] = 0;
+
+                if (rc != PTL_OK) {
+                        CERROR("PtlMEUnlink failed - DROPPING soon: %d\n", rc);
+                        BUG();
+                        return rc;
+                }
+
+                /* move on to the next slot in the ring */
+                service->srv_me_active = NEXT_INDEX(service->srv_me_active,
+                        service->srv_ring_length);
+
+                /* the next slot is unlinked too: nothing left to receive into */
+                if (service->srv_me_h[service->srv_me_active] == 0)
+                        CERROR("All %d ring ME's are unlinked!\n",
+                               service->srv_ring_length);
+        }
+
+        if (ev->type == PTL_EVENT_PUT) {
+                wake_up(&service->srv_waitq);
+        } else {
+                CERROR("Unexpected event type: %d\n", ev->type);
+        }
+
+        return 0;
+}
+
+
+/*
+ * Event handler for the sending side of a bulk transfer.  A SENT event is
+ * only logged; an ACK event wakes the thread sleeping on rq_wait_for_bulk.
+ * Any other event type is logged and treated as a bug.  Always returns 1.
+ */
+static int bulk_source_callback(ptl_event_t *ev, void *data)
+{
+        struct ptlrpc_request *sender = ev->mem_desc.user_ptr;
+
+        ENTRY;
+
+        switch (ev->type) {
+        case PTL_EVENT_SENT:
+                CDEBUG(D_NET, "got SENT event\n");
+                break;
+        case PTL_EVENT_ACK:
+                CDEBUG(D_NET, "got ACK event\n");
+                wake_up_interruptible(&sender->rq_wait_for_bulk);
+                break;
+        default:
+                CERROR("Unexpected event type!\n");
+                BUG();
+                break;
+        }
+
+        EXIT;
+        return 1;
+}
+
+/*
+ * Event handler for the receiving side of a bulk transfer.  On a PUT event
+ * it complains if the data did not land at the request's bulk buffer, then
+ * wakes the thread sleeping on rq_wait_for_bulk.  Any other event type is
+ * logged and treated as a bug.  Always returns 1.
+ */
+static int bulk_sink_callback(ptl_event_t *ev, void *data)
+{
+        struct ptlrpc_request *receiver = ev->mem_desc.user_ptr;
+
+        ENTRY;
+
+        if (ev->type != PTL_EVENT_PUT) {
+                CERROR("Unexpected event type!\n");
+                BUG();
+        } else {
+                if (receiver->rq_bulkbuf != ev->mem_desc.start + ev->offset)
+                        CERROR("bulkbuf != mem_desc -- why?\n");
+                wake_up_interruptible(&receiver->rq_wait_for_bulk);
+        }
+
+        EXIT;
+        return 1;
+}
+
+/*
+ * Look up the NAL network interface and allocate the four event queues
+ * used by this file's callbacks.
+ *
+ * Returns PTL_OK on success, -EIO when the network interface cannot be
+ * obtained, or the code of the first PtlEQAlloc() that fails.  On failure
+ * every queue allocated so far is freed and the inter-module reference is
+ * dropped, so ptlrpc_exit_portals() must not be called afterwards.
+ */
+int ptlrpc_init_portals(void)
+{
+        int rc;
+        const ptl_handle_ni_t *nip;
+        ptl_handle_ni_t ni;
+
+        nip = inter_module_get_request(LUSTRE_NAL "_ni", LUSTRE_NAL);
+        if (nip == NULL) {
+                CERROR("get_ni failed: is the NAL module loaded?\n");
+                return -EIO;
+        }
+        ni = *nip;
+
+        rc = PtlEQAlloc(ni, 128, sent_packet_callback, NULL, &sent_pkt_eq);
+        if (rc != PTL_OK) {
+                CERROR("PtlEQAlloc failed: %d\n", rc);
+                goto out_put;
+        }
+
+        rc = PtlEQAlloc(ni, 128, rcvd_reply_callback, NULL, &rcvd_rep_eq);
+        if (rc != PTL_OK) {
+                CERROR("PtlEQAlloc failed: %d\n", rc);
+                goto out_free_sent;
+        }
+
+        rc = PtlEQAlloc(ni, 128, bulk_source_callback, NULL, &bulk_source_eq);
+        if (rc != PTL_OK) {
+                CERROR("PtlEQAlloc failed: %d\n", rc);
+                goto out_free_rep;
+        }
+
+        rc = PtlEQAlloc(ni, 128, bulk_sink_callback, NULL, &bulk_sink_eq);
+        if (rc != PTL_OK) {
+                CERROR("PtlEQAlloc failed: %d\n", rc);
+                goto out_free_source;
+        }
+
+        return rc;
+
+        /* unwind in reverse order of allocation */
+ out_free_source:
+        PtlEQFree(bulk_source_eq);
+ out_free_rep:
+        PtlEQFree(rcvd_rep_eq);
+ out_free_sent:
+        PtlEQFree(sent_pkt_eq);
+ out_put:
+        inter_module_put(LUSTRE_NAL "_ni");
+        return rc;
+}
+
+/*
+ * Free the four event queues allocated by ptlrpc_init_portals() and drop
+ * the inter-module reference on the NAL network interface.
+ */
+void ptlrpc_exit_portals(void)
+{
+        static ptl_handle_eq_t * const queues[] = {
+                &sent_pkt_eq, &rcvd_rep_eq, &bulk_source_eq, &bulk_sink_eq
+        };
+        int i;
+
+        for (i = 0; i < (int)(sizeof(queues) / sizeof(queues[0])); i++)
+                PtlEQFree(*queues[i]);
+
+        inter_module_put(LUSTRE_NAL "_ni");
+}
diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c
new file mode 100644 (file)
index 0000000..f019560
--- /dev/null
@@ -0,0 +1,239 @@
+
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (C) 2002 Cluster File Systems, Inc.
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define EXPORT_SYMTAB
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/kernel.h>
+
+#define DEBUG_SUBSYSTEM S_RPC
+
+#include <linux/obd_support.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_net.h>
+
+extern ptl_handle_eq_t bulk_source_eq, sent_pkt_eq, rcvd_rep_eq, bulk_sink_eq;
+
+/*
+ * Bind one of the request's buffers to a memory descriptor and PUT it to
+ * the given portal on the peer.
+ *
+ * The buffer is chosen as follows: the bulk buffer when rq_bulklen is
+ * non-zero (sent with an ACK request), otherwise the request buffer when
+ * is_request is non-zero, otherwise the reply buffer.  The request's xid is
+ * used as the match bits.
+ *
+ * Returns PTL_OK on success or the failing portals call's return code.
+ */
+int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
+                 int portal, int is_request)
+{
+        int rc;
+        ptl_process_id_t remote_id;
+        ptl_handle_md_t md_h;
+
+        /* FIXME: This is bad. */
+        if (request->rq_bulklen) {
+                request->rq_req_md.start = request->rq_bulkbuf;
+                request->rq_req_md.length = request->rq_bulklen;
+                request->rq_req_md.eventq = bulk_source_eq;
+        } else if (is_request) {
+                request->rq_req_md.start = request->rq_reqbuf;
+                request->rq_req_md.length = request->rq_reqlen;
+                request->rq_req_md.eventq = sent_pkt_eq;
+        } else {
+                request->rq_req_md.start = request->rq_repbuf;
+                request->rq_req_md.length = request->rq_replen;
+                request->rq_req_md.eventq = sent_pkt_eq;
+        }
+        request->rq_req_md.threshold = 1;
+        request->rq_req_md.options = PTL_MD_OP_PUT;
+        request->rq_req_md.user_ptr = request;
+
+        rc = PtlMDBind(peer->peer_ni, request->rq_req_md, &md_h);
+        if (rc != PTL_OK) {
+                /* log first so the reason is visible before BUG() fires */
+                CERROR("PtlMDBind failed: %d\n", rc);
+                BUG();
+                return rc;
+        }
+
+        remote_id.addr_kind = PTL_ADDR_NID;
+        remote_id.nid = peer->peer_nid;
+        remote_id.pid = 0;
+
+        if (request->rq_bulklen) {
+                rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0,
+                            request->rq_xid, 0, 0);
+        } else {
+                rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0,
+                            request->rq_xid, 0, 0);
+        }
+        if (rc != PTL_OK) {
+                CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
+                       portal, request->rq_xid, rc);
+                BUG();
+                /* nothing was sent, so no event will release this MD */
+                PtlMDUnlink(md_h);
+        }
+
+        return rc;
+}
+
+/*
+ * Prepare to receive the reply (and, if rq_bulklen is set, the bulk data)
+ * for a request, then send the request with ptl_send_buf().
+ *
+ * A reply buffer of rq_replen bytes is allocated and attached, through an
+ * ME/MD pair matching the request's xid, to the reply portal.  When
+ * rq_bulklen is non-zero a second ME/MD pair is attached to the bulk portal
+ * over rq_bulkbuf.
+ *
+ * Returns 0 on success, -EINVAL when rq_replen is 0, -ENOMEM when the reply
+ * buffer cannot be allocated, or the failing portals call's return code.
+ * On any failure everything set up here is torn down again.
+ */
+int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
+{
+        ptl_handle_me_t me_h, bulk_me_h;
+        ptl_process_id_t local_id;
+        int rc;
+        char *repbuf;
+
+        ENTRY;
+
+        if (request->rq_replen == 0) {
+                CERROR("request->rq_replen is 0!\n");
+                EXIT;
+                return -EINVAL;
+        }
+
+        /* request->rq_repbuf is set only when the reply comes in, in
+         * rcvd_reply_callback() */
+        OBD_ALLOC(repbuf, request->rq_replen);
+        if (!repbuf) {
+                EXIT;
+                return -ENOMEM;
+        }
+
+        local_id.addr_kind = PTL_ADDR_GID;
+        local_id.gid = PTL_ID_ANY;
+        local_id.rid = PTL_ID_ANY;
+
+        //CERROR("sending req %d\n", request->rq_xid);
+        rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id,
+                         request->rq_xid, 0, PTL_UNLINK, &me_h);
+        if (rc != PTL_OK) {
+                CERROR("PtlMEAttach failed: %d\n", rc);
+                BUG();
+                EXIT;
+                goto cleanup;
+        }
+
+        request->rq_reply_md.start = repbuf;
+        request->rq_reply_md.length = request->rq_replen;
+        request->rq_reply_md.threshold = 1;
+        request->rq_reply_md.options = PTL_MD_OP_PUT;
+        request->rq_reply_md.user_ptr = request;
+        request->rq_reply_md.eventq = rcvd_rep_eq;
+
+        rc = PtlMDAttach(me_h, request->rq_reply_md, PTL_UNLINK,
+                         &request->rq_reply_md_h);
+        if (rc != PTL_OK) {
+                CERROR("PtlMDAttach failed: %d\n", rc);
+                BUG();
+                EXIT;
+                goto cleanup2;
+        }
+
+        if (request->rq_bulklen != 0) {
+                rc = PtlMEAttach(peer->peer_ni, request->rq_bulk_portal,
+                                 local_id, request->rq_xid, 0, PTL_UNLINK,
+                                 &bulk_me_h);
+                if (rc != PTL_OK) {
+                        CERROR("PtlMEAttach failed: %d\n", rc);
+                        BUG();
+                        EXIT;
+                        goto cleanup3;
+                }
+
+                request->rq_bulk_md.start = request->rq_bulkbuf;
+                request->rq_bulk_md.length = request->rq_bulklen;
+                request->rq_bulk_md.threshold = 1;
+                request->rq_bulk_md.options = PTL_MD_OP_PUT;
+                request->rq_bulk_md.user_ptr = request;
+                request->rq_bulk_md.eventq = bulk_sink_eq;
+
+                rc = PtlMDAttach(bulk_me_h, request->rq_bulk_md, PTL_UNLINK,
+                                 &request->rq_bulk_md_h);
+                if (rc != PTL_OK) {
+                        CERROR("PtlMDAttach failed: %d\n", rc);
+                        BUG();
+                        EXIT;
+                        goto cleanup4;
+                }
+        }
+
+        rc = ptl_send_buf(request, peer, request->rq_req_portal, 1);
+        if (rc == 0)
+                return rc;
+
+        /* The send failed, so no reply or bulk data can arrive: undo the
+         * receive-side setup instead of leaving MDs that point at the
+         * request and leaking the reply buffer. */
+        EXIT;
+        if (request->rq_bulklen == 0)
+                goto cleanup3;
+        PtlMDUnlink(request->rq_bulk_md_h);
+
+ cleanup4:
+        PtlMEUnlink(bulk_me_h);
+ cleanup3:
+        PtlMDUnlink(request->rq_reply_md_h);
+ cleanup2:
+        PtlMEUnlink(me_h);
+ cleanup:
+        OBD_FREE(repbuf, request->rq_replen);
+
+        return rc;
+}
+
+/* ptl_received_rpc() should be called by the sleeping process once
+ * it finishes processing an event.  This ensures the ref count is
+ * decremented and that the rpc ring buffer cycles properly.
+ *
+ * The reference taken in server_request_callback() on the active MD is
+ * dropped.  When that count reaches zero and the slot's ME handle has been
+ * cleared (i.e. the ME was unlinked), a new ME is inserted after the ring
+ * tail, a new MD is attached over the same buffer, and srv_md_active moves
+ * to the next slot.
+ *
+ * Returns 0, or the failing portals call's return code.
+ */ 
+int ptl_received_rpc(struct ptlrpc_service *service) {
+        int rc, index;
+
+        index = service->srv_md_active;
+        CDEBUG(D_INFO, "MD index=%d Ref Count=%d\n", index,
+               service->srv_ref_count[index]);
+        service->srv_ref_count[index]--;
+
+        /* only recycle a slot that is both idle and already unlinked */
+        if ((service->srv_ref_count[index] <= 0) &&
+            (service->srv_me_h[index] == 0)) {
+
+                /* Replace the unlinked ME and MD */
+                rc = PtlMEInsert(service->srv_me_h[service->srv_me_tail],
+                                 service->srv_id, 0, ~0, PTL_RETAIN,
+                                 PTL_INS_AFTER, &(service->srv_me_h[index]));
+                CERROR("Inserting new ME and MD in ring, rc %d\n", rc);
+                /* NOTE(review): the tail and ref count are updated before
+                 * rc is checked -- confirm this is intended on failure */
+                service->srv_me_tail = index;
+                service->srv_ref_count[index] = 0;
+                
+                if (rc != PTL_OK) {
+                        CERROR("PtlMEInsert failed: %d\n", rc);
+                        BUG();
+                        return rc;
+                }
+
+                /* same MD setup as in rpc_register_service() */
+                service->srv_md[index].start        = service->srv_buf[index];
+                service->srv_md[index].length       = service->srv_buf_size;
+                service->srv_md[index].threshold    = PTL_MD_THRESH_INF;
+                service->srv_md[index].options      = PTL_MD_OP_PUT;
+                service->srv_md[index].user_ptr     = service;
+                service->srv_md[index].eventq       = service->srv_eq_h;
+
+                rc = PtlMDAttach(service->srv_me_h[index],
+                                 service->srv_md[index],
+                                 PTL_RETAIN, &(service->srv_md_h[index]));
+
+                CDEBUG(D_INFO, "Attach MD in ring, rc %d\n", rc);
+                if (rc != PTL_OK) {
+                        /* XXX cleanup */
+                        /* NOTE(review): BUG() precedes the log call here,
+                         * unlike the PtlMEInsert failure path above */
+                        BUG();
+                        CERROR("PtlMDAttach failed: %d\n", rc);
+                        return rc;
+                }
+
+                service->srv_md_active =
+                        NEXT_INDEX(index, service->srv_ring_length);
+        } 
+        
+        return 0;
+}
diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c
new file mode 100644 (file)
index 0000000..89830f1
--- /dev/null
@@ -0,0 +1,346 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (C) 2002 Cluster File Systems, Inc.
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define EXPORT_SYMTAB
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/kernel.h>
+
+#define DEBUG_SUBSYSTEM S_RPC
+
+#include <linux/obd_support.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_net.h>
+
+extern int server_request_callback(ptl_event_t *ev, void *data);
+
+/*
+ * Wake-up condition for the service thread.  Returns 1 (recording the
+ * reason in svc->srv_flags where applicable) when SIGKILL or SIGINT is
+ * pending, when the service is stopping, when an event could be fetched
+ * from the event queue, or -- for a service without an event queue -- when
+ * the request list is non-empty.  Returns 0 otherwise.
+ */
+static int ptlrpc_check_event(struct ptlrpc_service *svc)
+{
+        int killed;
+
+        killed = sigismember(&(current->pending.signal), SIGKILL) ||
+                 sigismember(&(current->pending.signal), SIGINT);
+        if (killed) {
+                svc->srv_flags |= SVC_KILLED;
+                EXIT;
+                return 1;
+        }
+
+        if (svc->srv_flags & SVC_STOPPING) {
+                EXIT;
+                return 1;
+        }
+
+        if (svc->srv_eq_h) {
+                int rc = PtlEQGet(svc->srv_eq_h, &svc->srv_ev);
+
+                if (rc == PTL_OK) {
+                        svc->srv_flags |= SVC_EVENT;
+                        EXIT;
+                        return 1;
+                }
+
+                if (rc == PTL_EQ_DROPPED) {
+                        CERROR("dropped event!\n");
+                        BUG();
+                }
+                CERROR("PtlEQGet returns %d\n", rc);
+                EXIT;
+                return 0;
+        }
+
+        /* no event queue: fall back to the locally queued requests */
+        if (list_empty(&svc->srv_reqs)) {
+                EXIT;
+                return 0;
+        }
+
+        svc->srv_flags |= SVC_LIST;
+        EXIT;
+        return 1;
+}
+
+/*
+ * Allocate and initialise a service descriptor.
+ *
+ * bufsize: size recorded as the service's buffer size
+ * portal:  portal number recorded for the service
+ * uuid:    resolved with kportal_uuid_to_peer() into the service's own peer
+ * unpack, pack, handler: request unpack, reply pack and request handler
+ *          methods stored in the service
+ *
+ * Returns the new service, or NULL when allocation or the uuid lookup fails.
+ */
+struct ptlrpc_service *ptlrpc_init_svc(__u32 bufsize, int portal, char *uuid,
+                                       req_unpack_t unpack, rep_pack_t pack,
+                                       svc_handler_t handler)
+{
+        struct ptlrpc_service *svc;
+        int err;
+
+        OBD_ALLOC(svc, sizeof(*svc));
+        if (svc == NULL) {
+                CERROR("no memory\n");
+                return NULL;
+        }
+
+        memset(svc, 0, sizeof(*svc));
+
+        /* synchronisation primitives */
+        spin_lock_init(&svc->srv_lock);
+        INIT_LIST_HEAD(&svc->srv_reqs);
+        init_waitqueue_head(&svc->srv_ctl_waitq);
+        init_waitqueue_head(&svc->srv_waitq);
+
+        /* no thread yet */
+        svc->srv_thread = NULL;
+        svc->srv_flags = 0;
+
+        /* caller-supplied configuration */
+        svc->srv_buf_size = bufsize;
+        svc->srv_portal = portal;
+        svc->srv_req_unpack = unpack;
+        svc->srv_rep_pack = pack;
+        svc->srv_handler = handler;
+
+        err = kportal_uuid_to_peer(uuid, &svc->srv_self);
+        if (err == 0)
+                return svc;
+
+        CERROR("cannot get peer for uuid %s", uuid);
+        OBD_FREE(svc, sizeof(*svc));
+        return NULL;
+}
+
+/*
+ * Body of the service kernel thread started by ptlrpc_start_thread().
+ *
+ * arg points to a struct ptlrpc_svc_data on the starter's stack.  Its
+ * fields are copied before SVC_RUNNING is signalled and must not be read
+ * afterwards, because the starter returns once it sees that flag.
+ *
+ * The thread loops waiting on srv_waitq and dispatches either a portals
+ * event or a locally queued request to the service handler, until it is
+ * asked to stop or a kill signal is seen.  Always returns 0.
+ */
+static int ptlrpc_main(void *arg)
+{
+        int rc;
+        struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
+        struct obd_device *obddev = data->dev;
+        struct ptlrpc_service *svc = data->svc;
+
+        ENTRY;
+
+        lock_kernel();
+        daemonize();
+        spin_lock_irq(&current->sigmask_lock);
+        sigfillset(&current->blocked);
+        recalc_sigpending(current);
+        spin_unlock_irq(&current->sigmask_lock);
+
+        /* Bounded copy: the name is caller supplied, so it must neither
+         * overflow comm nor be interpreted as a format string. */
+        strncpy(current->comm, data->name, sizeof(current->comm) - 1);
+        current->comm[sizeof(current->comm) - 1] = '\0';
+
+        /* Record that the  thread is running */
+        svc->srv_thread = current;
+        svc->srv_flags = SVC_RUNNING;
+        wake_up(&svc->srv_ctl_waitq);
+        /* 'data' may be gone from here on: the starter is free to return */
+
+        /* XXX maintain a list of all managed devices: insert here */
+
+        /* And now, loop forever on requests */
+        while (1) {
+
+                wait_event(svc->srv_waitq, ptlrpc_check_event(svc));
+
+                /* ptlrpc_check_event() reports a pending kill signal with
+                 * SVC_KILLED, so accept that flag as well */
+                if (svc->srv_flags & (SVC_SIGNAL | SVC_KILLED)) {
+                        EXIT;
+                        break;
+                }
+
+                if (svc->srv_flags & SVC_STOPPING) {
+                        EXIT;
+                        break;
+                }
+
+                if (svc->srv_flags & SVC_EVENT) {
+                        struct ptlrpc_request request;
+                        svc->srv_flags = SVC_RUNNING;
+
+                        /* FIXME: If we move to an event-driven model,
+                         * we should put the request on the stack of
+                         * mds_handle instead. */
+                        memset(&request, 0, sizeof(request));
+                        request.rq_reqbuf = svc->srv_ev.mem_desc.start +
+                                            svc->srv_ev.offset;
+                        request.rq_reqlen = svc->srv_ev.mem_desc.length;
+                        request.rq_xid = svc->srv_ev.match_bits;
+                        CERROR("got req %d\n", request.rq_xid);
+
+                        request.rq_peer.peer_nid = svc->srv_ev.initiator.nid;
+                        /* FIXME: this NI should be the incoming NI.
+                         * We don't know how to find that from here. */
+                        request.rq_peer.peer_ni = svc->srv_self.peer_ni;
+                        rc = svc->srv_handler(obddev, svc, &request);
+                        ptl_received_rpc(svc);
+                        continue;
+                }
+
+                if (svc->srv_flags & SVC_LIST) {
+                        struct ptlrpc_request *request;
+                        svc->srv_flags = SVC_RUNNING;
+
+                        spin_lock(&svc->srv_lock);
+                        request = list_entry(svc->srv_reqs.next,
+                                             struct ptlrpc_request,
+                                             rq_list);
+                        list_del(&request->rq_list);
+                        spin_unlock(&svc->srv_lock);
+                        rc = svc->srv_handler(obddev, svc, request);
+                        continue;
+                }
+                CERROR("unknown break in service\n");
+                break;
+        }
+
+        svc->srv_thread = NULL;
+        svc->srv_flags = SVC_STOPPED;
+        wake_up(&svc->srv_ctl_waitq);
+        /* use our own copy of the name; data->name is no longer safe */
+        CERROR("svc %s: exiting\n", current->comm);
+        return 0;
+}
+
+/*
+ * Ask the service thread to stop: set SVC_STOPPING, wake the thread, then
+ * sleep interruptibly on the control wait queue until SVC_STOPPED is set.
+ */
+void ptlrpc_stop_thread(struct ptlrpc_service *svc)
+{
+        svc->srv_flags = SVC_STOPPING;
+        wake_up(&svc->srv_waitq);
+
+        wait_event_interruptible(svc->srv_ctl_waitq,
+                                 (svc->srv_flags & SVC_STOPPED));
+}
+
+/*
+ * Start the kernel thread that serves `svc` on behalf of `dev`, and wait
+ * until the thread has flagged itself SVC_RUNNING.
+ *
+ * The thread arguments live on this function's stack; they are handed to
+ * ptlrpc_main() by pointer.
+ *
+ * Returns 0 on success, -EINVAL when the thread cannot be created.
+ */
+int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
+                        char *name)
+{
+        struct ptlrpc_svc_data args;
+        int pid;
+        ENTRY;
+
+        args.dev = dev;
+        args.svc = svc;
+        args.name = name;
+
+        init_waitqueue_head(&svc->srv_waitq);
+        init_waitqueue_head(&svc->srv_ctl_waitq);
+
+        pid = kernel_thread(ptlrpc_main, (void *) &args,
+                            CLONE_VM | CLONE_FS | CLONE_FILES);
+        if (pid < 0) {
+                CERROR("cannot start thread\n");
+                return -EINVAL;
+        }
+
+        /* block until ptlrpc_main() reports that it is up */
+        wait_event(svc->srv_ctl_waitq, svc->srv_flags & SVC_RUNNING);
+
+        EXIT;
+        return 0;
+}
+
+
+/*
+ * Set up the portals resources of a service: one event queue plus a ring
+ * of RPC_RING_LENGTH match entries, each with a receive buffer of
+ * srv_buf_size bytes attached through a memory descriptor.
+ *
+ * Returns 0 on success, -EINVAL for an unknown uuid, -ENOMEM when a buffer
+ * cannot be allocated, or the failing portals call's return code.  On
+ * failure everything set up so far is released again, so
+ * rpc_unregister_service() must not be called for a service whose
+ * registration failed.
+ */
+int rpc_register_service(struct ptlrpc_service *service, char *uuid)
+{
+        struct lustre_peer peer;
+        int rc, i, j;
+
+        rc = kportal_uuid_to_peer(uuid, &peer);
+        if (rc != 0) {
+                CERROR("Invalid uuid \"%s\"\n", uuid);
+                return -EINVAL;
+        }
+
+        service->srv_ring_length = RPC_RING_LENGTH;
+        service->srv_me_active = 0;
+        service->srv_md_active = 0;
+        service->srv_me_tail = 0;
+
+        service->srv_id.addr_kind = PTL_ADDR_GID;
+        service->srv_id.gid = PTL_ID_ANY;
+        service->srv_id.rid = PTL_ID_ANY;
+
+        rc = PtlEQAlloc(peer.peer_ni, 128, server_request_callback,
+                        service, &(service->srv_eq_h));
+
+        if (rc != PTL_OK) {
+                CERROR("PtlEQAlloc failed: %d\n", rc);
+                return rc;
+        }
+
+        /* Attach the leading ME on which we build the ring */
+        rc = PtlMEAttach(peer.peer_ni, service->srv_portal,
+                         service->srv_id, 0, ~0, PTL_RETAIN,
+                         &(service->srv_me_h[0]));
+
+        if (rc != PTL_OK) {
+                CERROR("PtlMEAttach failed: %d\n", rc);
+                goto out_eq;
+        }
+
+        for (i = 0; i < service->srv_ring_length; i++) {
+                OBD_ALLOC(service->srv_buf[i], service->srv_buf_size);
+
+                if (service->srv_buf[i] == NULL) {
+                        CERROR("no memory\n");
+                        rc = -ENOMEM;
+                        /* the leading ME exists before its buffer does */
+                        if (i == 0)
+                                PtlMEUnlink(service->srv_me_h[0]);
+                        goto out_ring;
+                }
+
+                /* Insert additional ME's to the ring */
+                if (i > 0) {
+                        rc = PtlMEInsert(service->srv_me_h[i-1],
+                                         service->srv_id, 0, ~0, PTL_RETAIN,
+                                         PTL_INS_AFTER,&(service->srv_me_h[i]));
+
+                        if (rc != PTL_OK) {
+                                CERROR("PtlMEInsert failed: %d\n", rc);
+                                goto out_buf;
+                        }
+                }
+
+                service->srv_ref_count[i] = 0;
+                service->srv_md[i].start        = service->srv_buf[i];
+                service->srv_md[i].length        = service->srv_buf_size;
+                service->srv_md[i].threshold        = PTL_MD_THRESH_INF;
+                service->srv_md[i].options        = PTL_MD_OP_PUT;
+                service->srv_md[i].user_ptr        = service;
+                service->srv_md[i].eventq        = service->srv_eq_h;
+
+                rc = PtlMDAttach(service->srv_me_h[i], service->srv_md[i],
+                                 PTL_RETAIN, &(service->srv_md_h[i]));
+
+                if (rc != PTL_OK) {
+                        CERROR("PtlMDAttach failed: %d\n", rc);
+                        PtlMEUnlink(service->srv_me_h[i]);
+                        goto out_buf;
+                }
+
+                /* slot i is complete and is now the end of the ring */
+                service->srv_me_tail = i;
+        }
+
+        return 0;
+
+ out_buf:
+        /* slot i has a buffer but is otherwise incomplete */
+        OBD_FREE(service->srv_buf[i], service->srv_buf_size);
+ out_ring:
+        /* slots 0 .. i-1 are complete: undo them entirely */
+        for (j = 0; j < i; j++) {
+                PtlMDUnlink(service->srv_md_h[j]);
+                PtlMEUnlink(service->srv_me_h[j]);
+                OBD_FREE(service->srv_buf[j], service->srv_buf_size);
+        }
+ out_eq:
+        PtlEQFree(service->srv_eq_h);
+        return rc;
+}
+
+/*
+ * Tear down what rpc_register_service() set up: for every ring slot unlink
+ * its MD and ME and free its buffer, then free the event queue.  Failures
+ * are logged but do not stop the teardown.  Always returns 0.
+ */
+int rpc_unregister_service(struct ptlrpc_service *service)
+{
+        int slot;
+        int rc;
+
+        for (slot = 0; slot < service->srv_ring_length; slot++) {
+                rc = PtlMDUnlink(service->srv_md_h[slot]);
+                if (rc != 0)
+                        CERROR("PtlMDUnlink failed: %d\n", rc);
+
+                rc = PtlMEUnlink(service->srv_me_h[slot]);
+                if (rc != 0)
+                        CERROR("PtlMEUnlink failed: %d\n", rc);
+
+                OBD_FREE(service->srv_buf[slot], service->srv_buf_size);
+        }
+
+        rc = PtlEQFree(service->srv_eq_h);
+        if (rc != 0)
+                CERROR("PtlEQFree failed: %d\n", rc);
+
+        return 0;
+}
+