4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * This file is part of Lustre, http://www.lustre.org/
32 * Lustre is a trademark of Sun Microsystems, Inc.
36 * Message decoding, parsing and finalizing routines
39 #define DEBUG_SUBSYSTEM S_LNET
41 #include <lnet/lib-lnet.h>
/*
 * Build an UNLINK event describing MD 'md' into caller-supplied 'ev'.
 * The event is zeroed first, then the MD's user-visible state and its
 * handle are recorded (lnet_md_deconstruct / lnet_md2handle).
 * NOTE(review): this chunk is an elided excerpt — the return type,
 * braces and some statements are missing from view; confirm against
 * the full source before modifying.
 */
44 lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
48 memset(ev, 0, sizeof(*ev));
52 ev->type = LNET_EVENT_UNLINK;
53 lnet_md_deconstruct(md, &ev->md);
54 lnet_md2handle(&ev->md_handle, md);
/*
 * Enqueue event 'ev' onto event queue 'eq' and wake any pollers.
 * Caller must hold the LNET lock (see the no-race comment below).
 * The EQ is a power-of-2 ring: the slot is selected by masking the
 * monotonically increasing enqueue sequence number.
 * NOTE(review): excerpt is missing lines (braces, the copy of *ev into
 * *eq_slot, and the #else/#endif matching the #ifndef below) — verify
 * against the full source.
 */
59 lnet_enq_event_locked (lnet_eq_t *eq, lnet_event_t *ev)
61 lnet_event_t *eq_slot;
63 /* Allocate the next queue slot */
64 ev->sequence = eq->eq_enq_seq++;
66 /* size must be a power of 2 to handle sequence # overflow */
67 LASSERT (eq->eq_size != 0 &&
68 eq->eq_size == LOWEST_BIT_SET (eq->eq_size));
/* Ring indexing: sequence wraps harmlessly because eq_size is a power of 2. */
69 eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
71 /* There is no race since both event consumers and event producers
72 * take the LNET_LOCK, so we don't screw around with memory
73 * barriers, setting the sequence number last or weird structure
74 * layout assertions. */
77 /* Call the callback handler (if any) */
78 if (eq->eq_callback != NULL)
79 eq->eq_callback (eq_slot);
82 /* Wake anyone waiting in LNetEQPoll() */
83 if (cfs_waitq_active(&the_lnet.ln_waitq))
84 cfs_waitq_broadcast(&the_lnet.ln_waitq);
/* Userspace without pthreads: pollers wait on a condvar instead of a waitq. */
86 # ifndef HAVE_LIBPTHREAD
87 /* LNetEQPoll() calls into _the_ LND to wait for action */
89 /* Wake anyone waiting in LNetEQPoll() */
90 pthread_cond_broadcast(&the_lnet.ln_cond);
/*
 * Final disposition of a message, called with the LNET lock held:
 *  - on a successful PUT that requested an ACK, convert the message
 *    into an ACK and send it back to the initiator;
 *  - on a routed message not yet forwarded, forward it;
 *  - otherwise return credits, remove the message from the active
 *    list and free it.
 * NOTE(review): excerpt is missing lines (braces, error handling after
 * the lnet_send() calls, declaration of 'rc') — confirm the control
 * flow against the full source.
 */
96 lnet_complete_msg_locked(lnet_msg_t *msg)
98 lnet_handle_wire_t ack_wmd;
100 int status = msg->msg_ev.status;
102 LASSERT (msg->msg_onactivelist);
104 if (status == 0 && msg->msg_ack) {
105 /* Only send an ACK if the PUT completed successfully */
107 lnet_return_credits_locked(msg);
112 LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
113 LASSERT(!msg->msg_routing);
/* Save the ACK cookie before lnet_prep_send() rewrites the header. */
115 ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
117 lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
119 msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
120 msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
/* Wire fields are little-endian; mlength must be byte-swapped on BE hosts. */
121 msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
123 rc = lnet_send(msg->msg_ev.target.nid, msg);
129 } else if (status == 0 && /* OK so far */
130 (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
132 LASSERT (!msg->msg_receiving); /* called back recv already */
/* Forward the routed message; routing picks the NI, hence LNET_NID_ANY. */
136 rc = lnet_send(LNET_NID_ANY, msg);
144 lnet_return_credits_locked(msg);
146 LASSERT (msg->msg_onactivelist);
147 msg->msg_onactivelist = 0;
148 cfs_list_del (&msg->msg_activelist);
149 the_lnet.ln_counters.msgs_alloc--;
150 lnet_msg_free_locked(msg);
/*
 * lnet_finalize: complete message 'msg' with completion 'status'.
 * Records the status in the event, delivers the event to the MD's EQ
 * (if any), queues the message on the container's finalizing list,
 * then — unless enough other threads are already finalizing — drains
 * that list via lnet_complete_msg_locked().
 * NOTE(review): excerpt is missing lines (locking, NULL-msg early
 * return, 'md'/'unlink'/'i'/'my_slot' declarations, loop braces,
 * #ifdef structure around the single-threaded path) — confirm against
 * the full source.
 */
155 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
157 struct lnet_msg_container *container;
/* Event delivery may sleep-free call user callbacks; never in interrupt. */
162 LASSERT (!cfs_in_interrupt ());
167 CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
168 lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
169 msg->msg_target_is_router ? "t" : "",
170 msg->msg_routing ? "X" : "",
171 msg->msg_ack ? "A" : "",
172 msg->msg_sending ? "S" : "",
173 msg->msg_receiving ? "R" : "",
174 msg->msg_delayed ? "d" : "",
175 msg->msg_txcredit ? "C" : "",
176 msg->msg_peertxcredit ? "c" : "",
177 msg->msg_rtrcredit ? "F" : "",
178 msg->msg_peerrtrcredit ? "f" : "",
179 msg->msg_onactivelist ? "!" : "",
180 msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
181 msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
185 LASSERT (msg->msg_onactivelist);
187 msg->msg_ev.status = status;
194 /* Now it's safe to drop my caller's ref */
195 LASSERT (md->md_refcount >= 0);
197 unlink = lnet_md_unlinkable(md);
/* Tell the event consumer whether this completion also unlinked the MD. */
199 msg->msg_ev.unlinked = unlink;
201 if (md->md_eq != NULL)
202 lnet_enq_event_locked(md->md_eq, &msg->msg_ev);
210 container = &the_lnet.ln_msg_container;
211 cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing);
213 /* Recursion breaker. Don't complete the message here if I am (or
214 * enough other threads are) already completing messages */
/* Find my slot: bail if I'm already finalizing; else claim a free slot. */
218 for (i = 0; i < container->msc_nfinalizers; i++) {
219 if (container->msc_finalizers[i] == cfs_current())
222 if (my_slot < 0 && container->msc_finalizers[i] == NULL)
229 container->msc_finalizers[my_slot] = cfs_current();
231 LASSERT(container->msc_nfinalizers == 1);
232 if (container->msc_finalizers[0] != NULL)
/* NOTE(review): sentinel cast below looks type-mismatched — slots
 * elsewhere hold cfs_current() task pointers, not container pointers;
 * presumably any non-NULL value works as "busy", but verify. */
236 container->msc_finalizers[0] = (struct lnet_msg_container *)1;
239 while (!cfs_list_empty(&container->msc_finalizing)) {
240 msg = cfs_list_entry(container->msc_finalizing.next,
241 lnet_msg_t, msg_list);
243 cfs_list_del(&msg->msg_list);
245 /* NB drops and regains the lnet lock if it actually does
246 * anything, so my finalizing friends can chomp along too */
247 lnet_complete_msg_locked(msg);
/* Release my finalizer slot so another thread may drain the list. */
250 container->msc_finalizers[my_slot] = NULL;
/*
 * Tear down a message container: drain any messages still on the
 * active list (complaining if there were any), free the finalizer
 * slot array and (when configured) the freelist, then mark the
 * container uninitialized.
 * NOTE(review): excerpt is missing lines (braces, the free of each
 * active msg, 'count' declaration/increment, #endif) — confirm
 * against the full source.
 */
256 lnet_msg_container_cleanup(struct lnet_msg_container *container)
/* Idempotent: a container that was never set up needs no teardown. */
260 if (container->msc_init == 0)
263 while (!cfs_list_empty(&container->msc_active)) {
264 lnet_msg_t *msg = cfs_list_entry(container->msc_active.next,
265 lnet_msg_t, msg_activelist);
267 LASSERT(msg->msg_onactivelist);
268 msg->msg_onactivelist = 0;
269 cfs_list_del(&msg->msg_activelist);
/* Messages still active at shutdown indicate a leak upstream. */
275 CERROR("%d active msg on exit\n", count);
277 if (container->msc_finalizers != NULL) {
278 LIBCFS_FREE(container->msc_finalizers,
279 container->msc_nfinalizers *
280 sizeof(*container->msc_finalizers));
281 container->msc_finalizers = NULL;
283 #ifdef LNET_USE_LIB_FREELIST
284 lnet_freelist_fini(&container->msc_freelist);
286 container->msc_init = 0;
290 lnet_msg_container_setup(struct lnet_msg_container *container)
294 container->msc_init = 1;
296 CFS_INIT_LIST_HEAD(&container->msc_active);
297 CFS_INIT_LIST_HEAD(&container->msc_finalizing);
299 #ifdef LNET_USE_LIB_FREELIST
300 memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t));
302 rc = lnet_freelist_init(&container->msc_freelist,
303 LNET_FL_MAX_MSGS, sizeof(lnet_msg_t));
305 CERROR("Failed to init freelist for message container\n");
306 lnet_msg_container_cleanup(container);
313 container->msc_nfinalizers = cfs_cpt_weight(cfs_cpt_table,
315 LIBCFS_ALLOC(container->msc_finalizers,
316 container->msc_nfinalizers *
317 sizeof(*container->msc_finalizers));
319 if (container->msc_finalizers == NULL) {
320 CERROR("Failed to allocate message finalizers\n");
321 lnet_msg_container_cleanup(container);