1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 * Message decoding, parsing and finalizing routines
41 #define DEBUG_SUBSYSTEM S_LNET
43 #include <lnet/lib-lnet.h>
/*
 * lnet_build_unlink_event(): populate *ev as a LNET_EVENT_UNLINK event
 * describing the memory descriptor 'md' being unlinked.
 *
 * NOTE(review): this extract elides several original source lines (see the
 * embedded line numbers); the return type, braces, and any code between the
 * visible statements are not shown here.
 */
46 lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
/* Zero the whole event first so every field not set below reads as 0/NULL. */
50 memset(ev, 0, sizeof(*ev));
54 ev->type = LNET_EVENT_UNLINK;
/* Snapshot the MD's user-visible state into ev->md and record a handle
 * back to the MD itself. */
55 lnet_md_deconstruct(md, &ev->md);
56 lnet_md2handle(&ev->md_handle, md);
/*
 * lnet_enq_event_locked(): deposit *ev into the event queue's circular
 * buffer, invoke the queue's callback (if any), and wake pollers.
 * Caller must hold the LNet lock (see the no-race comment below).
 *
 * NOTE(review): the line that actually copies *ev into *eq_slot is elided
 * from this extract, as are the #ifdef __KERNEL__ / #else guards that
 * presumably select between the cfs_waitq_* wakeup (kernel) and the
 * pthread_cond_broadcast wakeup (userspace) — confirm against full source.
 */
61 lnet_enq_event_locked (lnet_eq_t *eq, lnet_event_t *ev)
63 lnet_event_t *eq_slot;
65 /* Allocate the next queue slot */
66 ev->sequence = eq->eq_enq_seq++;
68 /* size must be a power of 2 to handle sequence # overflow */
69 LASSERT (eq->eq_size != 0 &&
70 eq->eq_size == LOWEST_BIT_SET (eq->eq_size));
/* Masking the sequence with (size - 1) maps it onto the circular buffer;
 * valid only because eq_size is a power of 2 (asserted above). */
71 eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
73 /* There is no race since both event consumers and event producers
74 * take the LNET_LOCK, so we don't screw around with memory
75 * barriers, setting the sequence number last or weird structure
76 * layout assertions. */
79 /* Call the callback handler (if any) */
80 if (eq->eq_callback != NULL)
81 eq->eq_callback (eq_slot);
84 /* Wake anyone waiting in LNetEQPoll() */
85 if (cfs_waitq_active(&the_lnet.ln_waitq))
86 cfs_waitq_broadcast(&the_lnet.ln_waitq);
89 /* LNetEQPoll() calls into _the_ LND to wait for action */
91 /* Wake anyone waiting in LNetEQPoll() */
92 pthread_cond_broadcast(&the_lnet.ln_cond);
/*
 * lnet_complete_msg_locked(): final bookkeeping for a finished message —
 * send an ACK for a successful PUT that requested one, or forward a routed
 * message that has not been sent yet; otherwise return credits and take the
 * message off the active list.
 *
 * NOTE(review): this extract elides original lines including the declaration
 * of 'rc', the handling of lnet_send() failures, and the braces closing the
 * if/else-if arms — the control flow shown here is incomplete.
 */
98 lnet_complete_msg_locked(lnet_msg_t *msg)
100 lnet_handle_wire_t ack_wmd;
102 int status = msg->msg_ev.status;
104 LASSERT (msg->msg_onactivelist);
106 if (status == 0 && msg->msg_ack) {
107 /* Only send an ACK if the PUT completed successfully */
/* Give back the credits the incoming PUT consumed before reusing the
 * message structure to carry the ACK. */
109 lnet_return_credits_locked(msg);
114 LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
115 LASSERT(!msg->msg_routing);
/* Save the wire MD handle before lnet_prep_send() rewrites msg_hdr. */
117 ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
119 lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
121 msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
122 msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
/* mlength goes on the wire in little-endian byte order. */
123 msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
125 rc = lnet_send(msg->msg_ev.target.nid, msg);
131 } else if (status == 0 && /* OK so far */
132 (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
134 LASSERT (!msg->msg_receiving); /* called back recv already */
/* Routed message received OK but not yet forwarded: send it on;
 * LNET_NID_ANY lets the router pick the outgoing NI. */
138 rc = lnet_send(LNET_NID_ANY, msg);
/* Terminal path: release credits and detach from the active list. */
146 lnet_return_credits_locked(msg);
148 LASSERT (msg->msg_onactivelist);
149 msg->msg_onactivelist = 0;
150 cfs_list_del (&msg->msg_activelist);
151 the_lnet.ln_counters.msgs_alloc--;
/*
 * lnet_finalize(): public completion entry point for a message.  Records the
 * completion status, delivers the event to the MD's event queue (if any),
 * then appends the message to the global finalize queue and — unless enough
 * other threads are already finalizing — drains that queue, completing each
 * message via lnet_complete_msg_locked().
 *
 * NOTE(review): this extract elides original lines including the local
 * declarations (md, unlink, i, my_slot), the msg == NULL early return, the
 * LNET_LOCK()/LNET_UNLOCK() calls, MD refcount drop / unlink handling, and
 * the loop 'break's in the recursion-breaker scan — read the full source
 * before relying on the control flow shown here.
 */
157 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
/* Finalization may sleep/take locks; never legal from interrupt context. */
165 LASSERT (!cfs_in_interrupt ());
/* One-line trace of the message's type, target, and state flags; each
 * letter below corresponds to one msg_* state bit being set. */
170 CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
171 lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
172 msg->msg_target_is_router ? "t" : "",
173 msg->msg_routing ? "X" : "",
174 msg->msg_ack ? "A" : "",
175 msg->msg_sending ? "S" : "",
176 msg->msg_receiving ? "R" : "",
177 msg->msg_delayed ? "d" : "",
178 msg->msg_txcredit ? "C" : "",
179 msg->msg_peertxcredit ? "c" : "",
180 msg->msg_rtrcredit ? "F" : "",
181 msg->msg_peerrtrcredit ? "f" : "",
182 msg->msg_onactivelist ? "!" : "",
183 msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
184 msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
188 LASSERT (msg->msg_onactivelist);
/* Record the caller-supplied completion status in the event. */
190 msg->msg_ev.status = status;
193 /* Now it's safe to drop my caller's ref */
198 LASSERT (md->md_refcount >= 0);
/* Decide whether this completion makes the MD unlinkable, and tell the
 * event consumer via ev.unlinked. */
200 unlink = lnet_md_unlinkable(md);
202 msg->msg_ev.unlinked = unlink;
204 if (md->md_eq != NULL)
205 lnet_enq_event_locked(md->md_eq, &msg->msg_ev);
/* Queue for completion; actual completion may be done by this thread or
 * by one already draining the queue (see recursion breaker below). */
213 cfs_list_add_tail (&msg->msg_list, &the_lnet.ln_finalizeq);
215 /* Recursion breaker. Don't complete the message here if I am (or
216 * enough other threads are) already completing messages */
/* Scan the finalizer slots: bail if this thread is already finalizing,
 * otherwise remember the first free slot. */
220 for (i = 0; i < the_lnet.ln_nfinalizers; i++) {
221 if (the_lnet.ln_finalizers[i] == cfs_current())
223 if (my_slot < 0 && the_lnet.ln_finalizers[i] == NULL)
/* Claim a finalizer slot for this thread. */
229 the_lnet.ln_finalizers[my_slot] = cfs_current();
231 if (the_lnet.ln_finalizing)
234 the_lnet.ln_finalizing = 1;
/* Drain the finalize queue; other finalizing threads may interleave
 * because lnet_complete_msg_locked() can drop the lock. */
237 while (!cfs_list_empty(&the_lnet.ln_finalizeq)) {
238 msg = cfs_list_entry(the_lnet.ln_finalizeq.next,
239 lnet_msg_t, msg_list);
241 cfs_list_del(&msg->msg_list);
243 /* NB drops and regains the lnet lock if it actually does
244 * anything, so my finalizing friends can chomp along too */
245 lnet_complete_msg_locked(msg);
/* Release this thread's finalizer slot and clear the in-progress flag. */
249 the_lnet.ln_finalizers[my_slot] = NULL;
251 the_lnet.ln_finalizing = 0;