/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/ptllnd/ptllnd_tx.c
 *
 * Author: PJ Kirner <pjkirner@clusterfs.com>
 */

#include "ptllnd.h"
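
/*
 * TX descriptor lifecycle: descriptors are pooled on kptl_idle_txs.
 * kptllnd_get_idle_tx() hands one out (growing the pool on demand),
 * kptllnd_tx_callback() retires the MD handles as Portals events arrive,
 * and kptllnd_tx_fini() eventually parks the descriptor back on the idle
 * list.  kptl_ntx tracks the live descriptor count.
 */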
void
kptllnd_free_tx(kptl_tx_t *tx)
{
        if (tx->tx_msg != NULL)
                LIBCFS_FREE(tx->tx_msg, sizeof(*tx->tx_msg));

        if (tx->tx_frags != NULL)
                LIBCFS_FREE(tx->tx_frags, sizeof(*tx->tx_frags));

        LIBCFS_FREE(tx, sizeof(*tx));

        cfs_atomic_dec(&kptllnd_data.kptl_ntx);

        /* Keep the tunable in step for visibility */
        *kptllnd_tunables.kptl_ntx = cfs_atomic_read(&kptllnd_data.kptl_ntx);
}
kptl_tx_t *
kptllnd_alloc_tx(void)
{
        kptl_tx_t *tx;

        LIBCFS_ALLOC(tx, sizeof(*tx));
        if (tx == NULL) {
                CERROR("Failed to allocate TX\n");
                return NULL;
        }

        cfs_atomic_inc(&kptllnd_data.kptl_ntx);

        /* Keep the tunable in step for visibility */
        *kptllnd_tunables.kptl_ntx = cfs_atomic_read(&kptllnd_data.kptl_ntx);

        tx->tx_idle = 1;
        tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
        tx->tx_msg_mdh = PTL_INVALID_HANDLE;
        tx->tx_rdma_eventarg.eva_type = PTLLND_EVENTARG_TYPE_RDMA;
        tx->tx_msg_eventarg.eva_type = PTLLND_EVENTARG_TYPE_MSG;
        /* NULL these before allocating them, so kptllnd_free_tx() is safe
         * on the partial-failure path below */
        tx->tx_msg = NULL;
        tx->tx_frags = NULL;
        tx->tx_peer = NULL;

        LIBCFS_ALLOC(tx->tx_msg, sizeof(*tx->tx_msg));
        if (tx->tx_msg == NULL) {
                CERROR("Failed to allocate TX payload\n");
                goto failed;
        }

        LIBCFS_ALLOC(tx->tx_frags, sizeof(*tx->tx_frags));
        if (tx->tx_frags == NULL) {
                CERROR("Failed to allocate TX frags\n");
                goto failed;
        }

        return tx;

 failed:
        kptllnd_free_tx(tx);
        return NULL;
}
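
/*
 * Pre-allocate the descriptor pool at startup: *kptl_ntx descriptors are
 * created and parked on the idle list so the common send path does not
 * normally have to allocate.  The pool can still grow under load (see
 * kptllnd_get_idle_tx), with the tunable updated to match.
 */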
int
kptllnd_setup_tx_descs()
{
        int       n = *kptllnd_tunables.kptl_ntx;
        int       i;

        for (i = 0; i < n; i++) {
                kptl_tx_t *tx = kptllnd_alloc_tx();

                if (tx == NULL)
                        return -ENOMEM;

                cfs_spin_lock(&kptllnd_data.kptl_tx_lock);
                cfs_list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
                cfs_spin_unlock(&kptllnd_data.kptl_tx_lock);
        }

        return 0;
}
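
/*
 * Teardown runs only in the final stage of shutdown (kptl_shutdown == 2),
 * after all network activity has stopped, so the idle list can be drained
 * without locking.  Every descriptor must be back on the idle list by
 * then: the closing assertion checks that none leaked.
 */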
void
kptllnd_cleanup_tx_descs()
{
        kptl_tx_t *tx;

        /* No locking; single threaded now */
        LASSERT (kptllnd_data.kptl_shutdown == 2);

        while (!cfs_list_empty(&kptllnd_data.kptl_idle_txs)) {
                tx = cfs_list_entry(kptllnd_data.kptl_idle_txs.next,
                                    kptl_tx_t, tx_list);

                cfs_list_del(&tx->tx_list);
                kptllnd_free_tx(tx);
        }

        LASSERT (cfs_atomic_read(&kptllnd_data.kptl_ntx) == 0);
}
kptl_tx_t *
kptllnd_get_idle_tx(enum kptl_tx_type type)
{
        kptl_tx_t *tx = NULL;

        /* fault-injection hooks for testing allocation failures */
        if (IS_SIMULATION_ENABLED(FAIL_TX_PUT_ALLOC) &&
            type == TX_TYPE_PUT_REQUEST) {
                CERROR("FAIL_TX_PUT_ALLOC SIMULATION triggered\n");
                return NULL;
        }

        if (IS_SIMULATION_ENABLED(FAIL_TX_GET_ALLOC) &&
            type == TX_TYPE_GET_REQUEST) {
                CERROR("FAIL_TX_GET_ALLOC SIMULATION triggered\n");
                return NULL;
        }

        if (IS_SIMULATION_ENABLED(FAIL_TX)) {
                CERROR("FAIL_TX SIMULATION triggered\n");
                return NULL;
        }

        cfs_spin_lock(&kptllnd_data.kptl_tx_lock);

        if (cfs_list_empty(&kptllnd_data.kptl_idle_txs)) {
                cfs_spin_unlock(&kptllnd_data.kptl_tx_lock);
                /* idle pool exhausted: grow it */
                tx = kptllnd_alloc_tx();
                if (tx == NULL)
                        return NULL;
        } else {
                tx = cfs_list_entry(kptllnd_data.kptl_idle_txs.next,
                                    kptl_tx_t, tx_list);
                cfs_list_del(&tx->tx_list);
                cfs_spin_unlock(&kptllnd_data.kptl_tx_lock);
        }

        LASSERT (cfs_atomic_read(&tx->tx_refcount) == 0);
        LASSERT (tx->tx_idle);
        LASSERT (!tx->tx_active);
        LASSERT (tx->tx_lnet_msg == NULL);
        LASSERT (tx->tx_lnet_replymsg == NULL);
        LASSERT (tx->tx_peer == NULL);
        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));

        tx->tx_type = type;
        cfs_atomic_set(&tx->tx_refcount, 1);
        tx->tx_status = 0;
        tx->tx_idle = 0;
        tx->tx_tposted = 0;
        tx->tx_acked = *kptllnd_tunables.kptl_ack_puts;

        CDEBUG(D_NET, "tx=%p\n", tx);
        return tx;
}
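
/*
 * Two variants of kptllnd_tx_abort_netio() follow.  With
 * LUSTRE_PORTALS_UNLINK_SEMANTICS, PtlMDUnlink() of a busy MD is assumed
 * to guarantee a completion (or unlink) event, so the tx is simply
 * stashed on the peer's active queue and both MDs are unlinked.  Without
 * those semantics the unlink can fail while an MD is busy and no event is
 * guaranteed, so the second variant must re-check the handles under the
 * peer lock and, if the tx can't be aborted immediately, close the peer
 * so the watchdog keeps retrying.
 */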
#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
int
kptllnd_tx_abort_netio(kptl_tx_t *tx)
{
        kptl_peer_t     *peer = tx->tx_peer;
        ptl_handle_md_t  msg_mdh;
        ptl_handle_md_t  rdma_mdh;
        unsigned long    flags;

        LASSERT (cfs_atomic_read(&tx->tx_refcount) == 0);
        LASSERT (!tx->tx_active);

        cfs_spin_lock_irqsave(&peer->peer_lock, flags);

        msg_mdh = tx->tx_msg_mdh;
        rdma_mdh = tx->tx_rdma_mdh;

        if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
            PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
                cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
                return 0;
        }

        /* Uncompleted comms: there must have been some error and it must be
         * propagated to LNET... */
        LASSERT (tx->tx_status != 0 ||
                 (tx->tx_lnet_msg == NULL &&
                  tx->tx_lnet_replymsg == NULL));

        /* stash the tx on its peer until it completes */
        cfs_atomic_set(&tx->tx_refcount, 1);
        tx->tx_active = 1;
        cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);

        cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* These unlinks will ensure completion events (normal or unlink)
         * happen ASAP */
        if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE))
                PtlMDUnlink(msg_mdh);

        if (!PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE))
                PtlMDUnlink(rdma_mdh);

        return -EAGAIN;
}

#else /* !LUSTRE_PORTALS_UNLINK_SEMANTICS */
int
kptllnd_tx_abort_netio(kptl_tx_t *tx)
{
        kptl_peer_t     *peer = tx->tx_peer;
        ptl_handle_md_t  msg_mdh;
        ptl_handle_md_t  rdma_mdh;
        unsigned long    flags;
        ptl_err_t        prc;

        LASSERT (cfs_atomic_read(&tx->tx_refcount) == 0);
        LASSERT (!tx->tx_active);

        cfs_spin_lock_irqsave(&peer->peer_lock, flags);

        msg_mdh = tx->tx_msg_mdh;
        rdma_mdh = tx->tx_rdma_mdh;

        if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
            PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
                cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
                return 0;
        }

        /* Uncompleted comms: there must have been some error and it must be
         * propagated to LNET... */
        LASSERT (tx->tx_status != 0 ||
                 (tx->tx_lnet_msg == NULL &&
                  tx->tx_lnet_replymsg == NULL));

        cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);

        if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE)) {
                prc = PtlMDUnlink(msg_mdh);
                if (prc == PTL_OK)
                        msg_mdh = PTL_INVALID_HANDLE;
        }

        if (!PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
                prc = PtlMDUnlink(rdma_mdh);
                if (prc == PTL_OK)
                        rdma_mdh = PTL_INVALID_HANDLE;
        }

        cfs_spin_lock_irqsave(&peer->peer_lock, flags);

        /* update tx_???_mdh if callback hasn't fired */
        if (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE))
                msg_mdh = PTL_INVALID_HANDLE;
        else
                tx->tx_msg_mdh = msg_mdh;

        if (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE))
                rdma_mdh = PTL_INVALID_HANDLE;
        else
                tx->tx_rdma_mdh = rdma_mdh;

        if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
            PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
                cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
                return 0;
        }

        /* stash the tx on its peer until it completes */
        cfs_atomic_set(&tx->tx_refcount, 1);
        tx->tx_active = 1;
        cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);

        kptllnd_peer_addref(peer);              /* extra ref for me... */

        cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* This will get the watchdog thread to try aborting all the peer's
         * comms again.  NB, this deems it fair that 1 failing tx which can't
         * be aborted immediately (i.e. its MDs are still busy) is valid cause
         * to nuke everything to the same peer! */
        kptllnd_peer_close(peer, tx->tx_status);

        kptllnd_peer_decref(peer);

        return -EAGAIN;
}
#endif /* LUSTRE_PORTALS_UNLINK_SEMANTICS */
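
/*
 * Single completion path for a tx: abort any outstanding network I/O
 * first (deferring completion until the MD events arrive), then recycle
 * the descriptor and finalize the LNet messages.  The descriptor goes
 * back on the idle list *before* lnet_finalize() runs, presumably because
 * finalizing can re-enter the LND and require a free tx.
 */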
void
kptllnd_tx_fini (kptl_tx_t *tx)
{
        lnet_msg_t  *replymsg = tx->tx_lnet_replymsg;
        lnet_msg_t  *msg      = tx->tx_lnet_msg;
        kptl_peer_t *peer     = tx->tx_peer;
        int          status   = tx->tx_status;
        int          rc;

        LASSERT (!cfs_in_interrupt());
        LASSERT (cfs_atomic_read(&tx->tx_refcount) == 0);
        LASSERT (!tx->tx_idle);
        LASSERT (!tx->tx_active);

        /* TX has completed or failed */

        if (peer != NULL) {
                rc = kptllnd_tx_abort_netio(tx);
                if (rc != 0)
                        return;
        }

        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));

        tx->tx_lnet_msg = tx->tx_lnet_replymsg = NULL;
        tx->tx_peer = NULL;
        tx->tx_idle = 1;

        cfs_spin_lock(&kptllnd_data.kptl_tx_lock);
        cfs_list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
        cfs_spin_unlock(&kptllnd_data.kptl_tx_lock);

        /* Must finalize AFTER freeing 'tx' */
        if (msg != NULL)
                lnet_finalize(NULL, msg, (replymsg == NULL) ? status : 0);

        if (replymsg != NULL)
                lnet_finalize(NULL, replymsg, status);

        if (peer != NULL)
                kptllnd_peer_decref(peer);
}
const char *
kptllnd_tx_typestr(int type)
{
        switch (type) {
        default:                    return "<TYPE UNKNOWN>";
        case TX_TYPE_SMALL_MESSAGE: return "msg";
        case TX_TYPE_PUT_REQUEST:   return "put_req";
        case TX_TYPE_GET_REQUEST:   return "get_req";
        case TX_TYPE_PUT_RESPONSE:  return "put_rsp";
        case TX_TYPE_GET_RESPONSE:  return "get_rsp";
        }
}
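
/*
 * Event dispatch: each tx embeds two kptl_eventarg_t cookies, one for its
 * message MD and one for its RDMA MD.  Each MD's user_ptr points at the
 * corresponding cookie, so the callback can recover both the owning tx
 * (via kptllnd_eventarg2obj) and which of the two MDs the event is for.
 */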
void
kptllnd_tx_callback(ptl_event_t *ev)
{
        kptl_eventarg_t *eva = ev->md.user_ptr;
        int              ismsg = (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG);
        kptl_tx_t       *tx = kptllnd_eventarg2obj(eva);
        kptl_peer_t     *peer = tx->tx_peer;
        int              ok = (ev->ni_fail_type == PTL_OK);
        int              unlinked;
        unsigned long    flags;

        LASSERT (peer != NULL);
        LASSERT (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG ||
                 eva->eva_type == PTLLND_EVENTARG_TYPE_RDMA);
        LASSERT (!ismsg || !PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
        LASSERT (ismsg || !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));

#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
        unlinked = ev->unlinked;
#else
        unlinked = (ev->type == PTL_EVENT_UNLINK);
#endif

        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",
               libcfs_id2str(peer->peer_id), peer->peer_credits,
               peer->peer_outstanding_credits, peer->peer_sent_credits,
               kptllnd_evtype2str(ev->type), ev->type,
               tx, kptllnd_errtype2str(ev->ni_fail_type),
               ev->ni_fail_type, unlinked);

        switch (tx->tx_type) {
        default:
                LBUG();

        case TX_TYPE_SMALL_MESSAGE:
                LASSERT (ismsg);
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         ev->type == PTL_EVENT_SEND_END ||
                         (ev->type == PTL_EVENT_ACK && tx->tx_acked));
                break;

        case TX_TYPE_PUT_REQUEST:
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         (ismsg && ev->type == PTL_EVENT_SEND_END) ||
                         (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||
                         (!ismsg && ev->type == PTL_EVENT_GET_END));
                break;

        case TX_TYPE_GET_REQUEST:
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         (ismsg && ev->type == PTL_EVENT_SEND_END) ||
                         (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||
                         (!ismsg && ev->type == PTL_EVENT_PUT_END));

                if (!ismsg && ok && ev->type == PTL_EVENT_PUT_END) {
                        if (ev->hdr_data == PTLLND_RDMA_OK) {
                                lnet_set_reply_msg_len(NULL,
                                        tx->tx_lnet_replymsg,
                                        ev->mlength);
                        } else {
                                /* no match at peer */
                                tx->tx_status = -EIO;
                        }
                }
                break;

        case TX_TYPE_PUT_RESPONSE:
                LASSERT (!ismsg);
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         ev->type == PTL_EVENT_SEND_END ||
                         ev->type == PTL_EVENT_REPLY_END);
                break;

        case TX_TYPE_GET_RESPONSE:
                LASSERT (!ismsg);
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         ev->type == PTL_EVENT_SEND_END ||
                         (ev->type == PTL_EVENT_ACK && tx->tx_acked));
                break;
        }

        if (ok) {
                kptllnd_peer_alive(peer);
        } else {
                CERROR("Portals error to %s: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_evtype2str(ev->type), ev->type,
                       tx, kptllnd_errtype2str(ev->ni_fail_type),
                       ev->ni_fail_type, unlinked);
                tx->tx_status = -EIO;
                kptllnd_peer_close(peer, -EIO);
        }

        if (!unlinked)
                return;

        cfs_spin_lock_irqsave(&peer->peer_lock, flags);

        if (ismsg)
                tx->tx_msg_mdh = PTL_INVALID_HANDLE;
        else
                tx->tx_rdma_mdh = PTL_INVALID_HANDLE;

        if (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ||
            !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ||
            !tx->tx_active) {
                /* tx still busy (the other MD is pending) or already off
                 * the active queue */
                cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
                return;
        }

        cfs_list_del(&tx->tx_list);
        tx->tx_active = 0;

        cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* drop peer's ref, but if it was the last one... */
        if (cfs_atomic_dec_and_test(&tx->tx_refcount)) {
                /* ...finalize it in thread context! */
                cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);

                cfs_list_add_tail(&tx->tx_list, &kptllnd_data.kptl_sched_txq);
                cfs_waitq_signal(&kptllnd_data.kptl_sched_waitq);

                cfs_spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
                                           flags);
        }
}