Whamcloud - gitweb
- fixed typo in a comment.
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
index 38904c4..c46ad1a 100644 (file)
  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
-#define DEBUG_SUBSYSTEM S_PORTALS
+#define DEBUG_SUBSYSTEM S_LNET
 
-#ifndef __KERNEL__
-# include <stdio.h>
-#else
-# include <libcfs/kp30.h>
-#endif
-
-#include <portals/lib-p30.h>
+#include <lnet/lib-lnet.h>
 
 void
-lib_enq_event_locked (lib_nal_t *nal, void *private,
-                      lib_eq_t *eq, ptl_event_t *ev)
+lnet_enq_event_locked (lnet_eq_t *eq, lnet_event_t *ev)
 {
-        ptl_event_t  *eq_slot;
+        lnet_event_t  *eq_slot;
 
         /* Allocate the next queue slot */
-        ev->link = ev->sequence = eq->eq_enq_seq++;
-        /* NB we don't support START events yet and we don't create a separate
-         * UNLINK event unless an explicit unlink succeeds, so the link
-         * sequence is pretty useless */
-
-        /* We don't support different uid/jids yet */
-        ev->uid = 0;
-        ev->jid = 0;
+        ev->sequence = eq->eq_enq_seq++;
 
         /* size must be a power of 2 to handle sequence # overflow */
         LASSERT (eq->eq_size != 0 &&
@@ -54,7 +40,7 @@ lib_enq_event_locked (lib_nal_t *nal, void *private,
         eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
 
         /* There is no race since both event consumers and event producers
-         * take the LIB_LOCK(), so we don't screw around with memory
+         * take the LNET_LOCK, so we don't screw around with memory
          * barriers, setting the sequence number last or weird structure
          * layout assertions. */
         *eq_slot = *ev;
@@ -63,85 +49,178 @@ lib_enq_event_locked (lib_nal_t *nal, void *private,
         if (eq->eq_callback != NULL)
                 eq->eq_callback (eq_slot);
 
-        /* Wake anyone sleeping for an event (see lib-eq.c) */
 #ifdef __KERNEL__
-        if (cfs_waitq_active(&nal->libnal_ni.ni_waitq))
-                cfs_waitq_broadcast(&nal->libnal_ni.ni_waitq);
+        /* Wake anyone waiting in LNetEQPoll() */
+        if (cfs_waitq_active(&the_lnet.ln_waitq))
+                cfs_waitq_broadcast(&the_lnet.ln_waitq);
 #else
-        pthread_cond_broadcast(&nal->libnal_ni.ni_cond);
+# ifndef HAVE_LIBPTHREAD
+        /* LNetEQPoll() calls into _the_ LND to wait for action */
+# else
+        /* Wake anyone waiting in LNetEQPoll() */
+        pthread_cond_broadcast(&the_lnet.ln_cond);
+# endif
 #endif
 }
 
 void
-lib_finalize (lib_nal_t *nal, void *private, lib_msg_t *msg, ptl_err_t status)
+lnet_complete_msg_locked(lnet_msg_t *msg)
 {
-        lib_md_t     *md;
-        int           unlink;
-        unsigned long flags;
-        int           rc;
-        ptl_hdr_t     ack;
+        lnet_handle_wire_t ack_wmd;
+        int                rc;
+        int                status = msg->msg_ev.status;
+        /* Complete one message that has already been finalized.  Entered and
+         * left with the LNET lock held; the two send paths below drop it
+         * around lnet_send() and retake it afterwards.
+         *  - status == 0 and msg_ack set: the message is rebuilt as an ACK
+         *    and sent.
+         *  - status == 0, msg_routing set and msg_sending clear: the message
+         *    is passed to lnet_send() for forwarding.
+         *  - otherwise, or if lnet_send() returned non-zero: credits are
+         *    returned and the message is taken off the active list and
+         *    freed. */
+        LASSERT (msg->msg_onactivelist);
+
+        if (status == 0 && msg->msg_ack) {
+                /* Only send an ACK if the PUT completed successfully */
+
+                lnet_return_credits_locked(msg);
+
+                msg->msg_ack = 0;       /* cleared so this branch is not taken again for this msg */
+                LNET_UNLOCK();
+                /* The lock stays dropped until lnet_send() has returned */
+                LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
+                LASSERT(!msg->msg_routing);
+                /* Copy the PUT's ack_wmd out before the header is prepared as an ACK.
+                 * NOTE(review): msg.put and msg.ack presumably share storage
+                 * inside msg_hdr -- confirm against the header definition */
+                ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
+                
+                lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
+
+                msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
+                msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
+                msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
+                /* The same msg structure is reused as the outgoing ACK */
+                rc = lnet_send(msg->msg_ev.target.nid, msg);
+
+                LNET_LOCK();
+                /* Send accepted: the message is not freed here.
+                 * NOTE(review): presumably it is finalized again once the
+                 * send completes -- confirm */
+                if (rc == 0)
+                        return;
+        } else if (status == 0 &&               /* OK so far */
+                   (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
+                
+                LASSERT (!msg->msg_receiving);  /* called back recv already */
+                /* As in the ACK path, lnet_send() is called without the lock */
+                LNET_UNLOCK();
+                
+                rc = lnet_send(LNET_NID_ANY, msg);
+
+                LNET_LOCK();
+
+                if (rc == 0)
+                        return;
+        }
+        /* Nothing to send, or lnet_send() failed: release the message.
+         * NOTE(review): after a failed ACK send the credits were already
+         * returned once above; presumably lnet_return_credits_locked() is
+         * safe to call a second time -- confirm */
+        lnet_return_credits_locked(msg);
+
+        LASSERT (msg->msg_onactivelist);
+        msg->msg_onactivelist = 0;
+        list_del (&msg->msg_activelist);
+        the_lnet.ln_counters.msgs_alloc--;
+        lnet_msg_free(msg);
+}
+
+/* Record 'status' as the completion status of 'msg', post its event to the
+ * attached MD's event queue (if there is one), and queue the message on
+ * the_lnet.ln_finalizeq for completion by lnet_complete_msg_locked().
+ * A NULL 'msg' is a no-op.  Must not be called in interrupt context
+ * (asserted below).  Takes and releases the LNET lock itself.  'ni' is not
+ * referenced anywhere in this body. */
+void
+lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
+{
+#ifdef __KERNEL__
+        int                i;
+        int                my_slot;
+#endif
+        lnet_libmd_t      *md;
+
+        LASSERT (!in_interrupt ());
 
         if (msg == NULL)
                 return;
+#if 0   /* disabled debug dump of the message's flags and tx/rx peers */
+        CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
+               lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
+               msg->msg_target_is_router ? "t" : "",
+               msg->msg_routing ? "X" : "",
+               msg->msg_ack ? "A" : "",
+               msg->msg_sending ? "S" : "",
+               msg->msg_receiving ? "R" : "",
+               msg->msg_delayed ? "d" : "",
+               msg->msg_txcredit ? "C" : "",
+               msg->msg_peertxcredit ? "c" : "",
+               msg->msg_rtrcredit ? "F" : "",
+               msg->msg_peerrtrcredit ? "f" : "",
+               msg->msg_onactivelist ? "!" : "",
+               msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
+               msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
+#endif
+        LNET_LOCK();
+
+        LASSERT (msg->msg_onactivelist);
+
+        msg->msg_ev.status = status;
+        /* If an MD is still attached: drop a reference on it, post the
+         * completion event to its EQ (if it has one), unlink the MD when
+         * lnet_md_unlinkable() says so, and detach it from the message */
+        md = msg->msg_md;
+        if (md != NULL) {
+                int      unlink;
 
-        /* Only send an ACK if the PUT completed successfully */
-        if (status == PTL_OK &&
-            !ptl_is_wire_handle_none(&msg->ack_wmd)) {
-
-                LASSERT(msg->ev.type == PTL_EVENT_PUT_END);
-
-                memset (&ack, 0, sizeof (ack));
-                ack.type     = cpu_to_le32(PTL_MSG_ACK);
-                ack.dest_nid = cpu_to_le64(msg->ev.initiator.nid);
-                ack.dest_pid = cpu_to_le32(msg->ev.initiator.pid);
-                ack.src_nid  = cpu_to_le64(nal->libnal_ni.ni_pid.nid);
-                ack.src_pid  = cpu_to_le32(nal->libnal_ni.ni_pid.pid);
-                ack.payload_length = 0;
-
-                ack.msg.ack.dst_wmd = msg->ack_wmd;
-                ack.msg.ack.match_bits = msg->ev.match_bits;
-                ack.msg.ack.mlength = cpu_to_le32(msg->ev.mlength);
-
-                rc = lib_send (nal, private, NULL, &ack, PTL_MSG_ACK,
-                               msg->ev.initiator.nid, msg->ev.initiator.pid,
-                               NULL, 0, 0);
-                if (rc != PTL_OK) {
-                        /* send failed: there's nothing else to clean up. */
-                        CERROR("Error %d sending ACK to "LPX64"\n",
-                               rc, msg->ev.initiator.nid);
-                }
+                /* Now it's safe to drop my caller's ref */
+                md->md_refcount--;
+                LASSERT (md->md_refcount >= 0);
+
+                unlink = lnet_md_unlinkable(md);
+                /* The event itself records whether the MD gets unlinked below */
+                msg->msg_ev.unlinked = unlink;
+                
+                if (md->md_eq != NULL)
+                        lnet_enq_event_locked(md->md_eq, &msg->msg_ev);
+                
+                if (unlink)
+                        lnet_md_unlink(md);
+
+                msg->msg_md = NULL;
         }
 
-        md = msg->md;
+        list_add_tail (&msg->msg_list, &the_lnet.ln_finalizeq);
 
-        LIB_LOCK(nal, flags);
+        /* Recursion breaker.  Don't complete the message here if I am (or
+         * enough other threads are) already completing messages */
 
-        /* Now it's safe to drop my caller's ref */
-        md->pending--;
-        LASSERT (md->pending >= 0);
+#ifdef __KERNEL__
+        my_slot = -1;
+        for (i = 0; i < the_lnet.ln_nfinalizers; i++) {
+                if (the_lnet.ln_finalizers[i] == cfs_current())
+                        goto out;       /* this thread already holds a slot; msg stays queued */
+                if (my_slot < 0 && the_lnet.ln_finalizers[i] == NULL)
+                        my_slot = i;    /* remember the first free slot */
+        }
+        if (my_slot < 0)
+                goto out;               /* no free slot: msg stays queued for the current finalizers */
 
-        /* Should I unlink this MD? */
-        if (md->pending != 0)                   /* other refs */
-                unlink = 0;
-        else if ((md->md_flags & PTL_MD_FLAG_ZOMBIE) != 0)
-                unlink = 1;
-        else if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINK) == 0)
-                unlink = 0;
-        else
-                unlink = lib_md_exhausted(md);
+        the_lnet.ln_finalizers[my_slot] = cfs_current();
+#else
+        if (the_lnet.ln_finalizing)     /* non-kernel build: a single flag replaces the slot table */
+                goto out;
 
-        msg->ev.ni_fail_type = status;
-        msg->ev.unlinked = unlink;
+        the_lnet.ln_finalizing = 1;
+#endif
 
-        if (md->eq != NULL)
-                lib_enq_event_locked(nal, private, md->eq, &msg->ev);
+        while (!list_empty(&the_lnet.ln_finalizeq)) {
+                msg = list_entry(the_lnet.ln_finalizeq.next,
+                                 lnet_msg_t, msg_list);
+                
+                list_del(&msg->msg_list);
 
-        if (unlink)
-                lib_md_unlink(nal, md);
+                /* NB drops and regains the lnet lock if it actually does
+                 * anything, so my finalizing friends can chomp along too */
+                lnet_complete_msg_locked(msg);
+        }
 
-        list_del (&msg->msg_list);
-        nal->libnal_ni.ni_counters.msgs_alloc--;
-        lib_msg_free(nal, msg);
+#ifdef __KERNEL__
+        the_lnet.ln_finalizers[my_slot] = NULL; /* give the slot back */
+#else
+        the_lnet.ln_finalizing = 0;
+#endif
 
-        LIB_UNLOCK(nal, flags);
+ out:
+        LNET_UNLOCK();
 }
+