4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * This file is part of Lustre, http://www.lustre.org/
32 * Lustre is a trademark of Sun Microsystems, Inc.
36 * Message decoding, parsing and finalizing routines
39 #define DEBUG_SUBSYSTEM S_LNET
41 #include <lnet/lib-lnet.h>
/*
 * Fill *ev as an LNET_EVENT_UNLINK event that refers to md.
 *
 * md: memory descriptor the event is built for
 * ev: event to fill in; its previous contents are overwritten
 *
 * NOTE(review): the embedded line numbering has gaps, so some statements
 * of this function are not visible in this excerpt; only the statements
 * shown here are described.
 */
44 lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
/* Zero the whole event first, so any field that is never assigned reads 0. */
48 memset(ev, 0, sizeof(*ev));
52 ev->type = LNET_EVENT_UNLINK;
/* Record md in the event: lnet_md_deconstruct() writes ev->md and
 * lnet_md2handle() writes ev->md_handle. */
53 lnet_md_deconstruct(md, &ev->md);
54 lnet_md2handle(&ev->md_handle, md);
59 * Don't need any lock, must be called after lnet_commit_md
/*
 * Fill the event embedded in msg (msg->msg_ev) from the message header
 * (msg->msg_hdr) for an event of kind ev_type.
 *
 * msg:     message whose header is read and whose event is written; it is
 *          asserted not to be a routed message
 * ev_type: LNET_EVENT_SEND selects the active-message handling, the other
 *          kinds handled below are PUT, GET, ACK and REPLY
 *
 * NOTE(review): the embedded line numbering has gaps (branch and switch
 * lines are among the missing ones), so only the statements shown here
 * are described.
 */
62 lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
64 lnet_hdr_t *hdr = &msg->msg_hdr;
65 lnet_event_t *ev = &msg->msg_ev;
67 LASSERT(!msg->msg_routing);
/* Active message: destination fields are read through le64_to_cpu() and
 * le32_to_cpu(); initiator nid and sender are set to LNET_NID_ANY and the
 * initiator pid to the_lnet.ln_pid. */
71 if (ev_type == LNET_EVENT_SEND) {
72 /* event for active message */
73 ev->target.nid = le64_to_cpu(hdr->dest_nid);
74 ev->target.pid = le32_to_cpu(hdr->dest_pid);
75 ev->initiator.nid = LNET_NID_ANY;
76 ev->initiator.pid = the_lnet.ln_pid;
77 ev->sender = LNET_NID_ANY;
/* Passive message: header fields are copied with no byte-order conversion.
 * NOTE(review): presumably the header has already been converted to host
 * order before this point - confirm against the receive path. */
80 /* event for passive message */
81 ev->target.pid = hdr->dest_pid;
82 ev->target.nid = hdr->dest_nid;
83 ev->initiator.pid = hdr->src_pid;
84 ev->initiator.nid = hdr->src_nid;
85 ev->rlength = hdr->payload_length;
86 ev->sender = msg->msg_from;
87 ev->mlength = msg->msg_wanted;
88 ev->offset = msg->msg_offset;
/* Fields that depend on the event kind. */
95 case LNET_EVENT_PUT: /* passive PUT */
96 ev->pt_index = hdr->msg.put.ptl_index;
97 ev->match_bits = hdr->msg.put.match_bits;
98 ev->hdr_data = hdr->msg.put.hdr_data;
101 case LNET_EVENT_GET: /* passive GET */
102 ev->pt_index = hdr->msg.get.ptl_index;
103 ev->match_bits = hdr->msg.get.match_bits;
/* ACK: match bits and mlength are both taken from the ack header. */
107 case LNET_EVENT_ACK: /* ACK */
108 ev->match_bits = hdr->msg.ack.match_bits;
109 ev->mlength = hdr->msg.ack.mlength;
/* REPLY: no per-kind assignment is visible in this excerpt. */
112 case LNET_EVENT_REPLY: /* REPLY */
/* SEND: msg_type tells PUT from GET; the non-PUT path asserts GET. All
 * header fields on this path go through le32_to_cpu()/le64_to_cpu(). */
115 case LNET_EVENT_SEND: /* active message */
116 if (msg->msg_type == LNET_MSG_PUT) {
117 ev->pt_index = le32_to_cpu(hdr->msg.put.ptl_index);
118 ev->match_bits = le64_to_cpu(hdr->msg.put.match_bits);
119 ev->offset = le32_to_cpu(hdr->msg.put.offset);
/* For a PUT the requested length is the header payload length. */
121 ev->rlength = le32_to_cpu(hdr->payload_length);
122 ev->hdr_data = le64_to_cpu(hdr->msg.put.hdr_data);
125 LASSERT(msg->msg_type == LNET_MSG_GET);
126 ev->pt_index = le32_to_cpu(hdr->msg.get.ptl_index);
127 ev->match_bits = le64_to_cpu(hdr->msg.get.match_bits);
/* For a GET the requested length is sink_length and the offset is
 * src_offset, both from the get header. */
129 ev->rlength = le32_to_cpu(hdr->msg.get.sink_length);
130 ev->offset = le32_to_cpu(hdr->msg.get.src_offset);
/*
 * Complete msg according to the status stored in msg->msg_ev.status.
 *
 * Paths visible in this excerpt:
 *  - status 0 and msg_ack set: credits are returned, the message is
 *    re-prepared as an LNET_MSG_ACK and handed to lnet_send();
 *  - status 0 and the message is routed but not yet being sent: it is
 *    handed to lnet_send() with LNET_NID_ANY;
 *  - final statements: credits are returned, the message is taken off
 *    the active list, the msgs_alloc counter is decremented and the
 *    message is freed.
 *
 * msg: message to complete; asserted to be on the active list
 *
 * NOTE(review): the embedded line numbering has gaps, so lock handling,
 * the checks on rc and any early returns are not visible here. The name
 * suggests the caller holds the lnet lock - confirm.
 */
138 lnet_complete_msg_locked(lnet_msg_t *msg)
140 lnet_handle_wire_t ack_wmd;
142 int status = msg->msg_ev.status;
144 LASSERT (msg->msg_onactivelist);
146 if (status == 0 && msg->msg_ack) {
147 /* Only send an ACK if the PUT completed successfully */
149 lnet_return_credits_locked(msg);
154 LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
155 LASSERT(!msg->msg_routing);
/* Copy ack_wmd out of the put header before lnet_prep_send() is called; it
 * is written back into the ack header afterwards. NOTE(review): presumably
 * lnet_prep_send() rewrites msg_hdr - confirm. */
157 ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
/* The ACK is addressed to the initiator recorded in the event. */
159 lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
161 msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
162 msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
/* mlength is converted with cpu_to_le32() before it is stored in the header. */
163 msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
/* The nid the original event was targeted at is passed as the first
 * argument of lnet_send(). */
165 rc = lnet_send(msg->msg_ev.target.nid, msg);
171 } else if (status == 0 && /* OK so far */
172 (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
174 LASSERT (!msg->msg_receiving); /* called back recv already */
178 rc = lnet_send(LNET_NID_ANY, msg);
/* Final teardown: give back credits, unlink from the active list, drop the
 * allocation counter and free the message. */
186 lnet_return_credits_locked(msg);
188 LASSERT (msg->msg_onactivelist);
189 msg->msg_onactivelist = 0;
190 cfs_list_del (&msg->msg_activelist);
191 the_lnet.ln_counters.msgs_alloc--;
192 lnet_msg_free_locked(msg);
/*
 * Finish processing of msg with the given completion status.
 *
 * Visible steps: the status is stored in the message event, the event is
 * enqueued on the event queue of md when md has one, the message is
 * appended to the finalizing list of the_lnet.ln_msg_container, and that
 * list is then drained through lnet_complete_msg_locked() by a thread
 * that holds a finalizer slot.
 *
 * ni:     not referenced in the lines visible in this excerpt
 * msg:    message being finalized; asserted to be on the active list
 * status: completion status, stored in msg->msg_ev.status
 *
 * NOTE(review): the embedded line numbering has gaps, so locking, the
 * handling of md, preprocessor conditions and the actions taken inside
 * the slot-search loop are not visible here.
 */
197 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
199 struct lnet_msg_container *container;
/* Asserted: never called from interrupt context. */
204 LASSERT (!cfs_in_interrupt ());
/* Debug dump of the message: type, target, one letter per flag that is set,
 * and the tx/rx peer nids. */
209 CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
210 lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
211 msg->msg_target_is_router ? "t" : "",
212 msg->msg_routing ? "X" : "",
213 msg->msg_ack ? "A" : "",
214 msg->msg_sending ? "S" : "",
215 msg->msg_receiving ? "R" : "",
216 msg->msg_delayed ? "d" : "",
217 msg->msg_txcredit ? "C" : "",
218 msg->msg_peertxcredit ? "c" : "",
219 msg->msg_rtrcredit ? "F" : "",
220 msg->msg_peerrtrcredit ? "f" : "",
221 msg->msg_onactivelist ? "!" : "",
222 msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
223 msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
227 LASSERT (msg->msg_onactivelist);
229 msg->msg_ev.status = status;
235 /* Now it's safe to drop my caller's ref */
237 LASSERT (md->md_refcount >= 0);
/* The event records whether the MD is unlinkable at this point. */
239 unlink = lnet_md_unlinkable(md);
241 msg->msg_ev.unlinked = unlink;
/* The event is delivered only when the MD has an event queue. */
243 if (md->md_eq != NULL)
244 lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
/* Queue the message for completion on the container finalizing list. */
252 container = &the_lnet.ln_msg_container;
253 cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing);
255 /* Recursion breaker. Don't complete the message here if I am (or
256 * enough other threads are) already completing messages */
/* Scan the finalizer slots: test whether the current thread already owns a
 * slot, and look for an empty (NULL) slot while my_slot is still negative. */
260 for (i = 0; i < container->msc_nfinalizers; i++) {
261 if (container->msc_finalizers[i] == cfs_current())
264 if (my_slot < 0 && container->msc_finalizers[i] == NULL)
/* Claim the chosen slot for the current thread. */
271 container->msc_finalizers[my_slot] = cfs_current();
/* Alternative path with exactly one slot (asserted): a non-NULL slot is
 * tested for, and the constant 1 is stored as the in-use marker.
 * NOTE(review): the preprocessor condition choosing between the two paths
 * is not visible in this excerpt. */
273 LASSERT(container->msc_nfinalizers == 1);
274 if (container->msc_finalizers[0] != NULL)
278 container->msc_finalizers[0] = (struct lnet_msg_container *)1;
/* Drain the finalizing list from the head, one message at a time. */
281 while (!cfs_list_empty(&container->msc_finalizing)) {
282 msg = cfs_list_entry(container->msc_finalizing.next,
283 lnet_msg_t, msg_list);
285 cfs_list_del(&msg->msg_list);
287 /* NB drops and regains the lnet lock if it actually does
288 * anything, so my finalizing friends can chomp along too */
289 lnet_complete_msg_locked(msg);
/* Release the finalizer slot taken above. */
292 container->msc_finalizers[my_slot] = NULL;
/*
 * Tear down a message container.
 *
 * Visible steps: msc_init is tested first, every message still on
 * msc_active is unlinked from that list, a count of such messages is
 * logged, the finalizer slot array is freed, the freelist is finalized
 * when LNET_USE_LIB_FREELIST is defined, and msc_init is cleared.
 *
 * container: container to clean up
 *
 * NOTE(review): the embedded line numbering has gaps; the action taken
 * when msc_init is 0 (presumably an early return), the counting of
 * messages and the condition guarding the CERROR are not visible here.
 */
298 lnet_msg_container_cleanup(struct lnet_msg_container *container)
302 if (container->msc_init == 0)
/* Drain msc_active from the head; each message is asserted to be flagged as
 * on the active list, then the flag is cleared and the entry is unlinked. */
305 while (!cfs_list_empty(&container->msc_active)) {
306 lnet_msg_t *msg = cfs_list_entry(container->msc_active.next,
307 lnet_msg_t, msg_activelist);
309 LASSERT(msg->msg_onactivelist);
310 msg->msg_onactivelist = 0;
311 cfs_list_del(&msg->msg_activelist);
317 CERROR("%d active msg on exit\n", count);
/* Free the slot array with the same size expression that
 * lnet_msg_container_setup() uses to allocate it, then clear the pointer. */
319 if (container->msc_finalizers != NULL) {
320 LIBCFS_FREE(container->msc_finalizers,
321 container->msc_nfinalizers *
322 sizeof(*container->msc_finalizers));
323 container->msc_finalizers = NULL;
325 #ifdef LNET_USE_LIB_FREELIST
326 lnet_freelist_fini(&container->msc_freelist);
/* Mark the container as no longer initialised. */
328 container->msc_init = 0;
/*
 * Initialise a message container.
 *
 * Visible steps: msc_init is set, the active and finalizing lists are
 * initialised, the freelist is set up when LNET_USE_LIB_FREELIST is
 * defined, and the finalizer slot array is sized and allocated. On the
 * failure paths shown, an error is logged and
 * lnet_msg_container_cleanup() is called.
 *
 * container: container to initialise
 *
 * NOTE(review): the embedded line numbering has gaps; the checks on rc,
 * the return statements and the second argument of cfs_cpt_weight() are
 * not visible in this excerpt, and the function may continue past it.
 */
332 lnet_msg_container_setup(struct lnet_msg_container *container)
/* msc_init is set before anything else; lnet_msg_container_cleanup() tests
 * this flag and is called from the failure paths below. */
336 container->msc_init = 1;
338 CFS_INIT_LIST_HEAD(&container->msc_active);
339 CFS_INIT_LIST_HEAD(&container->msc_finalizing);
341 #ifdef LNET_USE_LIB_FREELIST
/* Zero the freelist, then initialise it for LNET_FL_MAX_MSGS objects of
 * size sizeof(lnet_msg_t). */
342 memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t));
344 rc = lnet_freelist_init(&container->msc_freelist,
345 LNET_FL_MAX_MSGS, sizeof(lnet_msg_t));
347 CERROR("Failed to init freelist for message container\n");
348 lnet_msg_container_cleanup(container);
/* The number of finalizer slots comes from cfs_cpt_weight(); the array is
 * allocated with one element per slot. */
355 container->msc_nfinalizers = cfs_cpt_weight(cfs_cpt_table,
357 LIBCFS_ALLOC(container->msc_finalizers,
358 container->msc_nfinalizers *
359 sizeof(*container->msc_finalizers));
361 if (container->msc_finalizers == NULL) {
362 CERROR("Failed to allocate message finalizers\n");
363 lnet_msg_container_cleanup(container);