* Message decoding, parsing and finalizing routines
*
* Copyright (c) 2001-2003 Cluster File Systems, Inc.
- * Copyright (c) 2001-2002 Sandia National Laboratories
*
- * This file is part of Lustre, http://www.sf.net/projects/lustre/
+ * This file is part of Lustre, http://www.lustre.org
*
* Lustre is free software; you can redistribute it and/or
* modify it under the terms of version 2 of the GNU General Public
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
-#ifndef __KERNEL__
-# include <stdio.h>
-#else
-# define DEBUG_SUBSYSTEM S_PORTALS
-# include <linux/kp30.h>
-#endif
+#define DEBUG_SUBSYSTEM S_LNET
-#include <portals/lib-p30.h>
+#include <lnet/lib-lnet.h>
void
-lib_enq_event_locked (nal_cb_t *nal, void *private,
- lib_eq_t *eq, ptl_event_t *ev)
+lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
{
- ptl_event_t *eq_slot;
- int rc;
-
- ev->sequence = eq->sequence++; /* Allocate the next queue slot */
+ memset(ev, 0, sizeof(*ev));
- /* size must be a power of 2 to handle a wrapped sequence # */
- LASSERT (eq->size != 0 &&
- eq->size == LOWEST_BIT_SET (eq->size));
- eq_slot = eq->base + (ev->sequence & (eq->size - 1));
+ ev->status = 0;
+ ev->unlinked = 1;
+ ev->type = LNET_EVENT_UNLINK;
+ lnet_md_deconstruct(md, &ev->md);
+ lnet_md2handle(&ev->md_handle, md);
+}
- /* Copy the event into the allocated slot, ensuring all the rest of
- * the event's contents have been copied _before_ the sequence
- * number gets updated. A processes 'getting' an event waits on
- * the next queue slot's sequence to be 'new'. When it is, _all_
- * other event fields had better be consistent. I assert
- * 'sequence' is the last member, so I only need a 2 stage copy. */
+void
+lnet_enq_event_locked (lnet_eq_t *eq, lnet_event_t *ev)
+{
+ lnet_event_t *eq_slot;
- LASSERT(sizeof (ptl_event_t) ==
- offsetof(ptl_event_t, sequence) + sizeof(ev->sequence));
+ /* Allocate the next queue slot */
+ ev->sequence = eq->eq_enq_seq++;
- rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev,
- offsetof (ptl_event_t, sequence));
- LASSERT (rc == PTL_OK);
+ /* size must be a power of 2 to handle sequence # overflow */
+ LASSERT (eq->eq_size != 0 &&
+ eq->eq_size == LOWEST_BIT_SET (eq->eq_size));
+ eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
-#ifdef __KERNEL__
- barrier();
-#endif
- /* Updating the sequence number is what makes the event 'new' NB if
- * the cb_write below isn't atomic, this could cause a race with
- * PtlEQGet */
- rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence,
- (void *)&ev->sequence,sizeof (ev->sequence));
- LASSERT (rc == PTL_OK);
+ /* There is no race since both event consumers and event producers
+ * take the LNET_LOCK, so we don't screw around with memory
+ * barriers, setting the sequence number last or weird structure
+ * layout assertions. */
+ *eq_slot = *ev;
+
+ /* Call the callback handler (if any) */
+ if (eq->eq_callback != NULL)
+ eq->eq_callback (eq_slot);
#ifdef __KERNEL__
- barrier();
+ /* Wake anyone waiting in LNetEQPoll() */
+ if (cfs_waitq_active(&the_lnet.ln_waitq))
+ cfs_waitq_broadcast(&the_lnet.ln_waitq);
+#else
+# ifndef HAVE_LIBPTHREAD
+ /* LNetEQPoll() calls into _the_ LND to wait for action */
+# else
+ /* Wake anyone waiting in LNetEQPoll() */
+ pthread_cond_broadcast(&the_lnet.ln_cond);
+# endif
#endif
+}
+
+void
+lnet_complete_msg_locked(lnet_msg_t *msg)
+{
+ lnet_handle_wire_t ack_wmd;
+ int rc;
+ int status = msg->msg_ev.status;
+
+ LASSERT (msg->msg_onactivelist);
+
+ if (status == 0 && msg->msg_ack) {
+ /* Only send an ACK if the PUT completed successfully */
+
+ lnet_return_credits_locked(msg);
+
+ msg->msg_ack = 0;
+ LNET_UNLOCK();
+
+ LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
+ LASSERT(!msg->msg_routing);
+
+ ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
+
+ lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
+
+ msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
+ msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
+ msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
+
+ rc = lnet_send(msg->msg_ev.target.nid, msg);
+
+ LNET_LOCK();
+
+ if (rc == 0)
+ return;
+ } else if (status == 0 && /* OK so far */
+ (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
+
+ LASSERT (!msg->msg_receiving); /* called back recv already */
+
+ LNET_UNLOCK();
+
+ rc = lnet_send(LNET_NID_ANY, msg);
+
+ LNET_LOCK();
- if (nal->cb_callback != NULL)
- nal->cb_callback(nal, private, eq, ev);
- else if (eq->event_callback != NULL)
- eq->event_callback(ev);
+ if (rc == 0)
+ return;
+ }
+
+ lnet_return_credits_locked(msg);
+
+ LASSERT (msg->msg_onactivelist);
+ msg->msg_onactivelist = 0;
+ list_del (&msg->msg_activelist);
+ the_lnet.ln_counters.msgs_alloc--;
+ lnet_msg_free(msg);
}
-void
-lib_finalize(nal_cb_t *nal, void *private, lib_msg_t *msg, ptl_err_t status)
+
+void
+lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
{
- lib_md_t *md;
- int unlink;
- unsigned long flags;
- int rc;
- ptl_hdr_t ack;
-
- /* ni went down while processing this message */
- if (nal->ni.up == 0)
- return;
+#ifdef __KERNEL__
+ int i;
+ int my_slot;
+#endif
+ lnet_libmd_t *md;
+
+ LASSERT (!in_interrupt ());
if (msg == NULL)
return;
+#if 0
+ CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
+ lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
+ msg->msg_target_is_router ? "t" : "",
+ msg->msg_routing ? "X" : "",
+ msg->msg_ack ? "A" : "",
+ msg->msg_sending ? "S" : "",
+ msg->msg_receiving ? "R" : "",
+ msg->msg_delayed ? "d" : "",
+ msg->msg_txcredit ? "C" : "",
+ msg->msg_peertxcredit ? "c" : "",
+ msg->msg_rtrcredit ? "F" : "",
+ msg->msg_peerrtrcredit ? "f" : "",
+ msg->msg_onactivelist ? "!" : "",
+ msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
+ msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
+#endif
+ LNET_LOCK();
+
+ LASSERT (msg->msg_onactivelist);
+
+ msg->msg_ev.status = status;
+
+ md = msg->msg_md;
+ if (md != NULL) {
+ int unlink;
+
+ /* Now it's safe to drop my caller's ref */
+ md->md_refcount--;
+ LASSERT (md->md_refcount >= 0);
- /* Only send an ACK if the PUT completed successfully */
- if (status == PTL_OK &&
- !ptl_is_wire_handle_none(&msg->ack_wmd)) {
-
- LASSERT(msg->ev.type == PTL_EVENT_PUT);
-
- memset (&ack, 0, sizeof (ack));
- ack.type = HTON__u32 (PTL_MSG_ACK);
- ack.dest_nid = HTON__u64 (msg->ev.initiator.nid);
- ack.src_nid = HTON__u64 (nal->ni.nid);
- ack.dest_pid = HTON__u32 (msg->ev.initiator.pid);
- ack.src_pid = HTON__u32 (nal->ni.pid);
- ack.payload_length = 0;
-
- ack.msg.ack.dst_wmd = msg->ack_wmd;
- ack.msg.ack.match_bits = msg->ev.match_bits;
- ack.msg.ack.mlength = HTON__u32 (msg->ev.mlength);
-
- rc = lib_send (nal, private, NULL, &ack, PTL_MSG_ACK,
- msg->ev.initiator.nid, msg->ev.initiator.pid,
- NULL, 0, 0);
- if (rc != PTL_OK) {
- /* send failed: there's nothing else to clean up. */
- CERROR("Error %d sending ACK to "LPX64"\n",
- rc, msg->ev.initiator.nid);
- }
+ unlink = lnet_md_unlinkable(md);
+
+ msg->msg_ev.unlinked = unlink;
+
+ if (md->md_eq != NULL)
+ lnet_enq_event_locked(md->md_eq, &msg->msg_ev);
+
+ if (unlink)
+ lnet_md_unlink(md);
+
+ msg->msg_md = NULL;
}
- md = msg->md;
+ list_add_tail (&msg->msg_list, &the_lnet.ln_finalizeq);
- state_lock(nal, &flags);
+ /* Recursion breaker. Don't complete the message here if I am (or
+ * enough other threads are) already completing messages */
- /* Now it's safe to drop my caller's ref */
- md->pending--;
- LASSERT (md->pending >= 0);
+#ifdef __KERNEL__
+ my_slot = -1;
+ for (i = 0; i < the_lnet.ln_nfinalizers; i++) {
+ if (the_lnet.ln_finalizers[i] == cfs_current())
+ goto out;
+ if (my_slot < 0 && the_lnet.ln_finalizers[i] == NULL)
+ my_slot = i;
+ }
+ if (my_slot < 0)
+ goto out;
- /* Should I unlink this MD? */
- unlink = (md->pending == 0 && /* No other refs */
- (md->threshold == 0 || /* All ops done */
- md->md_flags & PTL_MD_FLAG_UNLINK) != 0); /* black spot */
+ the_lnet.ln_finalizers[my_slot] = cfs_current();
+#else
+ if (the_lnet.ln_finalizing)
+ goto out;
- msg->ev.status = status;
- msg->ev.unlinked = unlink;
+ the_lnet.ln_finalizing = 1;
+#endif
- if (md->eq != NULL)
- lib_enq_event_locked(nal, private, md->eq, &msg->ev);
+ while (!list_empty(&the_lnet.ln_finalizeq)) {
+ msg = list_entry(the_lnet.ln_finalizeq.next,
+ lnet_msg_t, msg_list);
+
+ list_del(&msg->msg_list);
- if (unlink)
- lib_md_unlink(nal, md);
+ /* NB drops and regains the lnet lock if it actually does
+ * anything, so my finalizing friends can chomp along too */
+ lnet_complete_msg_locked(msg);
+ }
- list_del (&msg->msg_list);
- nal->ni.counters.msgs_alloc--;
- lib_msg_free(nal, msg);
+#ifdef __KERNEL__
+ the_lnet.ln_finalizers[my_slot] = NULL;
+#else
+ the_lnet.ln_finalizing = 0;
+#endif
- state_unlock(nal, &flags);
+ out:
+ LNET_UNLOCK();
}
+