4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * This file is part of Lustre, http://www.lustre.org/
32 * Lustre is a trademark of Sun Microsystems, Inc.
36 * Message decoding, parsing and finalizing routines
39 #define DEBUG_SUBSYSTEM S_LNET
41 #include <lnet/lib-lnet.h>
44 lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
48 memset(ev, 0, sizeof(*ev));
52 ev->type = LNET_EVENT_UNLINK;
53 lnet_md_deconstruct(md, &ev->md);
54 lnet_md2handle(&ev->md_handle, md);
59 lnet_complete_msg_locked(lnet_msg_t *msg)
61 lnet_handle_wire_t ack_wmd;
63 int status = msg->msg_ev.status;
65 LASSERT (msg->msg_onactivelist);
67 if (status == 0 && msg->msg_ack) {
68 /* Only send an ACK if the PUT completed successfully */
70 lnet_return_credits_locked(msg);
75 LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
76 LASSERT(!msg->msg_routing);
78 ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
80 lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
82 msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
83 msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
84 msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
86 rc = lnet_send(msg->msg_ev.target.nid, msg);
92 } else if (status == 0 && /* OK so far */
93 (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
95 LASSERT (!msg->msg_receiving); /* called back recv already */
99 rc = lnet_send(LNET_NID_ANY, msg);
107 lnet_return_credits_locked(msg);
109 LASSERT (msg->msg_onactivelist);
110 msg->msg_onactivelist = 0;
111 cfs_list_del (&msg->msg_activelist);
112 the_lnet.ln_counters.msgs_alloc--;
113 lnet_msg_free_locked(msg);
118 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
120 struct lnet_msg_container *container;
125 LASSERT (!cfs_in_interrupt ());
130 CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
131 lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
132 msg->msg_target_is_router ? "t" : "",
133 msg->msg_routing ? "X" : "",
134 msg->msg_ack ? "A" : "",
135 msg->msg_sending ? "S" : "",
136 msg->msg_receiving ? "R" : "",
137 msg->msg_delayed ? "d" : "",
138 msg->msg_txcredit ? "C" : "",
139 msg->msg_peertxcredit ? "c" : "",
140 msg->msg_rtrcredit ? "F" : "",
141 msg->msg_peerrtrcredit ? "f" : "",
142 msg->msg_onactivelist ? "!" : "",
143 msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
144 msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
148 LASSERT (msg->msg_onactivelist);
150 msg->msg_ev.status = status;
156 /* Now it's safe to drop my caller's ref */
158 LASSERT (md->md_refcount >= 0);
160 unlink = lnet_md_unlinkable(md);
162 msg->msg_ev.unlinked = unlink;
164 if (md->md_eq != NULL)
165 lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
173 container = &the_lnet.ln_msg_container;
174 cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing);
176 /* Recursion breaker. Don't complete the message here if I am (or
177 * enough other threads are) already completing messages */
181 for (i = 0; i < container->msc_nfinalizers; i++) {
182 if (container->msc_finalizers[i] == cfs_current())
185 if (my_slot < 0 && container->msc_finalizers[i] == NULL)
192 container->msc_finalizers[my_slot] = cfs_current();
194 LASSERT(container->msc_nfinalizers == 1);
195 if (container->msc_finalizers[0] != NULL)
199 container->msc_finalizers[0] = (struct lnet_msg_container *)1;
202 while (!cfs_list_empty(&container->msc_finalizing)) {
203 msg = cfs_list_entry(container->msc_finalizing.next,
204 lnet_msg_t, msg_list);
206 cfs_list_del(&msg->msg_list);
208 /* NB drops and regains the lnet lock if it actually does
209 * anything, so my finalizing friends can chomp along too */
210 lnet_complete_msg_locked(msg);
213 container->msc_finalizers[my_slot] = NULL;
219 lnet_msg_container_cleanup(struct lnet_msg_container *container)
223 if (container->msc_init == 0)
226 while (!cfs_list_empty(&container->msc_active)) {
227 lnet_msg_t *msg = cfs_list_entry(container->msc_active.next,
228 lnet_msg_t, msg_activelist);
230 LASSERT(msg->msg_onactivelist);
231 msg->msg_onactivelist = 0;
232 cfs_list_del(&msg->msg_activelist);
238 CERROR("%d active msg on exit\n", count);
240 if (container->msc_finalizers != NULL) {
241 LIBCFS_FREE(container->msc_finalizers,
242 container->msc_nfinalizers *
243 sizeof(*container->msc_finalizers));
244 container->msc_finalizers = NULL;
246 #ifdef LNET_USE_LIB_FREELIST
247 lnet_freelist_fini(&container->msc_freelist);
249 container->msc_init = 0;
253 lnet_msg_container_setup(struct lnet_msg_container *container)
257 container->msc_init = 1;
259 CFS_INIT_LIST_HEAD(&container->msc_active);
260 CFS_INIT_LIST_HEAD(&container->msc_finalizing);
262 #ifdef LNET_USE_LIB_FREELIST
263 memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t));
265 rc = lnet_freelist_init(&container->msc_freelist,
266 LNET_FL_MAX_MSGS, sizeof(lnet_msg_t));
268 CERROR("Failed to init freelist for message container\n");
269 lnet_msg_container_cleanup(container);
276 container->msc_nfinalizers = cfs_cpt_weight(cfs_cpt_table,
278 LIBCFS_ALLOC(container->msc_finalizers,
279 container->msc_nfinalizers *
280 sizeof(*container->msc_finalizers));
282 if (container->msc_finalizers == NULL) {
283 CERROR("Failed to allocate message finalizers\n");
284 lnet_msg_container_cleanup(container);