Whamcloud - gitweb
land b_eq on HEAD
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
index f10892c..04c69b1 100644 (file)
 
 #include <portals/lib-p30.h>
 
-int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t *msg)
+void
+lib_enq_event_locked (nal_cb_t *nal, void *private, 
+                      lib_eq_t *eq, ptl_event_t *ev)
 {
-        lib_md_t     *md;
-        lib_eq_t     *eq;
+        ptl_event_t  *eq_slot;
         int           rc;
+        
+        ev->sequence = eq->sequence++; /* Allocate the next queue slot */
+
+        /* size must be a power of 2 to handle a wrapped sequence # */
+        LASSERT (eq->size != 0 &&
+                 eq->size == LOWEST_BIT_SET (eq->size));
+        eq_slot = eq->base + (ev->sequence & (eq->size - 1));
+
+        /* Copy the event into the allocated slot, ensuring all the rest of
+         * the event's contents have been copied _before_ the sequence
+         * number gets updated.  A processes 'getting' an event waits on
+         * the next queue slot's sequence to be 'new'.  When it is, _all_
+         * other event fields had better be consistent.  I assert
+         * 'sequence' is the last member, so I only need a 2 stage copy. */
+
+        LASSERT(sizeof (ptl_event_t) ==
+                offsetof(ptl_event_t, sequence) + sizeof(ev->sequence));
+
+        rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev,
+                            offsetof (ptl_event_t, sequence));
+        LASSERT (rc == PTL_OK);
+
+#ifdef __KERNEL__
+        barrier();
+#endif
+        /* Updating the sequence number is what makes the event 'new' NB if
+         * the cb_write below isn't atomic, this could cause a race with
+         * PtlEQGet */
+        rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence,
+                           (void *)&ev->sequence,sizeof (ev->sequence));
+        LASSERT (rc == PTL_OK);
+
+#ifdef __KERNEL__
+        barrier();
+#endif
+
+        if (nal->cb_callback != NULL)
+                nal->cb_callback(nal, private, eq, ev);
+        else if (eq->event_callback != NULL)
+                eq->event_callback(ev);
+}
+
+void 
+lib_finalize(nal_cb_t *nal, void *private, lib_msg_t *msg, ptl_err_t status)
+{
+        lib_md_t     *md;
+        int           unlink;
         unsigned long flags;
+        int           rc;
+        ptl_hdr_t     ack;
 
         /* ni went down while processing this message */
-        if (nal->ni.up == 0) {
-                return -1;
-        }
+        if (nal->ni.up == 0)
+                return;
 
         if (msg == NULL)
-                return 0;
+                return;
 
-        rc = 0;
-        if (msg->send_ack) {
-                ptl_hdr_t ack;
+        /* Only send an ACK if the PUT completed successfully */
+        if (status == PTL_OK &&
+            !ptl_is_wire_handle_none(&msg->ack_wmd)) {
 
-                LASSERT (!ptl_is_wire_handle_none (&msg->ack_wmd));
+                LASSERT(msg->ev.type == PTL_EVENT_PUT);
 
                 memset (&ack, 0, sizeof (ack));
                 ack.type     = HTON__u32 (PTL_MSG_ACK);
-                ack.dest_nid = HTON__u64 (msg->nid);
+                ack.dest_nid = HTON__u64 (msg->ev.initiator.nid);
                 ack.src_nid  = HTON__u64 (nal->ni.nid);
-                ack.dest_pid = HTON__u32 (msg->pid);
+                ack.dest_pid = HTON__u32 (msg->ev.initiator.pid);
                 ack.src_pid  = HTON__u32 (nal->ni.pid);
-                PTL_HDR_LENGTH(&ack) = 0;
+                ack.payload_length = 0;
 
                 ack.msg.ack.dst_wmd = msg->ack_wmd;
                 ack.msg.ack.match_bits = msg->ev.match_bits;
                 ack.msg.ack.mlength = HTON__u32 (msg->ev.mlength);
 
                 rc = lib_send (nal, private, NULL, &ack, PTL_MSG_ACK,
-                               msg->nid, msg->pid, NULL, 0, 0);
+                               msg->ev.initiator.nid, msg->ev.initiator.pid, 
+                               NULL, 0, 0);
+                if (rc != PTL_OK) {
+                        /* send failed: there's nothing else to clean up. */
+                        CERROR("Error %d sending ACK to "LPX64"\n", 
+                               rc, msg->ev.initiator.nid);
+                }
         }
 
         md = msg->md;
-        LASSERT (md->pending > 0);  /* I've not dropped my ref yet */
-        eq = md->eq;
 
         state_lock(nal, &flags);
 
-        if (eq != NULL) {
-                ptl_event_t  *ev = &msg->ev;
-                ptl_event_t  *eq_slot;
-
-                /* I have to hold the lock while I bump the sequence number
-                 * and copy the event into the queue.  If not, and I was
-                 * interrupted after bumping the sequence number, other
-                 * events could fill the queue, including the slot I just
-                 * allocated to this event.  On resuming, I would overwrite
-                 * a more 'recent' event with old event state, and
-                 * processes taking events off the queue would not detect
-                 * overflow correctly.
-                 */
-
-                ev->sequence = eq->sequence++;/* Allocate the next queue slot */
-
-                /* size must be a power of 2 to handle a wrapped sequence # */
-                LASSERT (eq->size != 0 &&
-                         eq->size == LOWEST_BIT_SET (eq->size));
-                eq_slot = eq->base + (ev->sequence & (eq->size - 1));
-
-                /* Invalidate unlinked_me unless this is the last
-                 * event for an auto-unlinked MD.  Note that if md was
-                 * auto-unlinked, md->pending can only decrease
-                 */
-                if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 || /* not auto-unlinked */
-                    md->pending != 1)                       /* not last ref */
-                        ev->unlinked_me = PTL_HANDLE_NONE;
-
-                /* Copy the event into the allocated slot, ensuring all the
-                 * rest of the event's contents have been copied _before_
-                 * the sequence number gets updated.  A processes 'getting'
-                 * an event waits on the next queue slot's sequence to be
-                 * 'new'.  When it is, _all_ other event fields had better
-                 * be consistent.  I assert 'sequence' is the last member,
-                 * so I only need a 2 stage copy.
-                 */
-                LASSERT(sizeof (ptl_event_t) ==
-                        offsetof(ptl_event_t, sequence) + sizeof(ev->sequence));
-
-                rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev,
-                                    offsetof (ptl_event_t, sequence));
-                LASSERT (rc == 0);
-
-#ifdef __KERNEL__
-                barrier();
-#endif
-                /* Updating the sequence number is what makes the event 'new' */
-
-                /* cb_write is not necessarily atomic, so this could
-                   cause a race with PtlEQGet */
-                rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence,
-                                   (void *)&ev->sequence,sizeof (ev->sequence));
-                LASSERT (rc == 0);
+        /* Now it's safe to drop my caller's ref */
+        md->pending--;
+        LASSERT (md->pending >= 0);
 
-#ifdef __KERNEL__
-                barrier();
-#endif
+        /* Should I unlink this MD? */
+        unlink = (md->pending == 0 &&           /* No other refs */
+                  (md->threshold == 0 ||        /* All ops done */
+                   md->md_flags & PTL_MD_FLAG_UNLINK) != 0); /* black spot */
 
-                /* I must also ensure that (a) callbacks are made in the
-                 * same order as the events land in the queue, and (b) the
-                 * callback occurs before the event can be removed from the
-                 * queue, so I can't drop the lock during the callback. */
-                if (nal->cb_callback != NULL)
-                        nal->cb_callback(nal, private, eq, ev);
-                else  if (eq->event_callback != NULL)
-                        (void)((eq->event_callback) (ev));
-        }
+        msg->ev.status = status;
+        msg->ev.unlinked = unlink;
 
-        LASSERT ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 ||
-                 (md->md_flags & PTL_MD_FLAG_UNLINK) != 0);
+        if (md->eq != NULL)
+                lib_enq_event_locked(nal, private, md->eq, &msg->ev);
 
-        md->pending--;
-        if (md->pending == 0 && /* no more outstanding operations on this md */
-            (md->threshold == 0 ||              /* done its business */
-             (md->md_flags & PTL_MD_FLAG_UNLINK) != 0)) /* marked for death */
+        if (unlink)
                 lib_md_unlink(nal, md);
 
         list_del (&msg->msg_list);
@@ -158,6 +151,4 @@ int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t *msg)
         lib_msg_free(nal, msg);
 
         state_unlock(nal, &flags);
-
-        return rc;
 }