#include <portals/lib-p30.h>
-int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t *msg)
+void
+lib_enq_event_locked (nal_cb_t *nal, void *private,
+ lib_eq_t *eq, ptl_event_t *ev)
{
- lib_md_t *md;
- lib_eq_t *eq;
+ ptl_event_t *eq_slot;
int rc;
+
+ ev->sequence = eq->sequence++; /* Allocate the next queue slot */
+
+ /* size must be a power of 2 to handle a wrapped sequence # */
+ LASSERT (eq->size != 0 &&
+ eq->size == LOWEST_BIT_SET (eq->size));
+ eq_slot = eq->base + (ev->sequence & (eq->size - 1));
+
+ /* Copy the event into the allocated slot, ensuring all the rest of
+ * the event's contents have been copied _before_ the sequence
+ * number gets updated. A processes 'getting' an event waits on
+ * the next queue slot's sequence to be 'new'. When it is, _all_
+ * other event fields had better be consistent. I assert
+ * 'sequence' is the last member, so I only need a 2 stage copy. */
+
+ LASSERT(sizeof (ptl_event_t) ==
+ offsetof(ptl_event_t, sequence) + sizeof(ev->sequence));
+
+ rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev,
+ offsetof (ptl_event_t, sequence));
+ LASSERT (rc == PTL_OK);
+
+#ifdef __KERNEL__
+ barrier();
+#endif
+ /* Updating the sequence number is what makes the event 'new' NB if
+ * the cb_write below isn't atomic, this could cause a race with
+ * PtlEQGet */
+ rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence,
+ (void *)&ev->sequence,sizeof (ev->sequence));
+ LASSERT (rc == PTL_OK);
+
+#ifdef __KERNEL__
+ barrier();
+#endif
+
+ if (nal->cb_callback != NULL)
+ nal->cb_callback(nal, private, eq, ev);
+ else if (eq->event_callback != NULL)
+ eq->event_callback(ev);
+}
+
+void
+lib_finalize(nal_cb_t *nal, void *private, lib_msg_t *msg, ptl_err_t status)
+{
+ lib_md_t *md;
+ int unlink;
unsigned long flags;
+ int rc;
+ ptl_hdr_t ack;
/* ni went down while processing this message */
- if (nal->ni.up == 0) {
- return -1;
- }
+ if (nal->ni.up == 0)
+ return;
if (msg == NULL)
- return 0;
+ return;
- rc = 0;
- if (msg->send_ack) {
- ptl_hdr_t ack;
+ /* Only send an ACK if the PUT completed successfully */
+ if (status == PTL_OK &&
+ !ptl_is_wire_handle_none(&msg->ack_wmd)) {
- LASSERT (!ptl_is_wire_handle_none (&msg->ack_wmd));
+ LASSERT(msg->ev.type == PTL_EVENT_PUT);
memset (&ack, 0, sizeof (ack));
ack.type = HTON__u32 (PTL_MSG_ACK);
- ack.dest_nid = HTON__u64 (msg->nid);
+ ack.dest_nid = HTON__u64 (msg->ev.initiator.nid);
ack.src_nid = HTON__u64 (nal->ni.nid);
- ack.dest_pid = HTON__u32 (msg->pid);
+ ack.dest_pid = HTON__u32 (msg->ev.initiator.pid);
ack.src_pid = HTON__u32 (nal->ni.pid);
ack.payload_length = 0;
ack.msg.ack.mlength = HTON__u32 (msg->ev.mlength);
rc = lib_send (nal, private, NULL, &ack, PTL_MSG_ACK,
- msg->nid, msg->pid, NULL, 0, 0);
- /* If this send fails, there's nothing else to clean up */
+ msg->ev.initiator.nid, msg->ev.initiator.pid,
+ NULL, 0, 0);
+ if (rc != PTL_OK) {
+ /* send failed: there's nothing else to clean up. */
+ CERROR("Error %d sending ACK to "LPX64"\n",
+ rc, msg->ev.initiator.nid);
+ }
}
md = msg->md;
- LASSERT (md->pending > 0); /* I've not dropped my ref yet */
- eq = md->eq;
state_lock(nal, &flags);
- if (eq != NULL) {
- ptl_event_t *ev = &msg->ev;
- ptl_event_t *eq_slot;
-
- /* I have to hold the lock while I bump the sequence number
- * and copy the event into the queue. If not, and I was
- * interrupted after bumping the sequence number, other
- * events could fill the queue, including the slot I just
- * allocated to this event. On resuming, I would overwrite
- * a more 'recent' event with old event state, and
- * processes taking events off the queue would not detect
- * overflow correctly.
- */
-
- ev->sequence = eq->sequence++;/* Allocate the next queue slot */
-
- /* size must be a power of 2 to handle a wrapped sequence # */
- LASSERT (eq->size != 0 &&
- eq->size == LOWEST_BIT_SET (eq->size));
- eq_slot = eq->base + (ev->sequence & (eq->size - 1));
-
- /* Invalidate unlinked_me unless this is the last
- * event for an auto-unlinked MD. Note that if md was
- * auto-unlinked, md->pending can only decrease
- */
- if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 || /* not auto-unlinked */
- md->pending != 1) /* not last ref */
- ev->unlinked_me = PTL_HANDLE_NONE;
-
- /* Copy the event into the allocated slot, ensuring all the
- * rest of the event's contents have been copied _before_
- * the sequence number gets updated. A processes 'getting'
- * an event waits on the next queue slot's sequence to be
- * 'new'. When it is, _all_ other event fields had better
- * be consistent. I assert 'sequence' is the last member,
- * so I only need a 2 stage copy.
- */
- LASSERT(sizeof (ptl_event_t) ==
- offsetof(ptl_event_t, sequence) + sizeof(ev->sequence));
-
- rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev,
- offsetof (ptl_event_t, sequence));
- LASSERT (rc == 0);
-
-#ifdef __KERNEL__
- barrier();
-#endif
- /* Updating the sequence number is what makes the event 'new' */
-
- /* cb_write is not necessarily atomic, so this could
- cause a race with PtlEQGet */
- rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence,
- (void *)&ev->sequence,sizeof (ev->sequence));
- LASSERT (rc == 0);
+ /* Now it's safe to drop my caller's ref */
+ md->pending--;
+ LASSERT (md->pending >= 0);
-#ifdef __KERNEL__
- barrier();
-#endif
+ /* Should I unlink this MD? */
+ unlink = (md->pending == 0 && /* No other refs */
+ (md->threshold == 0 || /* All ops done */
+ md->md_flags & PTL_MD_FLAG_UNLINK) != 0); /* black spot */
- /* I must also ensure that (a) callbacks are made in the
- * same order as the events land in the queue, and (b) the
- * callback occurs before the event can be removed from the
- * queue, so I can't drop the lock during the callback. */
- if (nal->cb_callback != NULL)
- nal->cb_callback(nal, private, eq, ev);
- else if (eq->event_callback != NULL)
- (void)((eq->event_callback) (ev));
- }
+ msg->ev.status = status;
+ msg->ev.unlinked = unlink;
- LASSERT ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 ||
- (md->md_flags & PTL_MD_FLAG_UNLINK) != 0);
+ if (md->eq != NULL)
+ lib_enq_event_locked(nal, private, md->eq, &msg->ev);
- md->pending--;
- if (md->pending == 0 && /* no more outstanding operations on this md */
- (md->threshold == 0 || /* done its business */
- (md->md_flags & PTL_MD_FLAG_UNLINK) != 0)) /* marked for death */
+ if (unlink)
lib_md_unlink(nal, md);
list_del (&msg->msg_list);
lib_msg_free(nal, msg);
state_unlock(nal, &flags);
-
- return rc;
}