#include <portals/lib-p30.h>
void
-lib_enq_event_locked (nal_cb_t *nal, void *private,
+lib_enq_event_locked (lib_nal_t *nal, void *private,
lib_eq_t *eq, ptl_event_t *ev)
{
ptl_event_t *eq_slot;
- int rc;
- ev->sequence = eq->sequence++; /* Allocate the next queue slot */
-
- /* size must be a power of 2 to handle a wrapped sequence # */
- LASSERT (eq->size != 0 &&
- eq->size == LOWEST_BIT_SET (eq->size));
- eq_slot = eq->base + (ev->sequence & (eq->size - 1));
+ ev->sequence = eq->eq_enq_seq++; /* Allocate the next queue slot */
- /* Copy the event into the allocated slot, ensuring all the rest of
- * the event's contents have been copied _before_ the sequence
- * number gets updated. A processes 'getting' an event waits on
- * the next queue slot's sequence to be 'new'. When it is, _all_
- * other event fields had better be consistent. I assert
- * 'sequence' is the last member, so I only need a 2 stage copy. */
+ /* size must be a power of 2 to handle sequence # overflow */
+ LASSERT (eq->eq_size != 0 &&
+ eq->eq_size == LOWEST_BIT_SET (eq->eq_size));
+ eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
- LASSERT(sizeof (ptl_event_t) ==
- offsetof(ptl_event_t, sequence) + sizeof(ev->sequence));
+ /* There is no race since both event consumers and event producers
+ * take the LIB_LOCK(), so we don't screw around with memory
+ * barriers, setting the sequence number last or wierd structure
+ * layout assertions. */
+ *eq_slot = *ev;
- rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev,
- offsetof (ptl_event_t, sequence));
- LASSERT (rc == PTL_OK);
+ /* Call the callback handler (if any) */
+ if (eq->eq_callback != NULL)
+ eq->eq_callback (eq_slot);
+ /* Wake anyone sleeping for an event (see lib-eq.c) */
#ifdef __KERNEL__
- barrier();
-#endif
- /* Updating the sequence number is what makes the event 'new' NB if
- * the cb_write below isn't atomic, this could cause a race with
- * PtlEQGet */
- rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence,
- (void *)&ev->sequence,sizeof (ev->sequence));
- LASSERT (rc == PTL_OK);
-
-#ifdef __KERNEL__
- barrier();
+ if (waitqueue_active(&nal->libnal_ni.ni_waitq))
+ wake_up_all(&nal->libnal_ni.ni_waitq);
+#else
+ pthread_cond_broadcast(&nal->libnal_ni.ni_cond);
#endif
-
- if (nal->cb_callback != NULL)
- nal->cb_callback(nal, private, eq, ev);
- else if (eq->event_callback != NULL)
- eq->event_callback(ev);
}
void
-lib_finalize(nal_cb_t *nal, void *private, lib_msg_t *msg, ptl_err_t status)
+lib_finalize (lib_nal_t *nal, void *private, lib_msg_t *msg, ptl_err_t status)
{
lib_md_t *md;
int unlink;
memset (&ack, 0, sizeof (ack));
ack.type = HTON__u32 (PTL_MSG_ACK);
ack.dest_nid = HTON__u64 (msg->ev.initiator.nid);
- ack.src_nid = HTON__u64 (nal->ni.nid);
ack.dest_pid = HTON__u32 (msg->ev.initiator.pid);
- ack.src_pid = HTON__u32 (nal->ni.pid);
+ ack.src_nid = HTON__u64 (nal->libnal_ni.ni_pid.nid);
+ ack.src_pid = HTON__u32 (nal->libnal_ni.ni_pid.pid);
ack.payload_length = 0;
ack.msg.ack.dst_wmd = msg->ack_wmd;
md = msg->md;
- state_lock(nal, &flags);
+ LIB_LOCK(nal, flags);
/* Now it's safe to drop my caller's ref */
md->pending--;
lib_md_unlink(nal, md);
list_del (&msg->msg_list);
- nal->ni.counters.msgs_alloc--;
+ nal->libnal_ni.ni_counters.msgs_alloc--;
lib_msg_free(nal, msg);
- state_unlock(nal, &flags);
+ LIB_UNLOCK(nal, flags);
}