#define DEBUG_SUBSYSTEM S_RPC
+#ifdef __KERNEL__
#include <linux/module.h>
-#include <linux/obd_support.h>
+#else
+#include <liblustre.h>
+#endif
+#include <linux/obd_class.h>
#include <linux/lustre_net.h>
-ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq,
- bulk_put_source_eq, bulk_put_sink_eq,
- bulk_get_source_eq, bulk_get_sink_eq;
-static const ptl_handle_ni_t *socknal_nip = NULL, *toenal_nip = NULL,
- *qswnal_nip = NULL, *gmnal_nip = NULL;
+struct ptlrpc_ni ptlrpc_interfaces[NAL_MAX_NR];
+int ptlrpc_ninterfaces;
/*
* Free the packet when it has gone out
if (ev->type == PTL_EVENT_SENT) {
OBD_FREE(ev->mem_desc.start, ev->mem_desc.length);
+ } else if (ev->type == PTL_EVENT_ACK) {
+ struct ptlrpc_request *req = ev->mem_desc.user_ptr;
+ if (req->rq_flags & PTL_RPC_FL_WANT_ACK) {
+ req->rq_flags &= ~PTL_RPC_FL_WANT_ACK;
+ wake_up(&req->rq_wait_for_rep);
+ } else {
+ DEBUG_REQ(D_ERROR, req,
+ "ack received for reply, not wanted");
+ }
} else {
- // XXX make sure we understand all events, including ACK's
+ // XXX make sure we understand all events
CERROR("Unknown event %d\n", ev->type);
LBUG();
}
/*
* Wake up the thread waiting for the reply once it comes in.
*/
-static int reply_in_callback(ptl_event_t *ev)
+int reply_in_callback(ptl_event_t *ev)
{
struct ptlrpc_request *req = ev->mem_desc.user_ptr;
ENTRY;
int request_in_callback(ptl_event_t *ev)
{
struct ptlrpc_request_buffer_desc *rqbd = ev->mem_desc.user_ptr;
- struct ptlrpc_service *service = rqbd->rqbd_service;
+ struct ptlrpc_srv_ni *srv_ni = rqbd->rqbd_srv_ni;
+ struct ptlrpc_service *service = srv_ni->sni_service;
/* requests always contiguous */
LASSERT((ev->mem_desc.options & PTL_MD_IOV) == 0);
/* we only enable puts */
LASSERT(ev->type == PTL_EVENT_PUT);
- LASSERT(atomic_read(&service->srv_nrqbds_receiving) > 0);
+ LASSERT(atomic_read(&srv_ni->sni_nrqbds_receiving) > 0);
LASSERT(atomic_read(&rqbd->rqbd_refcount) > 0);
if (ev->rlength != ev->mlength)
/* we're off the air */
/* we'll probably start dropping packets in portals soon */
- if (atomic_dec_and_test(&service->srv_nrqbds_receiving))
+ if (atomic_dec_and_test(&srv_ni->sni_nrqbds_receiving))
CERROR("All request buffers busy\n");
} else {
/* +1 ref for service thread */
RETURN(0);
}
-int ptlrpc_init_portals(void)
+int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, struct ptlrpc_peer *peer)
{
- int rc;
- ptl_handle_ni_t ni;
-
- /* Use the qswnal if it's there */
- if ((qswnal_nip = inter_module_get("kqswnal_ni")) != NULL)
- ni = *qswnal_nip;
- else if ((gmnal_nip = inter_module_get("kgmnal_ni")) != NULL)
- ni = *gmnal_nip;
- else if ((socknal_nip = inter_module_get("ksocknal_ni")) != NULL)
- ni = *socknal_nip;
- else if ((toenal_nip = inter_module_get("ktoenal_ni")) != NULL)
- ni = *toenal_nip;
- else {
- CERROR("get_ni failed: is a NAL module loaded?\n");
- return -EIO;
+ struct ptlrpc_ni *pni;
+ struct lustre_peer lpeer;
+ int i;
+ int rc = lustre_uuid_to_peer (uuid->uuid, &lpeer);
+
+ if (rc != 0)
+ RETURN (rc);
+
+ for (i = 0; i < ptlrpc_ninterfaces; i++) {
+ pni = &ptlrpc_interfaces[i];
+
+ if (!memcmp (&lpeer.peer_ni, &pni->pni_ni_h,
+ sizeof (lpeer.peer_ni))) {
+ peer->peer_nid = lpeer.peer_nid;
+ peer->peer_ni = pni;
+ return (0);
+ }
}
+
+ CERROR ("Can't find ptlrpc interface for "LPX64" ni handle %08lx %08lx\n",
+ lpeer.peer_nid, lpeer.peer_ni.nal_idx, lpeer.peer_ni.handle_idx);
+ return (-ENOENT);
+}
- rc = PtlEQAlloc(ni, 1024, request_out_callback, &request_out_eq);
- if (rc != PTL_OK)
- CERROR("PtlEQAlloc failed: %d\n", rc);
+void ptlrpc_ni_fini (struct ptlrpc_ni *pni)
+{
+ PtlEQFree(pni->pni_request_out_eq_h);
+ PtlEQFree(pni->pni_reply_out_eq_h);
+ PtlEQFree(pni->pni_reply_in_eq_h);
+ PtlEQFree(pni->pni_bulk_put_source_eq_h);
+ PtlEQFree(pni->pni_bulk_put_sink_eq_h);
+ PtlEQFree(pni->pni_bulk_get_source_eq_h);
+ PtlEQFree(pni->pni_bulk_get_sink_eq_h);
+
+ inter_module_put(pni->pni_name);
+}
- rc = PtlEQAlloc(ni, 1024, reply_out_callback, &reply_out_eq);
- if (rc != PTL_OK)
- CERROR("PtlEQAlloc failed: %d\n", rc);
+int ptlrpc_ni_init (char *name, struct ptlrpc_ni *pni)
+{
+ int rc;
+ ptl_handle_ni_t *nip;
- rc = PtlEQAlloc(ni, 1024, reply_in_callback, &reply_in_eq);
+ nip = (ptl_handle_ni_t *)inter_module_get (name);
+ if (nip == NULL) {
+ CDEBUG (D_NET, "Network interface %s not loaded\n", name);
+ return (-ENOENT);
+ }
+
+ CDEBUG (D_NET, "init %s: nal_idx %ld\n", name, nip->nal_idx);
+
+ pni->pni_name = name;
+ pni->pni_ni_h = *nip;
+
+ ptl_set_inv_handle (&pni->pni_request_out_eq_h);
+ ptl_set_inv_handle (&pni->pni_reply_out_eq_h);
+ ptl_set_inv_handle (&pni->pni_reply_in_eq_h);
+ ptl_set_inv_handle (&pni->pni_bulk_put_source_eq_h);
+ ptl_set_inv_handle (&pni->pni_bulk_put_sink_eq_h);
+ ptl_set_inv_handle (&pni->pni_bulk_get_source_eq_h);
+ ptl_set_inv_handle (&pni->pni_bulk_get_sink_eq_h);
+
+ /* NB We never actually PtlEQGet() out of these events queues since
+ * we're only interested in the event callback, so we can just let
+ * them wrap. Their sizes aren't a big deal, apart from providing
+ * a little history for debugging... */
+
+ rc = PtlEQAlloc(pni->pni_ni_h, 1024, request_out_callback,
+ &pni->pni_request_out_eq_h);
if (rc != PTL_OK)
- CERROR("PtlEQAlloc failed: %d\n", rc);
-
- rc = PtlEQAlloc(ni, 1024, bulk_put_source_callback,
- &bulk_put_source_eq);
+ GOTO (fail, rc = -ENOMEM);
+
+ rc = PtlEQAlloc(pni->pni_ni_h, 1024, reply_out_callback,
+ &pni->pni_reply_out_eq_h);
if (rc != PTL_OK)
- CERROR("PtlEQAlloc failed: %d\n", rc);
-
- rc = PtlEQAlloc(ni, 1024, bulk_put_sink_callback, &bulk_put_sink_eq);
+ GOTO (fail, rc = -ENOMEM);
+
+ rc = PtlEQAlloc(pni->pni_ni_h, 1024, reply_in_callback,
+ &pni->pni_reply_in_eq_h);
if (rc != PTL_OK)
- CERROR("PtlEQAlloc failed: %d\n", rc);
-
- rc = PtlEQAlloc(ni, 1024, bulk_get_source_callback,
- &bulk_get_source_eq);
+ GOTO (fail, rc = -ENOMEM);
+
+ rc = PtlEQAlloc(pni->pni_ni_h, 1024, bulk_put_source_callback,
+ &pni->pni_bulk_put_source_eq_h);
if (rc != PTL_OK)
- CERROR("PtlEQAlloc failed: %d\n", rc);
-
- rc = PtlEQAlloc(ni, 1024, bulk_get_sink_callback, &bulk_get_sink_eq);
+ GOTO (fail, rc = -ENOMEM);
+
+ rc = PtlEQAlloc(pni->pni_ni_h, 1024, bulk_put_sink_callback,
+ &pni->pni_bulk_put_sink_eq_h);
if (rc != PTL_OK)
- CERROR("PtlEQAlloc failed: %d\n", rc);
+ GOTO (fail, rc = -ENOMEM);
+
+ rc = PtlEQAlloc(pni->pni_ni_h, 1024, bulk_get_source_callback,
+ &pni->pni_bulk_get_source_eq_h);
+ if (rc != PTL_OK)
+ GOTO (fail, rc = -ENOMEM);
+
+ rc = PtlEQAlloc(pni->pni_ni_h, 1024, bulk_get_sink_callback,
+ &pni->pni_bulk_get_sink_eq_h);
+ if (rc != PTL_OK)
+ GOTO (fail, rc = -ENOMEM);
+
+ return (0);
+ fail:
+ CERROR ("Failed to initialise network interface %s: %d\n",
+ name, rc);
+
+ /* OK to do complete teardown since we invalidated the handles above... */
+ ptlrpc_ni_fini (pni);
+ return (rc);
+}
- return rc;
+int ptlrpc_init_portals(void)
+{
+ /* Add new portals network interface names here.
+ * Order is irrelevent! */
+ char *ni_names[] = { "kqswnal_ni",
+ "kgmnal_ni",
+ "ksocknal_ni",
+ "ktoenal_ni",
+ "tcpnal_ni",
+ NULL };
+ int rc;
+ int i;
+
+ LASSERT (ptlrpc_ninterfaces == 0);
+
+ for (i = 0; ni_names[i] != NULL; i++) {
+ LASSERT (ptlrpc_ninterfaces <
+ sizeof (ptlrpc_interfaces)/sizeof (ptlrpc_interfaces[0]));
+
+ rc = ptlrpc_ni_init (ni_names[i],
+ &ptlrpc_interfaces[ptlrpc_ninterfaces]);
+ if (rc == 0)
+ ptlrpc_ninterfaces++;
+ }
+
+ if (ptlrpc_ninterfaces == 0) {
+ CERROR("network initialisation failed: is a NAL module loaded?\n");
+ return -EIO;
+ }
+ return 0;
}
void ptlrpc_exit_portals(void)
{
- PtlEQFree(request_out_eq);
- PtlEQFree(reply_out_eq);
- PtlEQFree(reply_in_eq);
- PtlEQFree(bulk_put_source_eq);
- PtlEQFree(bulk_put_sink_eq);
- PtlEQFree(bulk_get_source_eq);
- PtlEQFree(bulk_get_sink_eq);
-
- if (qswnal_nip != NULL)
- inter_module_put("kqswnal_ni");
- if (socknal_nip != NULL)
- inter_module_put("ksocknal_ni");
- if (gmnal_nip != NULL)
- inter_module_put("kgmnal_ni");
- if (toenal_nip != NULL)
- inter_module_put("ktoenal_ni");
+ while (ptlrpc_ninterfaces > 0)
+ ptlrpc_ni_fini (&ptlrpc_interfaces[--ptlrpc_ninterfaces]);
}