#endif
#include <linux/obd_class.h>
#include <linux/lustre_net.h>
+#include "ptlrpc_internal.h"
struct ptlrpc_ni ptlrpc_interfaces[NAL_MAX_NR];
int ptlrpc_ninterfaces;
unsigned long flags;
ENTRY;
- LASSERT (ev->type == PTL_EVENT_SENT ||
+ LASSERT (ev->type == PTL_EVENT_SEND_END ||
ev->type == PTL_EVENT_UNLINK);
LASSERT (ev->unlinked);
- DEBUG_REQ((ev->status == PTL_OK) ? D_NET : D_ERROR, req,
- "type %d, status %d", ev->type, ev->status);
+ DEBUG_REQ((ev->ni_fail_type == PTL_NI_OK) ? D_NET : D_ERROR, req,
+ "type %d, status %d", ev->type, ev->ni_fail_type);
if (ev->type == PTL_EVENT_UNLINK ||
- ev->status != PTL_OK) {
+ ev->ni_fail_type != PTL_NI_OK) {
/* Failed send: make it seem like the reply timed out, just
* like failing sends in client.c does currently... */
unsigned long flags;
ENTRY;
- LASSERT (ev->type == PTL_EVENT_PUT ||
+ LASSERT (ev->type == PTL_EVENT_PUT_END ||
ev->type == PTL_EVENT_UNLINK);
LASSERT (ev->unlinked);
LASSERT (ev->mem_desc.start == req->rq_repmsg);
LASSERT (ev->offset == 0);
LASSERT (ev->mlength <= req->rq_replen);
- DEBUG_REQ((ev->status == PTL_OK) ? D_NET : D_ERROR, req,
- "type %d, status %d", ev->type, ev->status);
+ DEBUG_REQ((ev->ni_fail_type == PTL_NI_OK) ? D_NET : D_ERROR, req,
+ "type %d, status %d", ev->type, ev->ni_fail_type);
spin_lock_irqsave (&req->rq_lock, flags);
LASSERT (req->rq_receiving_reply);
req->rq_receiving_reply = 0;
- if (ev->type == PTL_EVENT_PUT &&
- ev->status == PTL_OK) {
+ if (ev->type == PTL_EVENT_PUT_END &&
+ ev->ni_fail_type == PTL_NI_OK) {
req->rq_replied = 1;
req->rq_nob_received = ev->mlength;
}
ENTRY;
LASSERT ((desc->bd_type == BULK_PUT_SINK &&
- ev->type == PTL_EVENT_PUT) ||
+ ev->type == PTL_EVENT_PUT_END) ||
(desc->bd_type == BULK_GET_SOURCE &&
- ev->type == PTL_EVENT_GET) ||
+ ev->type == PTL_EVENT_GET_END) ||
ev->type == PTL_EVENT_UNLINK);
LASSERT (ev->unlinked);
- CDEBUG((ev->status == PTL_OK) ? D_NET : D_ERROR,
+ CDEBUG((ev->ni_fail_type == PTL_NI_OK) ? D_NET : D_ERROR,
"event type %d, status %d, desc %p\n",
- ev->type, ev->status, desc);
+ ev->type, ev->ni_fail_type, desc);
spin_lock_irqsave (&desc->bd_lock, flags);
desc->bd_network_rw = 0;
if (ev->type != PTL_EVENT_UNLINK &&
- ev->status == PTL_OK) {
+ ev->ni_fail_type == PTL_NI_OK) {
desc->bd_success = 1;
desc->bd_nob_transferred = ev->mlength;
}
long flags;
ENTRY;
- LASSERT (ev->type == PTL_EVENT_PUT ||
+ LASSERT (ev->type == PTL_EVENT_PUT_END ||
ev->type == PTL_EVENT_UNLINK);
LASSERT ((char *)ev->mem_desc.start >= rqbd->rqbd_buffer);
LASSERT ((char *)ev->mem_desc.start + ev->offset + ev->mlength <=
rqbd->rqbd_buffer + service->srv_buf_size);
- CDEBUG((ev->status == PTL_OK) ? D_NET : D_ERROR,
+ CDEBUG((ev->ni_fail_type == PTL_OK) ? D_NET : D_ERROR,
"event type %d, status %d, service %s\n",
- ev->type, ev->status, service->srv_name);
+ ev->type, ev->ni_fail_type, service->srv_name);
if (ev->unlinked) {
/* If this is the last request message to fit in the
req = &rqbd->rqbd_req;
memset(req, 0, sizeof (*req));
} else {
- LASSERT (ev->type == PTL_EVENT_PUT);
- if (ev->status != PTL_OK) {
+ LASSERT (ev->type == PTL_EVENT_PUT_END);
+ if (ev->ni_fail_type != PTL_NI_OK) {
/* We moaned above already... */
return;
}
* size to non-zero if this was a successful receive. */
req->rq_xid = ev->match_bits;
req->rq_reqmsg = ev->mem_desc.start + ev->offset;
- if (ev->type == PTL_EVENT_PUT &&
- ev->status == PTL_OK)
+ if (ev->type == PTL_EVENT_PUT_END &&
+ ev->ni_fail_type == PTL_NI_OK)
req->rq_reqlen = ev->mlength;
- req->rq_arrival_time = ev->arrival_time;
+ do_gettimeofday(&req->rq_arrival_time);
req->rq_peer.peer_nid = ev->initiator.nid;
req->rq_peer.peer_ni = rqbd->rqbd_srv_ni->sni_ni;
req->rq_rqbd = rqbd;
unsigned long flags;
ENTRY;
- LASSERT (ev->type == PTL_EVENT_SENT ||
+ LASSERT (ev->type == PTL_EVENT_SEND_END ||
ev->type == PTL_EVENT_ACK ||
ev->type == PTL_EVENT_UNLINK);
unsigned long flags;
ENTRY;
- LASSERT (ev->type == PTL_EVENT_SENT ||
+ LASSERT (ev->type == PTL_EVENT_SEND_END ||
ev->type == PTL_EVENT_UNLINK ||
(desc->bd_type == BULK_PUT_SOURCE &&
ev->type == PTL_EVENT_ACK) ||
(desc->bd_type == BULK_GET_SINK &&
- ev->type == PTL_EVENT_REPLY));
+ ev->type == PTL_EVENT_REPLY_END));
- CDEBUG((ev->status == PTL_OK) ? D_NET : D_ERROR,
+ CDEBUG((ev->ni_fail_type == PTL_NI_OK) ? D_NET : D_ERROR,
"event type %d, status %d, desc %p\n",
- ev->type, ev->status, desc);
+ ev->type, ev->ni_fail_type, desc);
spin_lock_irqsave (&desc->bd_lock, flags);
if ((ev->type == PTL_EVENT_ACK ||
- ev->type == PTL_EVENT_REPLY) &&
- ev->status == PTL_OK) {
+ ev->type == PTL_EVENT_REPLY_END) &&
+ ev->ni_fail_type == PTL_NI_OK) {
/* We heard back from the peer, so even if we get this
* before the SENT event (oh yes we can), we know we
* read/wrote the peer buffer and how much... */
int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, struct ptlrpc_peer *peer)
{
struct ptlrpc_ni *pni;
- struct lustre_peer lpeer;
+ ptl_nid_t peer_nid;
+ ptl_handle_ni_t peer_ni;
int i;
- int rc = lustre_uuid_to_peer (uuid->uuid, &lpeer);
-
+ char str[20];
+ int rc = lustre_uuid_to_peer(uuid->uuid,
+ &peer_ni, &peer_nid);
if (rc != 0)
RETURN (rc);
for (i = 0; i < ptlrpc_ninterfaces; i++) {
pni = &ptlrpc_interfaces[i];
- if (!memcmp(&lpeer.peer_ni, &pni->pni_ni_h,
- sizeof (lpeer.peer_ni))) {
- peer->peer_nid = lpeer.peer_nid;
+ if (!memcmp(&peer_ni, &pni->pni_ni_h,
+ sizeof (peer_ni))) {
+ peer->peer_nid = peer_nid;
peer->peer_ni = pni;
return (0);
}
}
- CERROR("Can't find ptlrpc interface for "LPX64" ni handle %08lx."LPX64"\n",
- lpeer.peer_nid, lpeer.peer_ni.nal_idx, lpeer.peer_ni.cookie);
+ PtlSnprintHandle(str, sizeof(str), peer_ni);
+ CERROR("Can't find ptlrpc interface for "LPX64" ni %s\n",
+ peer_nid, str);
return (-ENOENT);
}
kportal_put_ni (pni->pni_number);
return;
- case PTL_EQ_INUSE:
+ case PTL_EQ_IN_USE:
if (retries != 0)
CWARN("Event queue for %s still busy\n",
pni->pni_name);
int ptlrpc_ni_init(int number, char *name, struct ptlrpc_ni *pni)
{
int rc;
+ char str[20];
ptl_handle_ni_t *nip = kportal_get_ni (number);
if (nip == NULL) {
return (-ENOENT);
}
- CDEBUG (D_NET, "init %d %s: nal_idx %ld\n", number, name, nip->nal_idx);
+ PtlSnprintHandle(str, sizeof(str), *nip);
+ CDEBUG (D_NET, "init %d %s: %s\n", number, name, str);
pni->pni_name = name;
pni->pni_number = number;
pni->pni_ni_h = *nip;
- pni->pni_eq_h = PTL_HANDLE_NONE;
+ pni->pni_eq_h = PTL_INVALID_HANDLE;
-#ifdef __KERNEL__
- /* kernel: portals calls the callback when the event is added to the
- * queue, so we don't care if we lose events */
- rc = PtlEQAlloc(pni->pni_ni_h, 1024, ptlrpc_master_callback,
+ rc = PtlEQAlloc(pni->pni_ni_h, PTLRPC_NUM_EQ, PTLRPC_EQ_CALLBACK,
&pni->pni_eq_h);
-#else
- /* liblustre: no asynchronous callback and allocate a nice big event
- * queue so we don't drop any events... */
- rc = PtlEQAlloc(pni->pni_ni_h, 10240, NULL, &pni->pni_eq_h);
-#endif
+
if (rc != PTL_OK)
GOTO (fail, rc = -ENOMEM);
{
ptl_event_t ev;
int rc;
+ int i;
ENTRY;
- if (timeout) {
- rc = PtlEQWait_timeout(ptlrpc_interfaces[0].pni_eq_h, &ev, timeout);
- } else {
- rc = PtlEQGet (ptlrpc_interfaces[0].pni_eq_h, &ev);
- }
+ rc = PtlEQPoll(&ptlrpc_interfaces[0].pni_eq_h, 1, timeout * 1000,
+ &ev, &i);
if (rc == PTL_EQ_EMPTY)
RETURN(0);
LASSERT (rc == PTL_EQ_DROPPED || rc == PTL_OK);
-#ifndef __KERNEL__
/* liblustre: no asynch callback so we can't affort to miss any
* events... */
if (rc == PTL_EQ_DROPPED) {
}
ptlrpc_master_callback (&ev);
-#endif
RETURN(1);
}
+int liblustre_waiting = 0;
+
int
liblustre_wait_event (int timeout)
{
struct liblustre_wait_callback *llwc;
int found_something = 0;
- /* First check for any new events */
- if (liblustre_check_events(0))
- found_something = 1;
+ /* single threaded recursion check... */
+ liblustre_waiting = 1;
- /* Now give all registered callbacks a bite at the cherry */
- list_for_each(tmp, &liblustre_wait_callbacks) {
- llwc = list_entry(tmp, struct liblustre_wait_callback,
- llwc_list);
-
- if (llwc->llwc_fn(llwc->llwc_arg))
+ for (;;) {
+ /* Deal with all pending events */
+ while (liblustre_check_events(0))
found_something = 1;
- }
- /* return to caller if something happened */
- if (found_something)
- return 1;
-
- /* block for an event, returning immediately on timeout */
- if (!liblustre_check_events(timeout))
- return 0;
-
- /* an event occurred; let all registered callbacks progress... */
- list_for_each(tmp, &liblustre_wait_callbacks) {
- llwc = list_entry(tmp, struct liblustre_wait_callback,
- llwc_list);
+ /* Give all registered callbacks a bite at the cherry */
+ list_for_each(tmp, &liblustre_wait_callbacks) {
+ llwc = list_entry(tmp, struct liblustre_wait_callback,
+ llwc_list);
- if (llwc->llwc_fn(llwc->llwc_arg))
- found_something = 1;
+ if (llwc->llwc_fn(llwc->llwc_arg))
+ found_something = 1;
+ }
+
+ if (found_something || timeout == 0)
+ break;
+
+ /* Nothing so far, but I'm allowed to block... */
+ found_something = liblustre_check_events(timeout);
+ if (!found_something) /* still nothing */
+ break; /* I timed out */
}
- /* ...and tell caller something happened */
- return 1;
+ liblustre_waiting = 0;
+
+ return found_something;
}
-#endif
+
+static int cray_portals_callback(ptl_event_t *ev)
+{
+ /* We get a callback from the client Cray portals implementation
+ * whenever anyone calls PtlEQPoll(), and an event queue with a
+ * callback handler has outstanding events.
+ *
+ * If it's not liblustre calling PtlEQPoll(), this lets us know we
+ * have outstanding events which we handle with
+ * liblustre_wait_event().
+ *
+ * Otherwise, we're already eagerly consuming events and we'd
+ * handle events out of order if we recursed. */
+ if (liblustre_waiting)
+ return;
+
+ liblustre_wait_event(0);
+}
+#endif /* __KERNEL__ */
int ptlrpc_init_portals(void)
{