Whamcloud - gitweb
- merge 2 weeks of b1_4 fixes onto HEAD
[fs/lustre-release.git] / lustre / portals / portals / lib-msg.c
index 1b69533..328b8d8 100644 (file)
 #include <portals/lib-p30.h>
 
 void
-lib_enq_event_locked (nal_cb_t *nal, void *private, 
+lib_enq_event_locked (lib_nal_t *nal, void *private, 
                       lib_eq_t *eq, ptl_event_t *ev)
 {
         ptl_event_t  *eq_slot;
-        int           rc;
         
-        ev->sequence = eq->sequence++; /* Allocate the next queue slot */
-
-        /* size must be a power of 2 to handle a wrapped sequence # */
-        LASSERT (eq->size != 0 &&
-                 eq->size == LOWEST_BIT_SET (eq->size));
-        eq_slot = eq->base + (ev->sequence & (eq->size - 1));
+        ev->sequence = eq->eq_enq_seq++; /* Allocate the next queue slot */
 
-        /* Copy the event into the allocated slot, ensuring all the rest of
-         * the event's contents have been copied _before_ the sequence
-         * number gets updated.  A processes 'getting' an event waits on
-         * the next queue slot's sequence to be 'new'.  When it is, _all_
-         * other event fields had better be consistent.  I assert
-         * 'sequence' is the last member, so I only need a 2 stage copy. */
+        /* size must be a power of 2 to handle sequence # overflow */
+        LASSERT (eq->eq_size != 0 &&
+                 eq->eq_size == LOWEST_BIT_SET (eq->eq_size));
+        eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
 
-        LASSERT(sizeof (ptl_event_t) ==
-                offsetof(ptl_event_t, sequence) + sizeof(ev->sequence));
+        /* There is no race since both event consumers and event producers
+         * take the LIB_LOCK(), so we don't screw around with memory
+         * barriers, setting the sequence number last or wierd structure
+         * layout assertions. */
+        *eq_slot = *ev;
 
-        rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev,
-                            offsetof (ptl_event_t, sequence));
-        LASSERT (rc == PTL_OK);
+        /* Call the callback handler (if any) */
+        if (eq->eq_callback != NULL)
+                eq->eq_callback (eq_slot);
 
+        /* Wake anyone sleeping for an event (see lib-eq.c) */
 #ifdef __KERNEL__
-        barrier();
-#endif
-        /* Updating the sequence number is what makes the event 'new' NB if
-         * the cb_write below isn't atomic, this could cause a race with
-         * PtlEQGet */
-        rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence,
-                           (void *)&ev->sequence,sizeof (ev->sequence));
-        LASSERT (rc == PTL_OK);
-
-#ifdef __KERNEL__
-        barrier();
+        if (waitqueue_active(&nal->libnal_ni.ni_waitq))
+                wake_up_all(&nal->libnal_ni.ni_waitq);
+#else
+        pthread_cond_broadcast(&nal->libnal_ni.ni_cond);
 #endif
-
-        if (nal->cb_callback != NULL)
-                nal->cb_callback(nal, private, eq, ev);
-        else if (eq->event_callback != NULL)
-                eq->event_callback(ev);
 }
 
 void 
-lib_finalize(nal_cb_t *nal, void *private, lib_msg_t *msg, ptl_err_t status)
+lib_finalize (lib_nal_t *nal, void *private, lib_msg_t *msg, ptl_err_t status)
 {
         lib_md_t     *md;
         int           unlink;
@@ -101,9 +85,9 @@ lib_finalize(nal_cb_t *nal, void *private, lib_msg_t *msg, ptl_err_t status)
                 memset (&ack, 0, sizeof (ack));
                 ack.type     = HTON__u32 (PTL_MSG_ACK);
                 ack.dest_nid = HTON__u64 (msg->ev.initiator.nid);
-                ack.src_nid  = HTON__u64 (nal->ni.nid);
                 ack.dest_pid = HTON__u32 (msg->ev.initiator.pid);
-                ack.src_pid  = HTON__u32 (nal->ni.pid);
+                ack.src_nid  = HTON__u64 (nal->libnal_ni.ni_pid.nid);
+                ack.src_pid  = HTON__u32 (nal->libnal_ni.ni_pid.pid);
                 ack.payload_length = 0;
 
                 ack.msg.ack.dst_wmd = msg->ack_wmd;
@@ -122,7 +106,7 @@ lib_finalize(nal_cb_t *nal, void *private, lib_msg_t *msg, ptl_err_t status)
 
         md = msg->md;
 
-        state_lock(nal, &flags);
+        LIB_LOCK(nal, flags);
 
         /* Now it's safe to drop my caller's ref */
         md->pending--;
@@ -148,8 +132,8 @@ lib_finalize(nal_cb_t *nal, void *private, lib_msg_t *msg, ptl_err_t status)
                 lib_md_unlink(nal, md);
 
         list_del (&msg->msg_list);
-        nal->ni.counters.msgs_alloc--;
+        nal->libnal_ni.ni_counters.msgs_alloc--;
         lib_msg_free(nal, msg);
 
-        state_unlock(nal, &flags);
+        LIB_UNLOCK(nal, flags);
 }