#include <linux/lustre_net.h>
#include "ptlrpc_internal.h"
+#if !defined(__KERNEL__) && defined(CRAY_PORTALS)
+/* forward ref in events.c */
+static void cray_portals_callback(ptl_event_t *ev);
+#endif
+
+
struct ptlrpc_ni ptlrpc_interfaces[NAL_MAX_NR];
int ptlrpc_ninterfaces;
struct ptlrpc_srv_ni *srv_ni = rqbd->rqbd_srv_ni;
struct ptlrpc_service *service = srv_ni->sni_service;
struct ptlrpc_request *req;
- long flags;
+ char str[PTL_NALFMT_SIZE];
+ unsigned long flags;
ENTRY;
LASSERT (ev->type == PTL_EVENT_PUT_END ||
OBD_ALLOC_GFP(req, sizeof(*req), GFP_ATOMIC);
if (req == NULL) {
CERROR("Can't allocate incoming request descriptor: "
- "Dropping %s RPC from "LPX64"\n",
- service->srv_name, ev->initiator.nid);
+ "Dropping %s RPC from %s\n",
+ service->srv_name,
+ portals_nid2str(srv_ni->sni_ni->pni_number,
+ ev->initiator.nid, str));
return;
}
}
EXIT;
}
-static int ptlrpc_master_callback(ptl_event_t *ev)
+static void ptlrpc_master_callback(ptl_event_t *ev)
{
struct ptlrpc_cb_id *cbid = ev->mem_desc.user_ptr;
void (*callback)(ptl_event_t *ev) = cbid->cbid_fn;
/* Honestly, it's best to find out early. */
- LASSERT (cbid->cbid_arg != (void *)0x5a5a5a5a5a5a5a5a);
+ LASSERT (cbid->cbid_arg != LP_POISON);
LASSERT (callback == request_out_callback ||
callback == reply_in_callback ||
callback == client_bulk_callback ||
callback == server_bulk_callback);
callback (ev);
- return (0);
}
int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, struct ptlrpc_peer *peer)
{
struct ptlrpc_ni *pni;
+ __u32 peer_nal;
ptl_nid_t peer_nid;
- ptl_handle_ni_t peer_ni;
int i;
- char str[20];
+ char str[PTL_NALFMT_SIZE];
int rc = lustre_uuid_to_peer(uuid->uuid,
- &peer_ni, &peer_nid);
+ &peer_nal, &peer_nid);
if (rc != 0)
RETURN (rc);
for (i = 0; i < ptlrpc_ninterfaces; i++) {
pni = &ptlrpc_interfaces[i];
- if (!memcmp(&peer_ni, &pni->pni_ni_h,
- sizeof (peer_ni))) {
+ if (pni->pni_number == peer_nal) {
peer->peer_nid = peer_nid;
peer->peer_ni = pni;
return (0);
}
}
- PtlSnprintHandle(str, sizeof(str), peer_ni);
- CERROR("Can't find ptlrpc interface for "LPX64" ni %s\n",
- peer_nid, str);
+ CERROR("Can't find ptlrpc interface for NAL %d, NID %s\n",
+ peer_nal, portals_nid2str(peer_nal, peer_nid, str));
return (-ENOENT);
}
LBUG();
case PTL_OK:
- kportal_put_ni (pni->pni_number);
+ PtlNIFini(pni->pni_ni_h);
return;
case PTL_EQ_IN_USE:
{
int rc;
char str[20];
- ptl_handle_ni_t *nip = kportal_get_ni (number);
+ ptl_handle_ni_t nih;
- if (nip == NULL) {
- CDEBUG (D_NET, "Network interface %s not loaded\n", name);
+ /* We're not passing any limits yet... */
+ rc = PtlNIInit(number, 0, NULL, NULL, &nih);
+ if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
+ CDEBUG (D_NET, "Can't init network interface %s: %d\n",
+ name, rc);
return (-ENOENT);
}
- PtlSnprintHandle(str, sizeof(str), *nip);
+ PtlSnprintHandle(str, sizeof(str), nih);
CDEBUG (D_NET, "init %d %s: %s\n", number, name, str);
pni->pni_name = name;
pni->pni_number = number;
- pni->pni_ni_h = *nip;
+ pni->pni_ni_h = nih;
pni->pni_eq_h = PTL_INVALID_HANDLE;
- rc = PtlEQAlloc(pni->pni_ni_h, PTLRPC_NUM_EQ, PTLRPC_EQ_CALLBACK,
+ /* CAVEAT EMPTOR: how we process portals events is _radically_
+ * different depending on... */
+#ifdef __KERNEL__
+ /* kernel portals calls our master callback when events are added to
+ * the event queue. In fact lustre never pulls events off this queue,
+ * so it's only sized for some debug history. */
+ rc = PtlEQAlloc(pni->pni_ni_h, 1024, ptlrpc_master_callback,
&pni->pni_eq_h);
-
+#else
+ /* liblustre calls the master callback when it removes events from the
+ * event queue. The event queue has to be big enough not to drop
+ * anything */
+# if CRAY_PORTALS
+ /* cray portals implements a non-standard callback to notify us there
+ * are buffered events even when the app is not doing a filesystem
+ * call. */
+ rc = PtlEQAlloc(pni->pni_ni_h, 10240, cray_portals_callback,
+ &pni->pni_eq_h);
+# else
+ rc = PtlEQAlloc(pni->pni_ni_h, 10240, PTL_EQ_HANDLER_NONE,
+ &pni->pni_eq_h);
+# endif
+#endif
if (rc != PTL_OK)
GOTO (fail, rc = -ENOMEM);
return found_something;
}
-static int cray_portals_callback(ptl_event_t *ev)
+#ifdef CRAY_PORTALS
+static void cray_portals_callback(ptl_event_t *ev)
{
/* We get a callback from the client Cray portals implementation
* whenever anyone calls PtlEQPoll(), and an event queue with a
- * callback handler has outstanding events.
+ * callback handler has outstanding events.
*
* If it's not liblustre calling PtlEQPoll(), this lets us know we
* have outstanding events which we handle with
*
* Otherwise, we're already eagerly consuming events and we'd
* handle events out of order if we recursed. */
- if (liblustre_waiting)
- return;
-
- liblustre_wait_event(0);
+ if (!liblustre_waiting)
+ liblustre_wait_event(0);
}
+#endif
#endif /* __KERNEL__ */
+int ptlrpc_default_nal(void)
+{
+ if (ptlrpc_ninterfaces == 0)
+ return (-ENOENT);
+
+ return (ptlrpc_interfaces[0].pni_number);
+}
+
int ptlrpc_init_portals(void)
{
/* Add new portals network interfaces here.
{GMNAL, "gmnal"},
{IBNAL, "ibnal"},
{TCPNAL, "tcpnal"},
- {SCIMACNAL, "scimacnal"}};
+ {CRAY_KB_ERNAL, "cray_kb_ernal"}};
int rc;
int i;