if (req->rq_xid != ev->match_bits) {
CERROR("Reply packet for wrong request\n");
- LBUG();
+ LBUG();
}
if (ev->type == PTL_EVENT_PUT) {
int request_in_callback(ptl_event_t *ev)
{
- struct ptlrpc_service *service = ev->mem_desc.user_ptr;
+ struct ptlrpc_request_buffer_desc *rqbd = ev->mem_desc.user_ptr;
+ struct ptlrpc_service *service = rqbd->rqbd_service;
LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); /* requests always contiguous */
-
+
if (ev->rlength != ev->mlength)
CERROR("Warning: Possibly truncated rpc (%d/%d)\n",
ev->mlength, ev->rlength);
struct list_head *next;
ENTRY;
+ CDEBUG(D_NET, "got %s event %d\n",
+ (ev->type == PTL_EVENT_SENT) ? "SENT" :
+ (ev->type == PTL_EVENT_ACK) ? "ACK" : "UNEXPECTED", ev->type);
+
+ LASSERT (ev->type == PTL_EVENT_SENT || ev->type == PTL_EVENT_ACK);
+
+ LASSERT (atomic_read (&desc->bd_source_callback_count) > 0 &&
+ atomic_read (&desc->bd_source_callback_count) <= 2);
+
/* 1 fragment for each page always */
- LASSERT (ev->mem_desc.niov == desc->b_page_count);
-
- if (ev->type == PTL_EVENT_SENT) {
- CDEBUG(D_NET, "got SENT event\n");
- } else if (ev->type == PTL_EVENT_ACK) {
- CDEBUG(D_NET, "got ACK event\n");
-
- list_for_each_safe(tmp, next, &desc->b_page_list) {
- bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
-
- if (bulk->b_cb != NULL)
- bulk->b_cb(bulk);
+ LASSERT (ev->mem_desc.niov == desc->bd_page_count);
+
+ if (atomic_dec_and_test (&desc->bd_source_callback_count)) {
+ list_for_each_safe(tmp, next, &desc->bd_page_list) {
+ bulk = list_entry(tmp, struct ptlrpc_bulk_page,
+ bp_link);
+
+ if (bulk->bp_cb != NULL)
+ bulk->bp_cb(bulk);
}
- desc->b_flags |= PTL_BULK_FL_SENT;
- wake_up(&desc->b_waitq);
- if (desc->b_cb != NULL)
- desc->b_cb(desc, desc->b_cb_data);
- } else {
- CERROR("Unexpected event type!\n");
- LBUG();
+ desc->bd_flags |= PTL_BULK_FL_SENT;
+ wake_up(&desc->bd_waitq);
+ if (desc->bd_cb != NULL)
+ desc->bd_cb(desc, desc->bd_cb_data);
}
- RETURN(1);
+ RETURN(0);
}
static int bulk_sink_callback(ptl_event_t *ev)
/* used iovs */
LASSERT ((ev->mem_desc.options & PTL_MD_IOV) != 0);
/* 1 fragment for each page always */
- LASSERT (ev->mem_desc.niov == desc->b_page_count);
+ LASSERT (ev->mem_desc.niov == desc->bd_page_count);
+
+ list_for_each_safe (tmp, next, &desc->bd_page_list) {
+ bulk = list_entry(tmp, struct ptlrpc_bulk_page,
+ bp_link);
- list_for_each_safe (tmp, next, &desc->b_page_list) {
- bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
+ total += bulk->bp_buflen;
- total += bulk->b_buflen;
-
- if (bulk->b_cb != NULL)
- bulk->b_cb(bulk);
+ if (bulk->bp_cb != NULL)
+ bulk->bp_cb(bulk);
}
LASSERT (ev->mem_desc.length == total);
-
- desc->b_flags |= PTL_BULK_FL_RCVD;
- wake_up(&desc->b_waitq);
- if (desc->b_cb != NULL)
- desc->b_cb(desc, desc->b_cb_data);
+
+ desc->bd_flags |= PTL_BULK_FL_RCVD;
+ wake_up(&desc->bd_waitq);
+ if (desc->bd_cb != NULL)
+ desc->bd_cb(desc, desc->bd_cb_data);
} else {
CERROR("Unexpected event type!\n");
LBUG();