1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see [sun.com URL with a
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 * Message decoding, parsing and finalizing routines
41 #define DEBUG_SUBSYSTEM S_LNET
43 #include <lnet/lib-lnet.h>
/* Fill in *ev as an LNET_EVENT_UNLINK event describing md.
 *
 * md: memory descriptor the event refers to
 * ev: event to populate; every byte is cleared first, then the type,
 *     the MD description (ev->md) and the MD handle (ev->md_handle)
 *     are filled in */
46 lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
/* Start from an all-zero event */
48 memset(ev, 0, sizeof(*ev));
52 ev->type = LNET_EVENT_UNLINK;
/* Let the MD helpers fill ev->md and ev->md_handle from md */
53 lnet_md_deconstruct(md, &ev->md);
54 lnet_md2handle(&ev->md_handle, md);
/* Post event *ev to event queue eq and notify consumers.
 *
 * A sequence number is taken from eq->eq_enq_seq (post-incremented) and
 * stored in ev->sequence; its low bits select a slot in eq->eq_events.
 * eq->eq_callback, when set, is then invoked on that slot, and waiters
 * are woken.  As the comment in the body states, event producers and
 * consumers both take the LNET_LOCK.
 *
 * NOTE(review): presumably *ev is stored into the selected slot before
 * the callback runs -- confirm against the full function body. */
58 lnet_enq_event_locked (lnet_eq_t *eq, lnet_event_t *ev)
60 lnet_event_t *eq_slot;
62 /* Allocate the next queue slot */
63 ev->sequence = eq->eq_enq_seq++;
65 /* size must be a power of 2 to handle sequence # overflow */
66 LASSERT (eq->eq_size != 0 &&
67 eq->eq_size == LOWEST_BIT_SET (eq->eq_size));
/* With a power-of-2 size, masking with (size - 1) yields the sequence
 * number modulo the queue size, i.e. the slot index */
68 eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
70 /* There is no race since both event consumers and event producers
71 * take the LNET_LOCK, so we don't screw around with memory
72 * barriers, setting the sequence number last or weird structure
73 * layout assertions. */
76 /* Call the callback handler (if any) */
77 if (eq->eq_callback != NULL)
78 eq->eq_callback (eq_slot);
81 /* Wake anyone waiting in LNetEQPoll() */
82 if (cfs_waitq_active(&the_lnet.ln_waitq))
83 cfs_waitq_broadcast(&the_lnet.ln_waitq);
85 # ifndef HAVE_LIBPTHREAD
86 /* LNetEQPoll() calls into _the_ LND to wait for action */
88 /* Wake anyone waiting in LNetEQPoll() */
89 pthread_cond_broadcast(&the_lnet.ln_cond);
/* Finish processing a message whose final status is msg->msg_ev.status.
 *
 * Paths visible in the body:
 *  - status 0 and msg->msg_ack set: return the message's credits, then
 *    re-prepare the same msg as an LNET_MSG_ACK addressed to the event's
 *    initiator and hand it to lnet_send();
 *  - status 0, msg->msg_routing set and msg->msg_sending clear: pass
 *    the message to lnet_send() with LNET_NID_ANY;
 *  - retire: return credits, take the message off the active list and
 *    decrement the_lnet.ln_counters.msgs_alloc.
 *
 * msg must be on the active list on entry (asserted).
 *
 * NOTE(review): how the result of lnet_send() decides whether control
 * continues to the retire step is not evident here -- confirm. */
95 lnet_complete_msg_locked(lnet_msg_t *msg)
97 lnet_handle_wire_t ack_wmd;
99 int status = msg->msg_ev.status;
101 LASSERT (msg->msg_onactivelist);
103 if (status == 0 && msg->msg_ack) {
104 /* Only send an ACK if the PUT completed successfully */
106 lnet_return_credits_locked(msg);
/* An ACK is only expected for a PUT event on a non-routed message */
111 LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
112 LASSERT(!msg->msg_routing);
/* Keep a copy of the PUT's ack_wmd; it is written into the ACK header
 * after lnet_prep_send() -- presumably because that call rewrites
 * msg->msg_hdr (TODO confirm) */
114 ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
116 lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
/* Fill in the ACK header; mlength goes through cpu_to_le32() */
118 msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
119 msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
120 msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
122 rc = lnet_send(msg->msg_ev.target.nid, msg);
128 } else if (status == 0 && /* OK so far */
129 (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
131 LASSERT (!msg->msg_receiving); /* called back recv already */
135 rc = lnet_send(LNET_NID_ANY, msg);
/* Retire the message: give back its credits and drop it from the
 * active-list and allocation accounting */
143 lnet_return_credits_locked(msg);
145 LASSERT (msg->msg_onactivelist);
146 msg->msg_onactivelist = 0;
147 list_del (&msg->msg_activelist);
148 the_lnet.ln_counters.msgs_alloc--;
/* Finalize msg with the given completion status.
 *
 * ni:     network interface handle passed in by the caller
 * msg:    message being finalized; must be on the active list (asserted)
 * status: stored in msg->msg_ev.status
 *
 * The completion event is posted to md->md_eq when that EQ is non-NULL,
 * the message is appended to the_lnet.ln_finalizeq, and the queue is
 * then drained through lnet_complete_msg_locked() -- except, per the
 * "recursion breaker" comment in the body, when this thread or enough
 * other threads are already completing messages.
 *
 * Must not be called from interrupt context (asserted). */
154 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
162 LASSERT (!in_interrupt ());
/* Debug dump of the message state.  Flag letters, in print order:
 *   t target is router    X routing         A ack wanted
 *   S sending             R receiving       d delayed
 *   C tx credit           c peer tx credit
 *   F router credit       f peer router credit
 *   ! on the active list
 * txp / rxp print the NID of msg_txpeer / msg_rxpeer, or "<none>" when
 * the pointer is NULL */
167 CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
168 lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
169 msg->msg_target_is_router ? "t" : "",
170 msg->msg_routing ? "X" : "",
171 msg->msg_ack ? "A" : "",
172 msg->msg_sending ? "S" : "",
173 msg->msg_receiving ? "R" : "",
174 msg->msg_delayed ? "d" : "",
175 msg->msg_txcredit ? "C" : "",
176 msg->msg_peertxcredit ? "c" : "",
177 msg->msg_rtrcredit ? "F" : "",
178 msg->msg_peerrtrcredit ? "f" : "",
179 msg->msg_onactivelist ? "!" : "",
180 msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
181 msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
185 LASSERT (msg->msg_onactivelist);
/* Record the outcome in the message's event */
187 msg->msg_ev.status = status;
193 /* Now it's safe to drop my caller's ref */
195 LASSERT (md->md_refcount >= 0);
/* The event reports whether the MD is unlinkable, as judged by
 * lnet_md_unlinkable() */
197 unlink = lnet_md_unlinkable(md);
199 msg->msg_ev.unlinked = unlink;
/* Only MDs with an event queue attached get a completion event */
201 if (md->md_eq != NULL)
202 lnet_enq_event_locked(md->md_eq, &msg->msg_ev);
/* Queue the message for completion at the tail of the finalize queue */
210 list_add_tail (&msg->msg_list, &the_lnet.ln_finalizeq);
212 /* Recursion breaker. Don't complete the message here if I am (or
213 * enough other threads are) already completing messages */
/* Walk the finalizer table looking for this thread's own entry and for
 * a free (NULL) entry while my_slot is still unset (< 0) */
217 for (i = 0; i < the_lnet.ln_nfinalizers; i++) {
218 if (the_lnet.ln_finalizers[i] == cfs_current())
220 if (my_slot < 0 && the_lnet.ln_finalizers[i] == NULL)
/* Register the current thread in the chosen slot */
226 the_lnet.ln_finalizers[my_slot] = cfs_current();
/* NOTE(review): ln_finalizing looks like an alternative single-flag
 * guard to ln_finalizers[]; which one applies is probably selected at
 * build time -- confirm */
228 if (the_lnet.ln_finalizing)
231 the_lnet.ln_finalizing = 1;
/* Drain the finalize queue, always taking the entry at the head */
234 while (!list_empty(&the_lnet.ln_finalizeq)) {
235 msg = list_entry(the_lnet.ln_finalizeq.next,
236 lnet_msg_t, msg_list);
238 list_del(&msg->msg_list);
240 /* NB drops and regains the lnet lock if it actually does
241 * anything, so my finalizing friends can chomp along too */
242 lnet_complete_msg_locked(msg);
/* Done draining: release the finalizer slot / clear the flag */
246 the_lnet.ln_finalizers[my_slot] = NULL;
248 the_lnet.ln_finalizing = 0;