--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2002 Cluster File Systems, Inc.
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define EXPORT_SYMTAB
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/kernel.h>
+
+#define DEBUG_SUBSYSTEM S_RPC
+
+#include <linux/obd_support.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_net.h>
+
+/* Hand an incoming request directly to a local (non-networked) OBD.
+ *
+ * Allocates a private server-side ptlrpc_request, moves the packed
+ * request buffer into it, queues it on the target obd's request list
+ * and wakes the service thread sleeping on obd_req_waitq.
+ *
+ * Returns 0 on success, -EINVAL if the client has no local obd bound,
+ * -ENOMEM if the server-side request cannot be allocated.
+ *
+ * NOTE(review): obd_req_list is modified here without any visible
+ * locking -- confirm that the obd request list has its own lock or is
+ * only touched from one context.
+ */
+int ptlrpc_enqueue(struct ptlrpc_client *peer,
+                   struct ptlrpc_request *req)
+{
+        struct ptlrpc_request *srv_req;
+
+        if (!peer->cli_obd) {
+                /* was "return -1"; use a real errno so callers see a
+                 * meaningful error code */
+                EXIT;
+                return -EINVAL;
+        }
+
+        OBD_ALLOC(srv_req, sizeof(*srv_req));
+        if (!srv_req) {
+                EXIT;
+                return -ENOMEM;
+        }
+
+        CDEBUG(0, "peer obd minor %d, incoming req %p, srv_req %p\n",
+               peer->cli_obd->obd_minor, req, srv_req);
+
+        /* was sizeof(*req): same size since the types match, but clear
+         * the object that was actually allocated */
+        memset(srv_req, 0, sizeof(*srv_req));
+
+        /* move the request buffer */
+        srv_req->rq_reqbuf = req->rq_reqbuf;
+        srv_req->rq_reqlen = req->rq_reqlen;
+        srv_req->rq_obd = peer->cli_obd;
+
+        /* remember where it came from */
+        srv_req->rq_reply_handle = req;
+
+        list_add(&srv_req->rq_list, &peer->cli_obd->obd_req_list);
+        wake_up(&peer->cli_obd->obd_req_waitq);
+        return 0;
+}
+
+/* Initialize a ptlrpc client handle.
+ *
+ * dev >= 0 selects a local (non-networked) obd device target; the
+ * device must be both attached and set up.  Otherwise `uuid` is
+ * resolved to a network peer via kportal_uuid_to_peer().
+ *
+ * Returns 0 on success, -EINVAL for a bad local device, or the
+ * (nonzero) kportal_uuid_to_peer() error for an unknown peer.
+ */
+int ptlrpc_connect_client(int dev, char *uuid,
+                          struct ptlrpc_client *cl)
+{
+        int err;
+
+        memset(cl, 0, sizeof(*cl));
+        cl->cli_xid = 0;
+        cl->cli_obd = NULL;
+
+        /* non networked client */
+        if (dev >= 0 && dev < MAX_OBD_DEVICES) {
+                struct obd_device *obd = &obd_dev[dev];
+
+                /* was (!obd->obd_flags & OBD_ATTACHED): "!" binds more
+                 * tightly than "&", so the flag test was wrong */
+                if (!(obd->obd_flags & OBD_ATTACHED) ||
+                    !(obd->obd_flags & OBD_SET_UP)) {
+                        CERROR("target device %d not att or setup\n", dev);
+                        return -EINVAL;
+                }
+
+                cl->cli_obd = &obd_dev[dev];
+                return 0;
+        }
+
+        /* networked */
+        err = kportal_uuid_to_peer(uuid, &cl->cli_server);
+        if (err != 0) {
+                /* was "if (err == 0)": nonzero means the lookup failed
+                 * (see ptlrpc_init_svc), so the message was inverted */
+                CERROR("cannot find peer!\n");
+        }
+
+        return err;
+}
+
+/* Allocate and pack a new outgoing request for client `cl`.
+ *
+ * Assigns the next xid from the client, packs name/tgt into the
+ * request buffer via the client's pack method, and stamps the opcode
+ * and sequence number into the request header.
+ *
+ * Returns the new request, or NULL on allocation/pack failure.
+ * Ownership of the returned request passes to the caller (freed with
+ * ptlrpc_free_req()).
+ */
+struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
+                                       int opcode, int namelen, char *name,
+                                       int tgtlen, char *tgt)
+{
+        struct ptlrpc_request *request;
+        int rc;
+        ENTRY;
+
+        OBD_ALLOC(request, sizeof(*request));
+        if (!request) {
+                CERROR("request allocation out of memory\n");
+                EXIT;
+                return NULL;
+        }
+
+        memset(request, 0, sizeof(*request));
+        request->rq_xid = cl->cli_xid++;
+
+        rc = cl->cli_req_pack(name, namelen, tgt, tgtlen,
+                              &request->rq_reqhdr, &request->rq_req,
+                              &request->rq_reqlen, &request->rq_reqbuf);
+        if (rc) {
+                CERROR("cannot pack request %d\n", rc);
+                /* was leaked on this path */
+                OBD_FREE(request, sizeof(*request));
+                EXIT;
+                return NULL;
+        }
+        request->rq_reqhdr->opc = opcode;
+        request->rq_reqhdr->seqno = request->rq_xid;
+
+        EXIT;
+        return request;
+}
+
+/* Release a request allocated by ptlrpc_prep_req().
+ *
+ * Tolerates a NULL request so callers may free unconditionally.
+ * NOTE(review): the packed buffer (rq_reqbuf) is not freed here --
+ * presumably it is released by the send-completion callback
+ * (sent_packet_callback); confirm before relying on this.
+ */
+void ptlrpc_free_req(struct ptlrpc_request *request)
+{
+        if (!request)
+                return;
+        OBD_FREE(request, sizeof(*request));
+}
+
+/* Send a request and block until its reply arrives.
+ *
+ * Delivery is local (ptlrpc_enqueue) when the client is bound to a
+ * local obd, otherwise remote over portals (ptl_send_rpc).  The caller
+ * then sleeps on rq_wait_for_rep until the reply callback publishes
+ * rq_repbuf, or until SIGKILL/SIGINT is pending.
+ *
+ * Returns 0 on success, -EINTR if interrupted, a negative error on
+ * send failure, or the cli_rep_unpack() error on a bad reply.
+ */
+int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
+{
+        int rc;
+        DECLARE_WAITQUEUE(wait, current);
+
+        init_waitqueue_head(&req->rq_wait_for_rep);
+
+        if (cl->cli_obd) {
+                /* Local delivery */
+                ENTRY;
+                rc = ptlrpc_enqueue(cl, req);
+        } else {
+                /* Remote delivery via portals. */
+                req->rq_req_portal = cl->cli_request_portal;
+                req->rq_reply_portal = cl->cli_reply_portal;
+                rc = ptl_send_rpc(req, &cl->cli_server);
+        }
+        if (rc) {
+                CERROR("error %d, opcode %d\n", rc,
+                       req->rq_reqhdr->opc);
+                /* was "return -rc": that flipped already-negative errno
+                 * values (e.g. -ENOMEM from ptlrpc_enqueue) to positive.
+                 * Normalize so callers always see a negative code. */
+                return rc < 0 ? rc : -rc;
+        }
+
+        CDEBUG(0, "-- sleeping\n");
+        add_wait_queue(&req->rq_wait_for_rep, &wait);
+        while (req->rq_repbuf == NULL) {
+                set_current_state(TASK_INTERRUPTIBLE);
+
+                /* if this process really wants to die, let it go */
+                if (sigismember(&(current->pending.signal), SIGKILL) ||
+                    sigismember(&(current->pending.signal), SIGINT))
+                        break;
+
+                schedule();
+        }
+        remove_wait_queue(&req->rq_wait_for_rep, &wait);
+        set_current_state(TASK_RUNNING);
+        CDEBUG(0, "-- done\n");
+
+        if (req->rq_repbuf == NULL) {
+                /* We broke out because of a signal */
+                EXIT;
+                return -EINTR;
+        }
+
+        rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen,
+                                &req->rq_rephdr, &req->rq_rep);
+        if (rc) {
+                CERROR("unpack_rep failed: %d\n", rc);
+                return rc;
+        }
+        CERROR("got rep %lld\n", req->rq_rephdr->seqno);
+        if ( req->rq_rephdr->status == 0 )
+                CDEBUG(0, "--> buf %p len %d status %d\n",
+                       req->rq_repbuf, req->rq_replen,
+                       req->rq_rephdr->status);
+
+        EXIT;
+        return 0;
+}
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2002 Cluster File Systems, Inc.
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define EXPORT_SYMTAB
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/kernel.h>
+
+#define DEBUG_SUBSYSTEM S_RPC
+
+#include <linux/obd_support.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_net.h>
+
+/* Event queues driving the client-side portals callbacks below.
+ * Allocated in ptlrpc_init_portals(), freed in ptlrpc_exit_portals(). */
+ptl_handle_eq_t sent_pkt_eq, rcvd_rep_eq, bulk_source_eq, bulk_sink_eq;
+
+/*
+ * Free the packet when it has gone out
+ */
+/*
+ * Send-completion handler: once the packet has gone out on the wire
+ * its send buffer is no longer needed, so release it here.
+ */
+static int sent_packet_callback(ptl_event_t *ev, void *data)
+{
+        ENTRY;
+
+        if (ev->type != PTL_EVENT_SENT) {
+                // XXX make sure we understand all events, including ACK's
+                CERROR("Unknown event %d\n", ev->type);
+                BUG();
+        } else {
+                OBD_FREE(ev->mem_desc.start, ev->mem_desc.length);
+        }
+
+        EXIT;
+        return 1;
+}
+
+/*
+ * Wake up the thread waiting for the reply once it comes in.
+ */
+/*
+ * Reply-arrival handler: publish the reply buffer on the request and
+ * wake the thread sleeping in ptlrpc_queue_wait().
+ */
+static int rcvd_reply_callback(ptl_event_t *ev, void *data)
+{
+        struct ptlrpc_request *req = ev->mem_desc.user_ptr;
+        ENTRY;
+
+        if (ev->type != PTL_EVENT_PUT) {
+                // XXX make sure we understand all events, including ACK's
+                CERROR("Unknown event %d\n", ev->type);
+                BUG();
+        } else {
+                req->rq_repbuf = ev->mem_desc.start + ev->offset;
+                barrier();
+                wake_up_interruptible(&req->rq_wait_for_rep);
+        }
+
+        EXIT;
+        return 1;
+}
+
+/* Event handler for the server-side rpc ring.
+ *
+ * Bumps the ref count on the active MD, unlinks the active ME when its
+ * MD is nearly full (ptl_received_rpc() later rebuilds it at the ring
+ * tail), and wakes the service thread on an incoming PUT.
+ *
+ * Returns 0, or the PtlMEUnlink error code on unlink failure. */
+int server_request_callback(ptl_event_t *ev, void *data)
+{
+ struct ptlrpc_service *service = data;
+ int rc;
+
+ if (ev->rlength != ev->mlength)
+ CERROR("Warning: Possibly truncated rpc (%d/%d)\n",
+ ev->mlength, ev->rlength);
+
+ /* The ME is unlinked when there is less than 1024 bytes free
+ * on its MD. This ensures we are always able to handle the rpc,
+ * although the 1024 value is a guess as to the size of a
+ * large rpc (the known safe margin should be determined).
+ *
+ * NOTE: The portals API by default unlinks all MD's associated
+ * with an ME when it's unlinked. For now, this behavior
+ * has been commented out of the portals library so the
+ * MD can be unlinked when its ref count drops to zero.
+ * A new MD and ME will then be created that use the same
+ * kmalloc()'ed memory and inserted at the ring tail.
+ */
+
+ /* count the in-flight users of this MD; dropped again in
+ * ptl_received_rpc() once the service thread is done */
+ service->srv_ref_count[service->srv_md_active]++;
+
+ if (ev->offset >= (service->srv_buf_size - 1024)) {
+ CDEBUG(D_INODE, "Unlinking ME %d\n", service->srv_me_active);
+
+ rc = PtlMEUnlink(service->srv_me_h[service->srv_me_active]);
+ /* handle 0 marks "unlinked" for ptl_received_rpc() */
+ service->srv_me_h[service->srv_me_active] = 0;
+
+ if (rc != PTL_OK) {
+ CERROR("PtlMEUnlink failed - DROPPING soon: %d\n", rc);
+ BUG();
+ return rc;
+ }
+
+ service->srv_me_active = NEXT_INDEX(service->srv_me_active,
+ service->srv_ring_length);
+
+ /* all handles 0 means the whole ring has been consumed
+ * faster than ptl_received_rpc() could rebuild it */
+ if (service->srv_me_h[service->srv_me_active] == 0)
+ CERROR("All %d ring ME's are unlinked!\n",
+ service->srv_ring_length);
+ }
+
+ if (ev->type == PTL_EVENT_PUT) {
+ wake_up(&service->srv_waitq);
+ } else {
+ CERROR("Unexpected event type: %d\n", ev->type);
+ }
+
+ return 0;
+}
+
+
+/*
+ * Source-side bulk transfer handler: the SENT event is only logged;
+ * the ACK from the sink is what wakes the waiter.
+ */
+static int bulk_source_callback(ptl_event_t *ev, void *data)
+{
+        struct ptlrpc_request *req = ev->mem_desc.user_ptr;
+
+        ENTRY;
+
+        switch (ev->type) {
+        case PTL_EVENT_SENT:
+                CDEBUG(D_NET, "got SENT event\n");
+                break;
+        case PTL_EVENT_ACK:
+                CDEBUG(D_NET, "got ACK event\n");
+                wake_up_interruptible(&req->rq_wait_for_bulk);
+                break;
+        default:
+                CERROR("Unexpected event type!\n");
+                BUG();
+        }
+
+        EXIT;
+        return 1;
+}
+
+/*
+ * Sink-side bulk transfer handler: on an incoming PUT, sanity-check
+ * the landing buffer and wake the thread waiting for the bulk data.
+ */
+static int bulk_sink_callback(ptl_event_t *ev, void *data)
+{
+        struct ptlrpc_request *req = ev->mem_desc.user_ptr;
+
+        ENTRY;
+
+        switch (ev->type) {
+        case PTL_EVENT_PUT:
+                if (req->rq_bulkbuf != ev->mem_desc.start + ev->offset)
+                        CERROR("bulkbuf != mem_desc -- why?\n");
+                wake_up_interruptible(&req->rq_wait_for_bulk);
+                break;
+        default:
+                CERROR("Unexpected event type!\n");
+                BUG();
+        }
+
+        EXIT;
+        return 1;
+}
+
+/* Grab the NAL network interface and allocate the four client event
+ * queues used by the callbacks above.
+ *
+ * Previously a PtlEQAlloc failure was only logged and the loop kept
+ * going, so earlier errors were masked by later successes and the
+ * already-allocated EQs plus the NAL module reference leaked.  Now we
+ * stop at the first failure and unwind.
+ *
+ * Returns 0/PTL_OK on success, -EIO if the NAL is missing, or the
+ * failing PtlEQAlloc code.
+ */
+int ptlrpc_init_portals(void)
+{
+        int rc;
+        const ptl_handle_ni_t *nip;
+        ptl_handle_ni_t ni;
+
+        nip = inter_module_get_request(LUSTRE_NAL "_ni", LUSTRE_NAL);
+        if (nip == NULL) {
+                CERROR("get_ni failed: is the NAL module loaded?\n");
+                return -EIO;
+        }
+        ni = *nip;
+
+        rc = PtlEQAlloc(ni, 128, sent_packet_callback, NULL, &sent_pkt_eq);
+        if (rc != PTL_OK) {
+                CERROR("PtlEQAlloc failed: %d\n", rc);
+                goto fail_put;
+        }
+
+        rc = PtlEQAlloc(ni, 128, rcvd_reply_callback, NULL, &rcvd_rep_eq);
+        if (rc != PTL_OK) {
+                CERROR("PtlEQAlloc failed: %d\n", rc);
+                goto fail_sent;
+        }
+
+        rc = PtlEQAlloc(ni, 128, bulk_source_callback, NULL, &bulk_source_eq);
+        if (rc != PTL_OK) {
+                CERROR("PtlEQAlloc failed: %d\n", rc);
+                goto fail_rcvd;
+        }
+
+        rc = PtlEQAlloc(ni, 128, bulk_sink_callback, NULL, &bulk_sink_eq);
+        if (rc != PTL_OK) {
+                CERROR("PtlEQAlloc failed: %d\n", rc);
+                goto fail_source;
+        }
+
+        return rc;
+
+ fail_source:
+        PtlEQFree(bulk_source_eq);
+ fail_rcvd:
+        PtlEQFree(rcvd_rep_eq);
+ fail_sent:
+        PtlEQFree(sent_pkt_eq);
+ fail_put:
+        inter_module_put(LUSTRE_NAL "_ni");
+        return rc;
+}
+
+/* Tear down everything ptlrpc_init_portals() set up: free the four
+ * event queues, then drop the NAL module reference. */
+void ptlrpc_exit_portals(void)
+{
+        ptl_handle_eq_t *eqs[] = { &sent_pkt_eq, &rcvd_rep_eq,
+                                   &bulk_source_eq, &bulk_sink_eq };
+        int i;
+
+        for (i = 0; i < sizeof(eqs) / sizeof(eqs[0]); i++)
+                PtlEQFree(*eqs[i]);
+
+        inter_module_put(LUSTRE_NAL "_ni");
+}
--- /dev/null
+
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2002 Cluster File Systems, Inc.
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define EXPORT_SYMTAB
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/kernel.h>
+
+#define DEBUG_SUBSYSTEM S_RPC
+
+#include <linux/obd_support.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_net.h>
+
+extern ptl_handle_eq_t bulk_source_eq, sent_pkt_eq, rcvd_rep_eq, bulk_sink_eq;
+
+/* Bind an MD over the outgoing buffer and PtlPut it to `portal` on
+ * `peer`, using the request xid as match bits.
+ *
+ * Buffer selection: a pending bulk buffer takes precedence, then the
+ * request buffer (is_request != 0), otherwise the reply buffer.  Bulk
+ * transfers request an ACK so bulk_source_callback() can wake the
+ * waiter; plain sends do not.
+ *
+ * Returns 0/PTL_OK on success or a portals error code.
+ */
+int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
+                 int portal, int is_request)
+{
+        int rc;
+        ptl_process_id_t remote_id;
+        ptl_handle_md_t md_h;
+
+        /* FIXME: This is bad. */
+        if (request->rq_bulklen) {
+                request->rq_req_md.start = request->rq_bulkbuf;
+                request->rq_req_md.length = request->rq_bulklen;
+                request->rq_req_md.eventq = bulk_source_eq;
+        } else if (is_request) {
+                request->rq_req_md.start = request->rq_reqbuf;
+                request->rq_req_md.length = request->rq_reqlen;
+                request->rq_req_md.eventq = sent_pkt_eq;
+        } else {
+                request->rq_req_md.start = request->rq_repbuf;
+                request->rq_req_md.length = request->rq_replen;
+                request->rq_req_md.eventq = sent_pkt_eq;
+        }
+        request->rq_req_md.threshold = 1;
+        request->rq_req_md.options = PTL_MD_OP_PUT;
+        request->rq_req_md.user_ptr = request;
+
+        rc = PtlMDBind(peer->peer_ni, request->rq_req_md, &md_h);
+        if (rc != 0) {
+                /* log before BUG(): the old BUG()-first order meant the
+                 * message was never printed */
+                CERROR("PtlMDBind failed: %d\n", rc);
+                BUG();
+                return rc;
+        }
+
+        remote_id.addr_kind = PTL_ADDR_NID;
+        remote_id.nid = peer->peer_nid;
+        remote_id.pid = 0;
+
+        if (request->rq_bulklen) {
+                rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0,
+                            request->rq_xid, 0, 0);
+        } else {
+                rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0,
+                            request->rq_xid, 0, 0);
+        }
+        if (rc != PTL_OK) {
+                CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
+                       portal, request->rq_xid, rc);
+                /* tear down the MD bound above so it is not leaked
+                 * (was "FIXME: tear down md") */
+                PtlMDUnlink(md_h);
+                BUG();
+        }
+
+        return rc;
+}
+
+/* Post the reply (and optional bulk) landing buffers for `request`,
+ * then send the packed request to `peer` via ptl_send_buf().
+ *
+ * A reply buffer of rq_replen bytes is allocated here and attached to
+ * an ME matched on the request xid; the reply callback later points
+ * rq_repbuf into it.  If rq_bulklen is set, a second ME/MD pair is
+ * posted on the bulk portal for the incoming bulk data.
+ *
+ * Returns 0 on success; on failure the posted state is unwound and a
+ * negative errno or portals error code is returned.
+ *
+ * NOTE(review): on the success path ownership of repbuf passes to the
+ * reply MD; who frees it after the reply is consumed is not visible
+ * here -- confirm.  Also confirm the cleanup3 ordering (MD unlinked
+ * before its ME) is safe given the PTL_UNLINK attach semantics. */
+int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
+{
+ ptl_handle_me_t me_h, bulk_me_h;
+ ptl_process_id_t local_id;
+ int rc;
+ char *repbuf;
+
+ ENTRY;
+
+ if (request->rq_replen == 0) {
+ CERROR("request->rq_replen is 0!\n");
+ EXIT;
+ return -EINVAL;
+ }
+
+ /* request->rq_repbuf is set only when the reply comes in, in
+ * rcvd_reply_callback() (the comment used to name a nonexistent
+ * client_packet_callback()) */
+ OBD_ALLOC(repbuf, request->rq_replen);
+ if (!repbuf) {
+ EXIT;
+ return -ENOMEM;
+ }
+
+ /* accept the reply from any sender */
+ local_id.addr_kind = PTL_ADDR_GID;
+ local_id.gid = PTL_ID_ANY;
+ local_id.rid = PTL_ID_ANY;
+
+ //CERROR("sending req %d\n", request->rq_xid);
+ rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id,
+ request->rq_xid, 0, PTL_UNLINK, &me_h);
+ if (rc != PTL_OK) {
+ CERROR("PtlMEAttach failed: %d\n", rc);
+ BUG();
+ EXIT;
+ goto cleanup;
+ }
+
+ request->rq_reply_md.start = repbuf;
+ request->rq_reply_md.length = request->rq_replen;
+ request->rq_reply_md.threshold = 1;
+ request->rq_reply_md.options = PTL_MD_OP_PUT;
+ request->rq_reply_md.user_ptr = request;
+ request->rq_reply_md.eventq = rcvd_rep_eq;
+
+ rc = PtlMDAttach(me_h, request->rq_reply_md, PTL_UNLINK,
+ &request->rq_reply_md_h);
+ if (rc != PTL_OK) {
+ CERROR("PtlMDAttach failed: %d\n", rc);
+ BUG();
+ EXIT;
+ goto cleanup2;
+ }
+
+ /* optionally post the sink buffer for the bulk transfer */
+ if (request->rq_bulklen != 0) {
+ rc = PtlMEAttach(peer->peer_ni, request->rq_bulk_portal,
+ local_id, request->rq_xid, 0, PTL_UNLINK,
+ &bulk_me_h);
+ if (rc != PTL_OK) {
+ CERROR("PtlMEAttach failed: %d\n", rc);
+ BUG();
+ EXIT;
+ goto cleanup3;
+ }
+
+ request->rq_bulk_md.start = request->rq_bulkbuf;
+ request->rq_bulk_md.length = request->rq_bulklen;
+ request->rq_bulk_md.threshold = 1;
+ request->rq_bulk_md.options = PTL_MD_OP_PUT;
+ request->rq_bulk_md.user_ptr = request;
+ request->rq_bulk_md.eventq = bulk_sink_eq;
+
+ rc = PtlMDAttach(bulk_me_h, request->rq_bulk_md, PTL_UNLINK,
+ &request->rq_bulk_md_h);
+ if (rc != PTL_OK) {
+ CERROR("PtlMDAttach failed: %d\n", rc);
+ BUG();
+ EXIT;
+ goto cleanup4;
+ }
+ }
+
+ return ptl_send_buf(request, peer, request->rq_req_portal, 1);
+
+ /* unwind in reverse order of setup */
+ cleanup4:
+ PtlMEUnlink(bulk_me_h);
+ cleanup3:
+ PtlMDUnlink(request->rq_reply_md_h);
+ cleanup2:
+ PtlMEUnlink(me_h);
+ cleanup:
+ OBD_FREE(repbuf, request->rq_replen);
+
+ return rc;
+}
+
+/* ptl_received_rpc() should be called by the sleeping process once
+ * it finishes processing an event. This ensures the ref count is
+ * decremented and that the rpc ring buffer cycles properly.
+ */
+/* ptl_received_rpc() should be called by the sleeping process once
+ * it finishes processing an event.  This drops the ref count taken in
+ * server_request_callback() and, once the active MD is idle and its ME
+ * has been unlinked, rebuilds a fresh ME/MD pair (over the same
+ * buffer) at the ring tail so the rpc ring keeps cycling.
+ *
+ * Returns 0 on success or a portals error code.
+ */
+int ptl_received_rpc(struct ptlrpc_service *service) {
+        int rc, index;
+
+        index = service->srv_md_active;
+        CDEBUG(D_INFO, "MD index=%d Ref Count=%d\n", index,
+               service->srv_ref_count[index]);
+        service->srv_ref_count[index]--;
+
+        if ((service->srv_ref_count[index] <= 0) &&
+            (service->srv_me_h[index] == 0)) {
+
+                /* Replace the unlinked ME and MD */
+                rc = PtlMEInsert(service->srv_me_h[service->srv_me_tail],
+                                 service->srv_id, 0, ~0, PTL_RETAIN,
+                                 PTL_INS_AFTER, &(service->srv_me_h[index]));
+                if (rc != PTL_OK) {
+                        /* check before updating srv_me_tail/ref_count so
+                         * a failed insert does not corrupt ring state;
+                         * log before BUG() so the message is printed */
+                        CERROR("PtlMEInsert failed: %d\n", rc);
+                        BUG();
+                        return rc;
+                }
+
+                /* informational, so CDEBUG (was CERROR) */
+                CDEBUG(D_INFO, "Inserting new ME and MD in ring, rc %d\n", rc);
+                service->srv_me_tail = index;
+                service->srv_ref_count[index] = 0;
+
+                service->srv_md[index].start = service->srv_buf[index];
+                service->srv_md[index].length = service->srv_buf_size;
+                service->srv_md[index].threshold = PTL_MD_THRESH_INF;
+                service->srv_md[index].options = PTL_MD_OP_PUT;
+                service->srv_md[index].user_ptr = service;
+                service->srv_md[index].eventq = service->srv_eq_h;
+
+                rc = PtlMDAttach(service->srv_me_h[index],
+                                 service->srv_md[index],
+                                 PTL_RETAIN, &(service->srv_md_h[index]));
+
+                CDEBUG(D_INFO, "Attach MD in ring, rc %d\n", rc);
+                if (rc != PTL_OK) {
+                        /* XXX cleanup; log before BUG() so the message
+                         * is printed (old order lost it) */
+                        CERROR("PtlMDAttach failed: %d\n", rc);
+                        BUG();
+                        return rc;
+                }
+
+                service->srv_md_active =
+                        NEXT_INDEX(index, service->srv_ring_length);
+        }
+
+        return 0;
+}
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2002 Cluster File Systems, Inc.
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define EXPORT_SYMTAB
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/kernel.h>
+
+#define DEBUG_SUBSYSTEM S_RPC
+
+#include <linux/obd_support.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_net.h>
+
+extern int server_request_callback(ptl_event_t *ev, void *data);
+
+/* Wait-condition for the service thread: returns 1 when there is
+ * something to act on (a pending fatal signal, a stop request, a
+ * portals event, or a queued local request), 0 otherwise.  The reason
+ * is recorded in svc->srv_flags for ptlrpc_main() to dispatch on.
+ *
+ * NOTE(review): this sets SVC_KILLED on a signal, but ptlrpc_main()
+ * tests SVC_SIGNAL -- confirm the two flags are equivalent.
+ * NOTE(review): when an event queue exists, the empty-queue case logs
+ * via CERROR on every poll and queued local requests are never
+ * checked -- presumably a service uses either portals or the list,
+ * not both; verify. */
+static int ptlrpc_check_event(struct ptlrpc_service *svc)
+{
+
+ if (sigismember(&(current->pending.signal),
+ SIGKILL) ||
+ sigismember(&(current->pending.signal),
+ SIGINT)) {
+ svc->srv_flags |= SVC_KILLED;
+ EXIT;
+ return 1;
+ }
+
+ if ( svc->srv_flags & SVC_STOPPING ) {
+ EXIT;
+ return 1;
+ }
+
+ if ( svc->srv_eq_h ) {
+ int rc;
+ rc = PtlEQGet(svc->srv_eq_h, &svc->srv_ev);
+
+ if (rc == PTL_OK) {
+ svc->srv_flags |= SVC_EVENT;
+ EXIT;
+ return 1;
+ }
+
+ if (rc == PTL_EQ_DROPPED) {
+ CERROR("dropped event!\n");
+ BUG();
+ }
+ CERROR("PtlEQGet returns %d\n", rc);
+ EXIT;
+ return 0;
+ }
+
+ if (!list_empty(&svc->srv_reqs)) {
+ svc->srv_flags |= SVC_LIST;
+ EXIT;
+ return 1;
+ }
+
+ EXIT;
+ return 0;
+}
+
+/* Allocate and initialize a ptlrpc service descriptor.
+ *
+ * Sets up the lock, request list and wait queues, records the buffer
+ * size, portal number, pack/unpack methods and request handler, and
+ * resolves `uuid` to the service's own peer identity (srv_self).
+ *
+ * Returns the new service, or NULL on allocation or uuid-resolution
+ * failure.  Caller owns the returned service.
+ */
+struct ptlrpc_service *ptlrpc_init_svc(__u32 bufsize,
+                                       int portal,
+                                       char *uuid,
+                                       req_unpack_t unpack,
+                                       rep_pack_t pack,
+                                       svc_handler_t handler)
+{
+        int err;
+        struct ptlrpc_service *svc;
+
+        OBD_ALLOC(svc, sizeof(*svc));
+        if ( !svc ) {
+                CERROR("no memory\n");
+                return NULL;
+        }
+
+        memset(svc, 0, sizeof(*svc));
+
+        spin_lock_init(&svc->srv_lock);
+        INIT_LIST_HEAD(&svc->srv_reqs);
+        init_waitqueue_head(&svc->srv_ctl_waitq);
+        init_waitqueue_head(&svc->srv_waitq);
+
+        svc->srv_thread = NULL;
+        svc->srv_flags = 0;
+
+        svc->srv_buf_size = bufsize;
+        svc->srv_portal = portal;
+        svc->srv_req_unpack = unpack;
+        svc->srv_rep_pack = pack;
+        svc->srv_handler = handler;
+        err = kportal_uuid_to_peer(uuid, &svc->srv_self);
+        if (err) {
+                /* missing newline terminated the message mid-line */
+                CERROR("cannot get peer for uuid %s\n", uuid);
+                OBD_FREE(svc, sizeof(*svc));
+                return NULL;
+        }
+        return svc;
+}
+
+/* Body of a ptlrpc service thread.
+ *
+ * Daemonizes, blocks all signals, announces itself via srv_ctl_waitq,
+ * then loops: wait for ptlrpc_check_event(), and dispatch on the flag
+ * it set -- portals event (SVC_EVENT), queued local request
+ * (SVC_LIST), or stop/signal.  Each request is passed to the
+ * service's srv_handler.
+ *
+ * Returns 0 when the thread exits.
+ */
+static int ptlrpc_main(void *arg)
+{
+        int rc;
+        struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
+        struct obd_device *obddev = data->dev;
+        struct ptlrpc_service *svc = data->svc;
+
+        ENTRY;
+
+        lock_kernel();
+        daemonize();
+        /* these four lines were corrupted by an "&current" ->
+         * "&curren;t" entity-encoding accident; restored */
+        spin_lock_irq(&current->sigmask_lock);
+        sigfillset(&current->blocked);
+        recalc_sigpending(current);
+        spin_unlock_irq(&current->sigmask_lock);
+
+        /* was sprintf(current->comm, data->name): treated the name as a
+         * format string and could overflow comm[]; bound and literalize */
+        snprintf(current->comm, sizeof(current->comm), "%s", data->name);
+
+        /* Record that the thread is running */
+        svc->srv_thread = current;
+        svc->srv_flags = SVC_RUNNING;
+        wake_up(&svc->srv_ctl_waitq);
+
+        /* XXX maintain a list of all managed devices: insert here */
+
+        /* And now, loop forever on requests */
+        while (1) {
+
+                wait_event(svc->srv_waitq, ptlrpc_check_event(svc));
+
+                /* NOTE(review): ptlrpc_check_event() sets SVC_KILLED on
+                 * a signal but SVC_SIGNAL is tested here -- confirm the
+                 * flags match, otherwise a kill falls through to the
+                 * "unknown break" path below */
+                if (svc->srv_flags & SVC_SIGNAL) {
+                        EXIT;
+                        break;
+                }
+
+                if (svc->srv_flags & SVC_STOPPING) {
+                        EXIT;
+                        break;
+                }
+
+                if (svc->srv_flags & SVC_EVENT) {
+                        struct ptlrpc_request request;
+                        svc->srv_flags = SVC_RUNNING;
+
+                        /* FIXME: If we move to an event-driven model,
+                         * we should put the request on the stack of
+                         * mds_handle instead. */
+                        memset(&request, 0, sizeof(request));
+                        request.rq_reqbuf = svc->srv_ev.mem_desc.start +
+                                svc->srv_ev.offset;
+                        request.rq_reqlen = svc->srv_ev.mem_desc.length;
+                        request.rq_xid = svc->srv_ev.match_bits;
+                        CERROR("got req %d\n", request.rq_xid);
+
+                        request.rq_peer.peer_nid = svc->srv_ev.initiator.nid;
+                        /* FIXME: this NI should be the incoming NI.
+                         * We don't know how to find that from here. */
+                        request.rq_peer.peer_ni = svc->srv_self.peer_ni;
+                        rc = svc->srv_handler(obddev, svc, &request);
+                        ptl_received_rpc(svc);
+                        continue;
+                }
+
+                if (svc->srv_flags & SVC_LIST) {
+                        struct ptlrpc_request *request;
+                        svc->srv_flags = SVC_RUNNING;
+
+                        spin_lock(&svc->srv_lock);
+                        request = list_entry(svc->srv_reqs.next,
+                                             struct ptlrpc_request,
+                                             rq_list);
+                        list_del(&request->rq_list);
+                        spin_unlock(&svc->srv_lock);
+                        rc = svc->srv_handler(obddev, svc, request);
+                        continue;
+                }
+                CERROR("unknown break in service");
+                break;
+        }
+
+        svc->srv_thread = NULL;
+        svc->srv_flags = SVC_STOPPED;
+        wake_up(&svc->srv_ctl_waitq);
+        /* NOTE(review): data points into ptlrpc_start_thread()'s stack
+         * frame, which may be gone by now -- confirm lifetime */
+        CERROR("svc %s: exiting\n", data->name);
+        return 0;
+}
+
+/* Ask the service thread to stop, then wait until it reports
+ * SVC_STOPPED on the control wait queue. */
+void ptlrpc_stop_thread(struct ptlrpc_service *svc)
+{
+        svc->srv_flags = SVC_STOPPING;
+        wake_up(&svc->srv_waitq);
+
+        wait_event_interruptible(svc->srv_ctl_waitq,
+                                 (svc->srv_flags & SVC_STOPPED));
+}
+
+/* Spawn a kernel thread running ptlrpc_main() for `svc` on `dev` and
+ * wait until it reports SVC_RUNNING.
+ *
+ * Returns 0 on success, -EINVAL if the thread cannot be created.
+ *
+ * NOTE(review): `d` lives on this stack frame but ptlrpc_main() keeps
+ * the pointer and reads data->name again at thread exit -- by then
+ * this function may have returned; confirm the lifetime or copy the
+ * name into svc.
+ * NOTE(review): both wait queues were already initialized in
+ * ptlrpc_init_svc(); re-initializing here looks redundant -- verify. */
+int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
+ char *name)
+{
+ struct ptlrpc_svc_data d;
+ int rc;
+ ENTRY;
+
+ d.dev = dev;
+ d.svc = svc;
+ d.name = name;
+
+ init_waitqueue_head(&svc->srv_waitq);
+
+ init_waitqueue_head(&svc->srv_ctl_waitq);
+ rc = kernel_thread(ptlrpc_main, (void *) &d,
+ CLONE_VM | CLONE_FS | CLONE_FILES);
+ if (rc < 0) {
+ CERROR("cannot start thread\n");
+ return -EINVAL;
+ }
+ /* do not return (and free d) until the child has started up */
+ wait_event(svc->srv_ctl_waitq, svc->srv_flags & SVC_RUNNING);
+
+ EXIT;
+ return 0;
+}
+
+
+/* Build the server-side rpc ring for `service`.
+ *
+ * Resolves `uuid` to a peer, allocates the service event queue, then
+ * attaches a leading ME on srv_portal and inserts the remaining
+ * ring_length-1 MEs after it, each backed by a freshly allocated
+ * buffer and an infinite-threshold MD feeding srv_eq_h.
+ *
+ * Returns 0 on success, -EINVAL/-ENOMEM or a portals error code.
+ *
+ * NOTE(review): every error return after the first allocation leaks
+ * the EQ, the already-attached MEs/MDs and the srv_buf[] buffers --
+ * there is no unwind path here; rpc_unregister_service() assumes a
+ * fully built ring. */
+int rpc_register_service(struct ptlrpc_service *service, char *uuid)
+{
+ struct lustre_peer peer;
+ int rc, i;
+
+ rc = kportal_uuid_to_peer(uuid, &peer);
+ if (rc != 0) {
+ CERROR("Invalid uuid \"%s\"\n", uuid);
+ return -EINVAL;
+ }
+
+ service->srv_ring_length = RPC_RING_LENGTH;
+ service->srv_me_active = 0;
+ service->srv_md_active = 0;
+
+ /* match requests from any sender */
+ service->srv_id.addr_kind = PTL_ADDR_GID;
+ service->srv_id.gid = PTL_ID_ANY;
+ service->srv_id.rid = PTL_ID_ANY;
+
+ rc = PtlEQAlloc(peer.peer_ni, 128, server_request_callback,
+ service, &(service->srv_eq_h));
+
+ if (rc != PTL_OK) {
+ CERROR("PtlEQAlloc failed: %d\n", rc);
+ return rc;
+ }
+
+ /* Attach the leading ME on which we build the ring */
+ rc = PtlMEAttach(peer.peer_ni, service->srv_portal,
+ service->srv_id, 0, ~0, PTL_RETAIN,
+ &(service->srv_me_h[0]));
+
+ if (rc != PTL_OK) {
+ CERROR("PtlMEAttach failed: %d\n", rc);
+ return rc;
+ }
+
+ for (i = 0; i < service->srv_ring_length; i++) {
+ OBD_ALLOC(service->srv_buf[i], service->srv_buf_size);
+
+ if (service->srv_buf[i] == NULL) {
+ CERROR("no memory\n");
+ return -ENOMEM;
+ }
+
+ /* Insert additional ME's to the ring */
+ if (i > 0) {
+ rc = PtlMEInsert(service->srv_me_h[i-1],
+ service->srv_id, 0, ~0, PTL_RETAIN,
+ PTL_INS_AFTER,&(service->srv_me_h[i]));
+ service->srv_me_tail = i;
+
+ if (rc != PTL_OK) {
+ CERROR("PtlMEInsert failed: %d\n", rc);
+ return rc;
+ }
+ }
+
+ service->srv_ref_count[i] = 0;
+ service->srv_md[i].start = service->srv_buf[i];
+ service->srv_md[i].length = service->srv_buf_size;
+ service->srv_md[i].threshold = PTL_MD_THRESH_INF;
+ service->srv_md[i].options = PTL_MD_OP_PUT;
+ service->srv_md[i].user_ptr = service;
+ service->srv_md[i].eventq = service->srv_eq_h;
+
+ rc = PtlMDAttach(service->srv_me_h[i], service->srv_md[i],
+ PTL_RETAIN, &(service->srv_md_h[i]));
+
+ if (rc != PTL_OK) {
+ /* cleanup */
+ CERROR("PtlMDAttach failed: %d\n", rc);
+ return rc;
+ }
+ }
+
+ return 0;
+}
+
+/* Tear down the rpc ring built by rpc_register_service(): unlink each
+ * MD and ME, free each ring buffer, then free the event queue.
+ * Failures are logged but do not abort the teardown; always returns 0. */
+int rpc_unregister_service(struct ptlrpc_service *service)
+{
+        int err, idx;
+
+        for (idx = 0; idx < service->srv_ring_length; idx++) {
+                err = PtlMDUnlink(service->srv_md_h[idx]);
+                if (err)
+                        CERROR("PtlMDUnlink failed: %d\n", err);
+
+                err = PtlMEUnlink(service->srv_me_h[idx]);
+                if (err)
+                        CERROR("PtlMEUnlink failed: %d\n", err);
+
+                OBD_FREE(service->srv_buf[idx], service->srv_buf_size);
+        }
+
+        err = PtlEQFree(service->srv_eq_h);
+        if (err)
+                CERROR("PtlEQFree failed: %d\n", err);
+
+        return 0;
+}
+