1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Message decoding, parsing and finalizing routines
7 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
9 * This file is part of Lustre, http://www.lustre.org
11 * Lustre is free software; you can redistribute it and/or
12 * modify it under the terms of version 2 of the GNU General Public
13 * License as published by the Free Software Foundation.
15 * Lustre is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
20 * You should have received a copy of the GNU General Public License
21 * along with Lustre; if not, write to the Free Software
22 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25 #define DEBUG_SUBSYSTEM S_LNET
27 #include <lnet/lib-lnet.h>
30 lnet_enq_event_locked (lnet_eq_t *eq, lnet_event_t *ev)
32 lnet_event_t *eq_slot;
34 /* Allocate the next queue slot */
35 ev->sequence = eq->eq_enq_seq++;
37 /* size must be a power of 2 to handle sequence # overflow */
38 LASSERT (eq->eq_size != 0 &&
39 eq->eq_size == LOWEST_BIT_SET (eq->eq_size));
40 eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
42 /* There is no race since both event consumers and event producers
43 * take the LNET_LOCK, so we don't screw around with memory
44 * barriers, setting the sequence number last or wierd structure
45 * layout assertions. */
48 /* Call the callback handler (if any) */
49 if (eq->eq_callback != NULL)
50 eq->eq_callback (eq_slot);
53 /* Wake anyone waiting in LNetEQPoll() */
54 if (cfs_waitq_active(&the_lnet.ln_waitq))
55 cfs_waitq_broadcast(&the_lnet.ln_waitq);
57 # ifndef HAVE_LIBPTHREAD
58 /* LNetEQPoll() calls into _the_ LND to wait for action */
60 /* Wake anyone waiting in LNetEQPoll() */
61 pthread_cond_broadcast(&the_lnet.ln_cond);
67 lnet_complete_msg_locked(lnet_msg_t *msg)
69 lnet_handle_wire_t ack_wmd;
71 int status = msg->msg_ev.status;
73 LASSERT (msg->msg_onactivelist);
75 if (status == 0 && msg->msg_ack) {
76 /* Only send an ACK if the PUT completed successfully */
78 lnet_return_credits_locked(msg);
83 LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
84 LASSERT(!msg->msg_routing);
86 ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
88 lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
90 msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
91 msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
92 msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
94 rc = lnet_send(msg->msg_ev.target.nid, msg);
100 } else if (status == 0 && /* OK so far */
101 (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
103 LASSERT (!msg->msg_receiving); /* called back recv already */
107 rc = lnet_send(LNET_NID_ANY, msg);
115 lnet_return_credits_locked(msg);
117 LASSERT (msg->msg_onactivelist);
118 msg->msg_onactivelist = 0;
119 list_del (&msg->msg_activelist);
120 the_lnet.ln_counters.msgs_alloc--;
126 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
134 LASSERT (!in_interrupt ());
139 CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
140 lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
141 msg->msg_target_is_router ? "t" : "",
142 msg->msg_routing ? "X" : "",
143 msg->msg_ack ? "A" : "",
144 msg->msg_sending ? "S" : "",
145 msg->msg_receiving ? "R" : "",
146 msg->msg_delayed ? "d" : "",
147 msg->msg_txcredit ? "C" : "",
148 msg->msg_peertxcredit ? "c" : "",
149 msg->msg_rtrcredit ? "F" : "",
150 msg->msg_peerrtrcredit ? "f" : "",
151 msg->msg_onactivelist ? "!" : "",
152 msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
153 msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
157 LASSERT (msg->msg_onactivelist);
159 msg->msg_ev.status = status;
165 /* Now it's safe to drop my caller's ref */
167 LASSERT (md->md_refcount >= 0);
169 unlink = lnet_md_unlinkable(md);
171 msg->msg_ev.unlinked = unlink;
173 if (md->md_eq != NULL)
174 lnet_enq_event_locked(md->md_eq, &msg->msg_ev);
182 list_add_tail (&msg->msg_list, &the_lnet.ln_finalizeq);
184 /* Recursion breaker. Don't complete the message here if I am (or
185 * enough other threads are) already completing messages */
189 for (i = 0; i < the_lnet.ln_nfinalizers; i++) {
190 if (the_lnet.ln_finalizers[i] == cfs_current())
192 if (my_slot < 0 && the_lnet.ln_finalizers[i] == NULL)
198 the_lnet.ln_finalizers[my_slot] = cfs_current();
200 if (the_lnet.ln_finalizing)
203 the_lnet.ln_finalizing = 1;
206 while (!list_empty(&the_lnet.ln_finalizeq)) {
207 msg = list_entry(the_lnet.ln_finalizeq.next,
208 lnet_msg_t, msg_list);
210 list_del(&msg->msg_list);
212 /* NB drops and regains the lnet lock if it actually does
213 * anything, so my finalizing friends can chomp along too */
214 lnet_complete_msg_locked(msg);
218 the_lnet.ln_finalizers[my_slot] = NULL;
220 the_lnet.ln_finalizing = 0;