4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * This file is part of Lustre, http://www.lustre.org/
32 * Lustre is a trademark of Sun Microsystems, Inc.
36 * Message decoding, parsing and finalizing routines
39 #define DEBUG_SUBSYSTEM S_LNET
41 #include <lnet/lib-lnet.h>
/* Initialise *ev as an LNET_EVENT_UNLINK event that refers to md.
 *
 * md: memory descriptor the event is about
 * ev: event to fill in; its previous contents are overwritten */
44 lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
/* Clear the whole event first, so every field not assigned below is 0 */
48 memset(ev, 0, sizeof(*ev));
52 ev->type = LNET_EVENT_UNLINK;
/* Attach the MD to the event twice over: lnet_md_deconstruct() is handed
 * &ev->md (presumably to fill in a description of md - confirm against
 * its definition), and lnet_md2handle() is handed &ev->md_handle. */
53 lnet_md_deconstruct(md, &ev->md);
54 lnet_md2handle(&ev->md_handle, md);
/* Enqueue event ev on event queue eq: assign it the next sequence number,
 * pick the matching slot in eq->eq_events, invoke the queue's callback (if
 * one is set) and wake any waiters.
 *
 * eq: event queue; eq->eq_size is asserted to be a non-zero power of two
 * ev: event being enqueued; ev->sequence is written here
 *
 * The "_locked" suffix and the comment in the body indicate the caller is
 * expected to hold the LNET_LOCK - this function does not take it in the
 * lines visible here. */
59 lnet_enq_event_locked (lnet_eq_t *eq, lnet_event_t *ev)
61 lnet_event_t *eq_slot;
63 /* Allocate the next queue slot */
64 ev->sequence = eq->eq_enq_seq++;
66 /* size must be a power of 2 to handle sequence # overflow */
67 LASSERT (eq->eq_size != 0 &&
68 eq->eq_size == LOWEST_BIT_SET (eq->eq_size));
/* Masking with (eq_size - 1) maps the ever-increasing sequence number onto
 * a slot index; this is only a valid modulo because of the power-of-two
 * assertion above. */
69 eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
71 /* There is no race since both event consumers and event producers
72 * take the LNET_LOCK, so we don't screw around with memory
73 * barriers, setting the sequence number last or weird structure
74 * layout assertions. */
/* NOTE(review): the slot address is computed above and handed to the
 * callback below, but no store of *ev into the slot is visible in this
 * excerpt - confirm against the full file. */
77 /* Call the callback handler (if any) */
78 if (eq->eq_callback != NULL)
/* The callback receives the queue slot, not the caller's ev pointer */
79 eq->eq_callback (eq_slot);
82 /* Wake anyone waiting in LNetEQPoll() */
83 if (cfs_waitq_active(&the_lnet.ln_waitq))
84 cfs_waitq_broadcast(&the_lnet.ln_waitq);
/* NOTE(review): two different wake-up mechanisms appear here (cfs_waitq
 * above, pthread condition variable below) but only one preprocessor guard
 * is visible; presumably they are build-time alternatives - confirm. */
86 # ifndef HAVE_LIBPTHREAD
87 /* LNetEQPoll() calls into _the_ LND to wait for action */
89 /* Wake anyone waiting in LNetEQPoll() */
90 pthread_cond_broadcast(&the_lnet.ln_cond);
/* Complete a message whose final status is already stored in
 * msg->msg_ev.status.
 *
 * Visible behaviour, for a status of 0 only:
 *  - if msg->msg_ack is set, the message is re-prepared as an LNET_MSG_ACK
 *    addressed to the event's initiator and sent;
 *  - otherwise, if the message is being routed and is not yet sending, it
 *    is passed to lnet_send() with LNET_NID_ANY.
 * The trailing statements return credits, remove the message from the
 * active list and decrement the msgs_alloc counter.
 *
 * msg: message to complete; asserted to be on the active list
 *
 * NOTE(review): rc is assigned from lnet_send() in both branches, but
 * neither its declaration nor any test of it is visible in this excerpt,
 * so how a send result affects the teardown below cannot be determined
 * from here. */
96 lnet_complete_msg_locked(lnet_msg_t *msg)
98 lnet_handle_wire_t ack_wmd;
100 int status = msg->msg_ev.status;
102 LASSERT (msg->msg_onactivelist);
104 if (status == 0 && msg->msg_ack) {
105 /* Only send an ACK if the PUT completed successfully */
107 lnet_return_credits_locked(msg);
/* An ACK is only expected for a PUT event on a message that is not being
 * routed */
112 LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
113 LASSERT(!msg->msg_routing);
/* Copy the ACK wire handle out of the PUT header before lnet_prep_send()
 * is called and the header is then written through its .ack member */
115 ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
117 lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
/* Fill in the ACK header from the saved handle and the completed event;
 * mlength is converted to little-endian for the wire */
119 msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
120 msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
121 msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
123 rc = lnet_send(msg->msg_ev.target.nid, msg);
129 } else if (status == 0 && /* OK so far */
130 (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
132 LASSERT (!msg->msg_receiving); /* called back recv already */
136 rc = lnet_send(LNET_NID_ANY, msg);
/* Teardown: give back credits, then take the message off the active list
 * and account for it in the global counters */
144 lnet_return_credits_locked(msg);
146 LASSERT (msg->msg_onactivelist);
147 msg->msg_onactivelist = 0;
148 cfs_list_del (&msg->msg_activelist);
149 the_lnet.ln_counters.msgs_alloc--;
/* Finalize a message: record its completion status, deliver the completion
 * event to the MD's event queue when the MD has one, queue the message on
 * the_lnet.ln_finalizeq and drain that queue through
 * lnet_complete_msg_locked().
 *
 * ni:     not referenced in the lines visible here
 * msg:    message being finalized; asserted to be on the active list
 * status: completion status, stored in msg->msg_ev.status
 *
 * Must not be called from interrupt context (asserted below).
 *
 * NOTE(review): md, unlink, i and my_slot are used below without a visible
 * declaration or initialisation, and the end of the function is not
 * visible in this excerpt; the comments here describe only the statements
 * that are shown. */
155 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
163 LASSERT (!cfs_in_interrupt ());
/* Debug dump of the message: type, target, one character per set flag
 * (t = target is router, X = routing, A = ack, S = sending, R = receiving,
 * d = delayed, C = tx credit, c = peer tx credit, F = router credit,
 * f = peer router credit, ! = on active list), then the tx and rx peer
 * NIDs or "<none>" when the peer pointer is NULL. */
168 CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
169 lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
170 msg->msg_target_is_router ? "t" : "",
171 msg->msg_routing ? "X" : "",
172 msg->msg_ack ? "A" : "",
173 msg->msg_sending ? "S" : "",
174 msg->msg_receiving ? "R" : "",
175 msg->msg_delayed ? "d" : "",
176 msg->msg_txcredit ? "C" : "",
177 msg->msg_peertxcredit ? "c" : "",
178 msg->msg_rtrcredit ? "F" : "",
179 msg->msg_peerrtrcredit ? "f" : "",
180 msg->msg_onactivelist ? "!" : "",
181 msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
182 msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
186 LASSERT (msg->msg_onactivelist);
/* The status recorded here is what lnet_complete_msg_locked() later reads
 * back from msg->msg_ev.status */
188 msg->msg_ev.status = status;
194 /* Now it's safe to drop my caller's ref */
196 LASSERT (md->md_refcount >= 0);
/* Tell the event consumer whether the MD is unlinkable at this point */
198 unlink = lnet_md_unlinkable(md);
200 msg->msg_ev.unlinked = unlink;
/* Only MDs with an event queue attached get a completion event */
202 if (md->md_eq != NULL)
203 lnet_enq_event_locked(md->md_eq, &msg->msg_ev);
/* Queue the message for completion; it is picked up by the drain loop
 * below, whichever thread ends up running it */
211 cfs_list_add_tail (&msg->msg_list, &the_lnet.ln_finalizeq);
213 /* Recursion breaker. Don't complete the message here if I am (or
214 * enough other threads are) already completing messages */
/* NOTE(review): two recursion-breaker schemes are visible below - a table
 * of finalizer threads (ln_finalizers[]) and a single ln_finalizing flag.
 * Presumably they are build-time alternatives whose preprocessor guards
 * are not visible here - confirm. */
/* Scan the finalizer table for the current thread and for a free (NULL)
 * slot */
218 for (i = 0; i < the_lnet.ln_nfinalizers; i++) {
219 if (the_lnet.ln_finalizers[i] == cfs_current())
221 if (my_slot < 0 && the_lnet.ln_finalizers[i] == NULL)
/* Claim the chosen slot for this thread while it drains the queue */
227 the_lnet.ln_finalizers[my_slot] = cfs_current();
229 if (the_lnet.ln_finalizing)
232 the_lnet.ln_finalizing = 1;
/* Drain the finalize queue from the head, unlinking each message from the
 * queue before completing it */
235 while (!cfs_list_empty(&the_lnet.ln_finalizeq)) {
236 msg = cfs_list_entry(the_lnet.ln_finalizeq.next,
237 lnet_msg_t, msg_list);
239 cfs_list_del(&msg->msg_list);
241 /* NB drops and regains the lnet lock if it actually does
242 * anything, so my finalizing friends can chomp along too */
243 lnet_complete_msg_locked(msg);
/* Release whichever recursion-breaker marker was taken above */
247 the_lnet.ln_finalizers[my_slot] = NULL;
249 the_lnet.ln_finalizing = 0;