*/
static int request_out_callback(ptl_event_t *ev)
{
+ struct ptlrpc_request *req = ev->mem_desc.user_ptr;
ENTRY;
LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); /* requests always contiguous */
LBUG();
}
+ /* this balances the atomic_inc in ptl_send_rpc */
+ ptlrpc_req_finished(req);
RETURN(1);
}
struct ptlrpc_service *service = rqbd->rqbd_service;
LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); /* requests always contiguous */
-
+ LASSERT (ev->type == PTL_EVENT_PUT); /* we only enable puts */
+ LASSERT (atomic_read (&service->srv_nrqbds_receiving) > 0);
+ LASSERT (atomic_read (&rqbd->rqbd_refcount) > 0);
+
if (ev->rlength != ev->mlength)
CERROR("Warning: Possibly truncated rpc (%d/%d)\n",
ev->mlength, ev->rlength);
- if (ev->type == PTL_EVENT_PUT)
- wake_up(&service->srv_waitq);
+ if (ptl_is_valid_handle (&ev->unlinked_me))
+ {
+ /* This is the last request to be received into this
+ * request buffer. We don't bump the refcount, since the
+ * thread servicing this event is effectively taking over
+ * portals' reference.
+ */
+#warning ev->unlinked_me.nal_idx is not set properly in a callback
+ LASSERT (ev->unlinked_me.handle_idx == rqbd->rqbd_me_h.handle_idx);
+
+ if (atomic_dec_and_test (&service->srv_nrqbds_receiving)) /* we're off-air */
+ {
+ CERROR ("All request buffers busy\n");
+ /* we'll probably start dropping packets in portals soon */
+ }
+ }
else
- CERROR("Unexpected event type: %d\n", ev->type);
+ atomic_inc (&rqbd->rqbd_refcount); /* +1 ref for service thread */
+
+ wake_up(&service->srv_waitq);
return 0;
}