ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq, bulk_source_eq,
bulk_sink_eq;
-static const ptl_handle_ni_t *socknal_nip = NULL, *qswnal_nip = NULL;
+static const ptl_handle_ni_t *socknal_nip = NULL, *qswnal_nip = NULL, *gmnal_nip = NULL;
/*
* Free the packet when it has gone out
*/
static int request_out_callback(ptl_event_t *ev)
{
+ struct ptlrpc_request *req = ev->mem_desc.user_ptr;
ENTRY;
LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); /* requests always contiguous */
LBUG();
}
+ /* this balances the atomic_inc in ptl_send_rpc */
+ ptlrpc_req_finished(req);
RETURN(1);
}
if (req->rq_xid != ev->match_bits) {
CERROR("Reply packet for wrong request\n");
- LBUG();
+ LBUG();
}
if (ev->type == PTL_EVENT_PUT) {
int request_in_callback(ptl_event_t *ev)
{
- struct ptlrpc_service *service = ev->mem_desc.user_ptr;
+ struct ptlrpc_request_buffer_desc *rqbd = ev->mem_desc.user_ptr;
+ struct ptlrpc_service *service = rqbd->rqbd_service;
LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); /* requests always contiguous */
+ LASSERT (ev->type == PTL_EVENT_PUT); /* we only enable puts */
+ LASSERT (atomic_read (&service->srv_nrqbds_receiving) > 0);
+ LASSERT (atomic_read (&rqbd->rqbd_refcount) > 0);
if (ev->rlength != ev->mlength)
CERROR("Warning: Possibly truncated rpc (%d/%d)\n",
ev->mlength, ev->rlength);
- if (ev->type == PTL_EVENT_PUT)
- wake_up(&service->srv_waitq);
+ if (ptl_is_valid_handle (&ev->unlinked_me))
+ {
+ /* This is the last request to be received into this
+ * request buffer. We don't bump the refcount, since the
+ * thread servicing this event is effectively taking over
+ * portals' reference.
+ */
+#warning ev->unlinked_me.nal_idx is not set properly in a callback
+ LASSERT (ev->unlinked_me.handle_idx == rqbd->rqbd_me_h.handle_idx);
+
+ if (atomic_dec_and_test (&service->srv_nrqbds_receiving)) /* we're off-air */
+ {
+ CERROR ("All request buffers busy\n");
+ /* we'll probably start dropping packets in portals soon */
+ }
+ }
else
- CERROR("Unexpected event type: %d\n", ev->type);
+ atomic_inc (&rqbd->rqbd_refcount); /* +1 ref for service thread */
+
+ wake_up(&service->srv_waitq);
return 0;
}
struct list_head *next;
ENTRY;
+ CDEBUG(D_NET, "got %s event %d\n",
+ (ev->type == PTL_EVENT_SENT) ? "SENT" :
+ (ev->type == PTL_EVENT_ACK) ? "ACK" : "UNEXPECTED", ev->type);
+
+ LASSERT (ev->type == PTL_EVENT_SENT || ev->type == PTL_EVENT_ACK);
+
+ LASSERT (atomic_read (&desc->bd_source_callback_count) > 0 &&
+ atomic_read (&desc->bd_source_callback_count) <= 2);
+
/* 1 fragment for each page always */
LASSERT (ev->mem_desc.niov == desc->bd_page_count);
-
- if (ev->type == PTL_EVENT_SENT) {
- CDEBUG(D_NET, "got SENT event\n");
- } else if (ev->type == PTL_EVENT_ACK) {
- CDEBUG(D_NET, "got ACK event\n");
-
+
+ if (atomic_dec_and_test (&desc->bd_source_callback_count)) {
list_for_each_safe(tmp, next, &desc->bd_page_list) {
- bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
-
+ bulk = list_entry(tmp, struct ptlrpc_bulk_page,
+ bp_link);
+
if (bulk->bp_cb != NULL)
bulk->bp_cb(bulk);
}
wake_up(&desc->bd_waitq);
if (desc->bd_cb != NULL)
desc->bd_cb(desc, desc->bd_cb_data);
- } else {
- CERROR("Unexpected event type!\n");
- LBUG();
}
- RETURN(1);
+ RETURN(0);
}
static int bulk_sink_callback(ptl_event_t *ev)
LASSERT (ev->mem_desc.niov == desc->bd_page_count);
list_for_each_safe (tmp, next, &desc->bd_page_list) {
- bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
+ bulk = list_entry(tmp, struct ptlrpc_bulk_page,
+ bp_link);
total += bulk->bp_buflen;
-
+
if (bulk->bp_cb != NULL)
bulk->bp_cb(bulk);
}
LASSERT (ev->mem_desc.length == total);
-
+
desc->bd_flags |= PTL_BULK_FL_RCVD;
wake_up(&desc->bd_waitq);
if (desc->bd_cb != NULL)
socknal_nip = inter_module_get_request("ksocknal_ni", "ksocknal");
qswnal_nip = inter_module_get_request("kqswnal_ni", "kqswnal");
- if (socknal_nip == NULL && qswnal_nip == NULL) {
- CERROR("get_ni failed: is a NAL module loaded?\n");
- return -EIO;
- }
+ gmnal_nip = inter_module_get_request("kgmnal_ni", "kgmnal");
/* Use the qswnal if it's there */
if (qswnal_nip != NULL)
ni = *qswnal_nip;
- else
+ else if (gmnal_nip != NULL)
+ ni = *gmnal_nip;
+ else if (socknal_nip != NULL)
ni = *socknal_nip;
+ else {
+ CERROR("get_ni failed: is a NAL module loaded?\n");
+ return -EIO;
+ }
rc = PtlEQAlloc(ni, 1024, request_out_callback, &request_out_eq);
if (rc != PTL_OK)
inter_module_put("kqswnal_ni");
if (socknal_nip != NULL)
inter_module_put("ksocknal_ni");
+ if (gmnal_nip != NULL)
+ inter_module_put("kgmnal_ni");
}