+ spin_unlock_irqrestore (&desc->bd_lock, flags);
+ RETURN(0);
+}
+
+/* Post-mortem debug snapshots: bulk_put_sink_callback() copies the
+ * offending event and descriptor here when its sanity checks fail, so
+ * they can be inspected from a crash dump or debugger after the fact.
+ * NOTE(review): unsynchronized globals — concurrent bad events would
+ * overwrite each other; best-effort bug-hunt aid only. */
+struct ptlrpc_bulk_desc ptlrpc_bad_desc;
+ptl_event_t ptlrpc_bad_event;
+
+/*
+ * Portals event callback invoked when the peer PUTs bulk data into our
+ * sink buffers.  Sanity-checks the event against the bulk descriptor
+ * recovered from the MD's user pointer, then marks the bulk transfer
+ * complete and wakes the thread waiting on the request.  Every path
+ * visible in this hunk returns 0.
+ * NOTE(review): the tail of this function lies beyond this hunk.
+ */
+static int bulk_put_sink_callback(ptl_event_t *ev)
+{
+ struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr;
+ unsigned long flags;
+ ENTRY;
+
+ LASSERT(ev->type == PTL_EVENT_PUT);
+
+ /* used iovs */
+ /* sink MDs are built from page (kiov) fragments, never plain iovs */
+ LASSERT((ev->mem_desc.options & (PTL_MD_IOV | PTL_MD_KIOV)) ==
+ PTL_MD_KIOV);
+ /* Honestly, it's best to find out early. */
+ /* 0x5a5a5a5a looks like a freed-memory poison pattern — presumably
+  * catches an event arriving for an already-freed descriptor; the other
+  * two tests catch an event/descriptor mismatch.  TODO confirm against
+  * the allocator's poison value. */
+ if (desc->bd_page_count == 0x5a5a5a5a ||
+ desc->bd_page_count != ev->mem_desc.niov ||
+ ev->mem_desc.start != &desc->bd_iov) {
+ /* not guaranteed (don't LASSERT) but good for this bug hunt */
+ /* snapshot the offending event and descriptor into the
+  * ptlrpc_bad_* globals for post-mortem inspection, log both,
+  * and bail out without touching the (possibly stale) desc */
+ ptlrpc_bad_event = *ev;
+ ptlrpc_bad_desc = *desc;
+ CERROR ("XXX ev %p type %d portal %d match "LPX64", seq %ld\n",
+ ev, ev->type, ev->portal, ev->match_bits, ev->sequence);
+ CERROR ("XXX desc %p, export %p import %p gen %d "
+ " portal %d\n",
+ desc, desc->bd_export,
+ desc->bd_import, desc->bd_import_generation,
+ desc->bd_portal);
+ RETURN (0);
+ }
+
+ LASSERT(desc->bd_page_count != 0x5a5a5a5a);
+ /* 1 fragment for each page always */
+ LASSERT(ev->mem_desc.niov == desc->bd_page_count);
+ /* the Portals match bits carry the request's transaction id */
+ LASSERT(ev->match_bits == desc->bd_req->rq_xid);
+
+ /* peer must put with zero offset */
+ if (ev->offset != 0) {
+ /* Bug 1190: handle this as a protocol failure */
+ CERROR ("Bad offset %d\n", ev->offset);
+ LBUG ();
+ }
+
+ /* No check for total # bytes; this could be a short read */
+
+ /* mark the network I/O finished and wake the waiter: requests in a
+  * set wait on the set's queue, standalone requests on their own
+  * reply queue.  irqsave locking because this callback presumably
+  * runs in interrupt context — TODO confirm with the event-queue
+  * registration site. */
+ spin_lock_irqsave (&desc->bd_lock, flags);
+ desc->bd_network_rw = 0;
+ desc->bd_complete = 1;
+ if (desc->bd_req->rq_set != NULL)
+ wake_up (&desc->bd_req->rq_set->set_waitq);
+ else
+ wake_up (&desc->bd_req->rq_reply_waitq);
+ spin_unlock_irqrestore (&desc->bd_lock, flags);
+