4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * This file is part of Lustre, http://www.lustre.org/
32 * Lustre is a trademark of Sun Microsystems, Inc.
36 * Message decoding, parsing and finalizing routines
39 #define DEBUG_SUBSYSTEM S_LNET
41 #include <lnet/lib-lnet.h>
/* Build an LNET_EVENT_UNLINK event describing MD @md into caller-supplied
 * event @ev.  NOTE(review): this chunk is elided — the function's return
 * type, braces and any intermediate lines are not visible here. */
44 lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
/* Zero the whole event first so no stale fields leak to the consumer. */
48 memset(ev, 0, sizeof(*ev));
52 ev->type = LNET_EVENT_UNLINK;
/* Capture the MD's user-visible state and its handle into the event. */
53 lnet_md_deconstruct(md, &ev->md);
54 lnet_md2handle(&ev->md_handle, md);
59  * Don't need any lock, must be called after lnet_commit_md
/* Fill msg->msg_ev for an event of kind @ev_type from the message header.
 * Active events (SEND) read header fields with le*_to_cpu() because the
 * outgoing header is already in wire (little-endian) byte order; the
 * passive branch reads fields directly — presumably the receive path has
 * already converted the header to host order (TODO confirm against the
 * parse path, which is outside this view).  NOTE(review): the enclosing
 * switch statement, break statements and closing braces are elided here. */
62 lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
64 lnet_hdr_t *hdr = &msg->msg_hdr;
65 lnet_event_t *ev = &msg->msg_ev;
/* Routed messages never generate events for the local user. */
67 LASSERT(!msg->msg_routing);
71 if (ev_type == LNET_EVENT_SEND) {
72 /* event for active message */
73 ev->target.nid = le64_to_cpu(hdr->dest_nid);
74 ev->target.pid = le32_to_cpu(hdr->dest_pid);
/* The initiator of an active send is this node; NID left as ANY. */
75 ev->initiator.nid = LNET_NID_ANY;
76 ev->initiator.pid = the_lnet.ln_pid;
77 ev->sender = LNET_NID_ANY;
80 /* event for passive message */
81 ev->target.pid = hdr->dest_pid;
82 ev->target.nid = hdr->dest_nid;
83 ev->initiator.pid = hdr->src_pid;
84 ev->initiator.nid = hdr->src_nid;
/* rlength is what the peer sent; mlength/offset are what the local
 * MD actually accepted (set when the MD was attached). */
85 ev->rlength = hdr->payload_length;
86 ev->sender = msg->msg_from;
87 ev->mlength = msg->msg_wanted;
88 ev->offset = msg->msg_offset;
95 case LNET_EVENT_PUT: /* passive PUT */
96 ev->pt_index = hdr->msg.put.ptl_index;
97 ev->match_bits = hdr->msg.put.match_bits;
98 ev->hdr_data = hdr->msg.put.hdr_data;
101 case LNET_EVENT_GET: /* passive GET */
102 ev->pt_index = hdr->msg.get.ptl_index;
103 ev->match_bits = hdr->msg.get.match_bits;
107 case LNET_EVENT_ACK: /* ACK */
108 ev->match_bits = hdr->msg.ack.match_bits;
109 ev->mlength = hdr->msg.ack.mlength;
112 case LNET_EVENT_REPLY: /* REPLY */
/* Active (SEND) events: the header is in wire order, convert fields. */
115 case LNET_EVENT_SEND: /* active message */
116 if (msg->msg_type == LNET_MSG_PUT) {
117 ev->pt_index = le32_to_cpu(hdr->msg.put.ptl_index);
118 ev->match_bits = le64_to_cpu(hdr->msg.put.match_bits);
119 ev->offset = le32_to_cpu(hdr->msg.put.offset);
121 ev->rlength = le32_to_cpu(hdr->payload_length);
122 ev->hdr_data = le64_to_cpu(hdr->msg.put.hdr_data);
/* Only PUT and GET are actively initiated message types here. */
125 LASSERT(msg->msg_type == LNET_MSG_GET);
126 ev->pt_index = le32_to_cpu(hdr->msg.get.ptl_index);
127 ev->match_bits = le64_to_cpu(hdr->msg.get.match_bits);
129 ev->rlength = le32_to_cpu(hdr->msg.get.sink_length);
130 ev->offset = le32_to_cpu(hdr->msg.get.src_offset);
/* Commit @msg to the network: put it on the container's active list (once)
 * and mark it committed for sending and/or receiving per @sending.  A
 * routed message, or the REPLY leg of a GET, may already be rx-committed
 * when it is committed again for transmission.  NOTE(review): braces,
 * else-branches and the lines selecting between tx/rx commit are elided
 * from this view. */
138 lnet_msg_commit(lnet_msg_t *msg, int sending)
140 struct lnet_msg_container *container = &the_lnet.ln_msg_container;
141 lnet_counters_t *counters = the_lnet.ln_counters;
143 /* routed message can be committed for both receiving and sending */
144 LASSERT(!msg->msg_tx_committed);
146 if (msg->msg_rx_committed) { /* routed message, or reply for GET */
/* Already accounted on the active list by the rx commit. */
148 LASSERT(msg->msg_onactivelist);
149 msg->msg_tx_committed = 1;
/* First commit: add to the active list and bump allocation stats. */
153 LASSERT(!msg->msg_onactivelist);
154 msg->msg_onactivelist = 1;
155 cfs_list_add(&msg->msg_activelist, &container->msc_active);
157 counters->msgs_alloc++;
/* Track the high-water mark of concurrently committed messages. */
158 if (counters->msgs_alloc > counters->msgs_max)
159 counters->msgs_max = counters->msgs_alloc;
162 msg->msg_tx_committed = 1;
164 msg->msg_rx_committed = 1;
/* Undo the transmit commit of @msg: update the send/route counters
 * according to the event type, restore the message type where the send
 * path overwrote it (ACK for a passive PUT, REPLY for a passive GET),
 * then return tx credits and clear msg_tx_committed.  NOTE(review): the
 * enclosing switch statement, several case labels, break statements and
 * braces are elided from this view. */
168 lnet_msg_tx_decommit(lnet_msg_t *msg, int status)
170 lnet_counters_t *counters = the_lnet.ln_counters;
171 lnet_event_t *ev = &msg->msg_ev;
173 LASSERT(msg->msg_tx_committed);
/* A routed message carries no local event (ev->type == 0). */
178 default: /* routed message */
179 LASSERT(msg->msg_routing);
180 LASSERT(msg->msg_rx_committed);
181 LASSERT(ev->type == 0);
183 counters->route_length += msg->msg_len;
184 counters->route_count++;
/* This arm handles the ACK sent for a completed passive PUT; the rx
 * side was decommitted first, and msg_type was changed to ACK while
 * the ACK was being sent — put it back. */
188 /* should have been decommitted */
189 LASSERT(!msg->msg_rx_committed);
190 /* overwritten while sending ACK */
191 LASSERT(msg->msg_type == LNET_MSG_ACK);
192 msg->msg_type = LNET_MSG_PUT; /* fix type */
195 case LNET_EVENT_SEND:
196 LASSERT(!msg->msg_rx_committed);
/* Only PUTs carry payload on the send; count its length. */
197 if (msg->msg_type == LNET_MSG_PUT)
198 counters->send_length += msg->msg_len;
/* This arm handles the REPLY sent for a passive GET; msg_type was
 * changed to REPLY while the reply was being sent — put it back. */
202 LASSERT(msg->msg_rx_committed);
203 /* overwritten while sending reply */
204 LASSERT(msg->msg_type == LNET_MSG_REPLY);
206 msg->msg_type = LNET_MSG_GET; /* fix type */
207 counters->send_length += msg->msg_len;
/* Common exit: one more send completed; release tx credits. */
211 counters->send_count++;
213 lnet_return_tx_credits_locked(msg);
214 msg->msg_tx_committed = 0;
/* Undo the receive commit of @msg: sanity-check the event/message type
 * pairing, update receive counters, return rx credits and clear
 * msg_rx_committed.  Must run after any tx decommit (see the assertion
 * below).  NOTE(review): the switch statement, case labels for the
 * assertions, break statements and braces are elided from this view. */
218 lnet_msg_rx_decommit(lnet_msg_t *msg, int status)
220 lnet_counters_t *counters = the_lnet.ln_counters;
221 lnet_event_t *ev = &msg->msg_ev;
223 LASSERT(!msg->msg_tx_committed); /* decommitted or uncommitted */
224 LASSERT(msg->msg_rx_committed);
/* Routed messages have no local event; everything else must have an
 * event type consistent with the wire message type. */
231 LASSERT(ev->type == 0);
232 LASSERT(msg->msg_routing);
236 LASSERT(msg->msg_type == LNET_MSG_ACK);
240 LASSERT(msg->msg_type == LNET_MSG_GET);
244 LASSERT(msg->msg_type == LNET_MSG_PUT);
247 case LNET_EVENT_REPLY:
/* An "optimized" GET can see its REPLY while still typed GET. */
248 LASSERT(msg->msg_type == LNET_MSG_REPLY ||
249 msg->msg_type == LNET_MSG_GET); /* optimized GET */
253 counters->recv_count++;
/* Only PUT and REPLY deliver payload into a local MD. */
254 if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
255 counters->recv_length += msg->msg_wanted;
258 lnet_return_rx_credits_locked(msg);
259 msg->msg_rx_committed = 0;
/* Fully decommit @msg: tx side first (it may fix up msg_type for the rx
 * side's assertions), then rx side, then remove the message from the
 * container's active list and drop the allocation counter.
 * NOTE(review): braces and any surrounding lines are elided here. */
263 lnet_msg_decommit(lnet_msg_t *msg, int status)
265 lnet_counters_t *counters = the_lnet.ln_counters;
/* Must be committed at least one way, and on the active list. */
267 LASSERT(msg->msg_tx_committed || msg->msg_rx_committed);
268 LASSERT(msg->msg_onactivelist);
270 if (msg->msg_tx_committed) /* always decommit for sending first */
271 lnet_msg_tx_decommit(msg, status);
273 if (msg->msg_rx_committed)
274 lnet_msg_rx_decommit(msg, status);
276 cfs_list_del(&msg->msg_activelist);
277 msg->msg_onactivelist = 0;
278 counters->msgs_alloc--;
/* Attach MD @md to @msg; the message owns the MD until lnet_msg_detach_md
 * or lnet_finalize() completes it.  For a receive, @offset/@mlen record
 * where and how much of the incoming payload lands in the MD.
 * NOTE(review): the lines linking msg->msg_md to @md, bumping
 * md->md_refcount and decrementing md->md_threshold are elided from
 * this view. */
282 lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
283 unsigned int offset, unsigned int mlen)
285 /* Here, we attach the MD on lnet_msg and mark it busy and
286 * decrementing its threshold. Come what may, the lnet_msg "owns"
287 * the MD until a call to lnet_msg_detach_md or lnet_finalize()
288 * signals completion. */
289 LASSERT(!msg->msg_routing);
292 if (msg->msg_receiving) { /* commited for receiving */
293 msg->msg_offset = offset;
294 msg->msg_wanted = mlen;
/* A finite threshold must still have headroom for this operation. */
298 if (md->md_threshold != LNET_MD_THRESH_INF) {
299 LASSERT(md->md_threshold > 0);
303 /* build umd in event */
304 lnet_md2handle(&msg->msg_ev.md_handle, md);
305 lnet_md_deconstruct(md, &msg->msg_ev.md);
/* Detach @msg from its MD: drop the MD reference taken at attach time,
 * deliver the completion event (with @status and whether the MD became
 * unlinkable) to the MD's event queue if it has one.  NOTE(review): the
 * refcount decrement, the unlink handling and msg->msg_md = NULL are
 * elided from this view. */
309 lnet_msg_detach_md(lnet_msg_t *msg, int status)
311 lnet_libmd_t *md = msg->msg_md;
314 /* Now it's safe to drop my caller's ref */
316 LASSERT(md->md_refcount >= 0);
/* Decide unlinkability before queueing, so the event reports it. */
318 unlink = lnet_md_unlinkable(md);
319 if (md->md_eq != NULL) {
320 msg->msg_ev.status = status;
321 msg->msg_ev.unlinked = unlink;
322 lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
/* Complete @msg under the lnet lock.  A successfully received PUT that
 * requested an ACK is recycled in place to send that ACK; a routed
 * message not yet forwarded is sent on; otherwise the message is
 * decommitted and freed.  NOTE(review): braces, returns, the rc error
 * handling after each lnet_send(), and the reuse of the decommitted msg
 * for the ACK are partially elided from this view. */
332 lnet_complete_msg_locked(lnet_msg_t *msg)
334 lnet_handle_wire_t ack_wmd;
336 int status = msg->msg_ev.status;
338 LASSERT (msg->msg_onactivelist);
340 if (status == 0 && msg->msg_ack) {
341 /* Only send an ACK if the PUT completed successfully */
343 lnet_msg_decommit(msg, 0);
/* Only a locally-terminated passive PUT can carry msg_ack. */
348 LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
349 LASSERT(!msg->msg_routing);
/* Save the peer's ACK MD cookie before the header is rewritten. */
351 ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
353 lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
355 msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
356 msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
357 msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
359 rc = lnet_send(msg->msg_ev.target.nid, msg);
365 } else if (status == 0 && /* OK so far */
366 (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
368 LASSERT (!msg->msg_receiving); /* called back recv already */
/* Forward the routed message; routing picks the next hop. */
372 rc = lnet_send(LNET_NID_ANY, msg);
/* Terminal path: account the completion and release the message. */
380 lnet_msg_decommit(msg, status);
381 lnet_msg_free_locked(msg);
/* Finalize @msg with completion @status: detach its MD (delivering the
 * event), queue it on the container's finalizing list, and drain that
 * list — with a recursion/concurrency breaker so only a bounded number
 * of threads run completions at once.  Must not be called in interrupt
 * context.  NOTE(review): lock/unlock calls, early returns for
 * msg == NULL and the uncommitted case, the my_slot bookkeeping, and
 * the #if/#else selecting the SMP vs single-threaded finalizer scheme
 * are elided from this view. */
386 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
388 struct lnet_msg_container *container;
393 LASSERT (!cfs_in_interrupt ());
/* Debug dump of the message's state flags and tx/rx peers. */
398 CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
399 lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
400 msg->msg_target_is_router ? "t" : "",
401 msg->msg_routing ? "X" : "",
402 msg->msg_ack ? "A" : "",
403 msg->msg_sending ? "S" : "",
404 msg->msg_receiving ? "R" : "",
405 msg->msg_delayed ? "d" : "",
406 msg->msg_txcredit ? "C" : "",
407 msg->msg_peertxcredit ? "c" : "",
408 msg->msg_rtrcredit ? "F" : "",
409 msg->msg_peerrtrcredit ? "f" : "",
410 msg->msg_onactivelist ? "!" : "",
411 msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
412 msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
415 LASSERT (msg->msg_onactivelist);
417 msg->msg_ev.status = status;
/* Detach the MD under the resource lock for its CPT partition. */
419 if (msg->msg_md != NULL) {
420 cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
423 lnet_msg_detach_md(msg, status);
424 lnet_res_unlock(cpt);
427 if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
428 /* not commited to network yet */
429 LASSERT(!msg->msg_onactivelist);
/* Queue for completion; may be drained by this or another thread. */
435 container = &the_lnet.ln_msg_container;
436 cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing);
438 /* Recursion breaker. Don't complete the message here if I am (or
439 * enough other threads are) already completing messages */
/* Find my finalizer slot: bail if I'm already finalizing, otherwise
 * claim the first free slot. */
443 for (i = 0; i < container->msc_nfinalizers; i++) {
444 if (container->msc_finalizers[i] == cfs_current())
447 if (my_slot < 0 && container->msc_finalizers[i] == NULL)
454 container->msc_finalizers[my_slot] = cfs_current();
/* Single-finalizer variant: the one slot is marked busy with a
 * non-NULL sentinel token.  NOTE(review): the cast of the literal 1
 * to struct lnet_msg_container * looks type-mismatched with the
 * msc_finalizers element type — verify against the full source. */
456 LASSERT(container->msc_nfinalizers == 1);
457 if (container->msc_finalizers[0] != NULL)
461 container->msc_finalizers[0] = (struct lnet_msg_container *)1;
/* Drain the finalizing list; lnet_complete_msg_locked may drop and
 * retake the lnet lock, letting other finalizers make progress. */
464 while (!cfs_list_empty(&container->msc_finalizing)) {
465 msg = cfs_list_entry(container->msc_finalizing.next,
466 lnet_msg_t, msg_list);
468 cfs_list_del(&msg->msg_list);
470 /* NB drops and regains the lnet lock if it actually does
471 * anything, so my finalizing friends can chomp along too */
472 lnet_complete_msg_locked(msg);
/* Release my finalizer slot. */
475 container->msc_finalizers[my_slot] = NULL;
/* Tear down a message container: strip any messages still on the active
 * list (reporting how many were leaked), free the finalizer slot array,
 * release the optional freelist, and mark the container uninitialized.
 * Safe to call on a never-initialized container (msc_init == 0).
 * NOTE(review): the count increment, the loop/brace closings, freeing of
 * each stripped msg and the #endif are elided from this view. */
481 lnet_msg_container_cleanup(struct lnet_msg_container *container)
485 if (container->msc_init == 0)
/* Any message still active at shutdown is a leak — unhook it. */
488 while (!cfs_list_empty(&container->msc_active)) {
489 lnet_msg_t *msg = cfs_list_entry(container->msc_active.next,
490 lnet_msg_t, msg_activelist);
492 LASSERT(msg->msg_onactivelist);
493 msg->msg_onactivelist = 0;
494 cfs_list_del(&msg->msg_activelist);
500 CERROR("%d active msg on exit\n", count);
502 if (container->msc_finalizers != NULL) {
503 LIBCFS_FREE(container->msc_finalizers,
504 container->msc_nfinalizers *
505 sizeof(*container->msc_finalizers));
506 container->msc_finalizers = NULL;
508 #ifdef LNET_USE_LIB_FREELIST
509 lnet_freelist_fini(&container->msc_freelist);
511 container->msc_init = 0;
/* Initialize a message container: mark it initialized, set up the active
 * and finalizing lists, optionally build the message freelist, size the
 * finalizer slot array by CPT weight and allocate it.  On any failure the
 * partially-built container is cleaned up via lnet_msg_container_cleanup.
 * NOTE(review): this function continues beyond the end of the visible
 * chunk (rc checks, returns, #endif and closing braces are not shown);
 * comments below cover only the visible lines. */
515 lnet_msg_container_setup(struct lnet_msg_container *container)
519 container->msc_init = 1;
521 CFS_INIT_LIST_HEAD(&container->msc_active);
522 CFS_INIT_LIST_HEAD(&container->msc_finalizing);
524 #ifdef LNET_USE_LIB_FREELIST
525 memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t));
527 rc = lnet_freelist_init(&container->msc_freelist,
528 LNET_FL_MAX_MSGS, sizeof(lnet_msg_t));
530 CERROR("Failed to init freelist for message container\n");
531 lnet_msg_container_cleanup(container);
/* One finalizer slot per CPU partition this node uses. */
538 container->msc_nfinalizers = cfs_cpt_weight(cfs_cpt_table,
540 LIBCFS_ALLOC(container->msc_finalizers,
541 container->msc_nfinalizers *
542 sizeof(*container->msc_finalizers));
544 if (container->msc_finalizers == NULL) {
545 CERROR("Failed to allocate message finalizers\n");
546 lnet_msg_container_cleanup(container);