/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/ptllnd/ptllnd_tx.c
 *
 * Author: PJ Kirner <pjkirner@clusterfs.com>
 */

#include "ptllnd.h"
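/* TX descriptors live on the kptl_idle_txs pool, guarded by kptl_tx_lock.
 * The pool is primed at startup, grows on demand, and kptl_ntx tracks the
 * current descriptor count. */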
void
kptllnd_free_tx(kptl_tx_t *tx)
{
        if (tx->tx_msg != NULL)
                LIBCFS_FREE(tx->tx_msg, sizeof(*tx->tx_msg));

        if (tx->tx_frags != NULL)
                LIBCFS_FREE(tx->tx_frags, sizeof(*tx->tx_frags));

        LIBCFS_FREE(tx, sizeof(*tx));

        cfs_atomic_dec(&kptllnd_data.kptl_ntx);

        /* Keep the tunable in step for visibility */
        *kptllnd_tunables.kptl_ntx = cfs_atomic_read(&kptllnd_data.kptl_ntx);
}
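/* Allocate one tx descriptor together with its message buffer and RDMA
 * fragment array.  NB LIBCFS_ALLOC() zeroes the memory it returns, so the
 * NULL checks in kptllnd_free_tx() are safe after a partial allocation. */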
kptl_tx_t *
kptllnd_alloc_tx(void)
{
        kptl_tx_t *tx;

        LIBCFS_ALLOC(tx, sizeof(*tx));
        if (tx == NULL) {
                CERROR("Failed to allocate TX\n");
                return NULL;
        }

        cfs_atomic_inc(&kptllnd_data.kptl_ntx);

        /* Keep the tunable in step for visibility */
        *kptllnd_tunables.kptl_ntx = cfs_atomic_read(&kptllnd_data.kptl_ntx);

        tx->tx_idle = 1;
        tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
        tx->tx_msg_mdh = PTL_INVALID_HANDLE;
        tx->tx_rdma_eventarg.eva_type = PTLLND_EVENTARG_TYPE_RDMA;
        tx->tx_msg_eventarg.eva_type = PTLLND_EVENTARG_TYPE_MSG;
        tx->tx_msg = NULL;
        tx->tx_peer = NULL;
        tx->tx_frags = NULL;

        LIBCFS_ALLOC(tx->tx_msg, sizeof(*tx->tx_msg));
        if (tx->tx_msg == NULL) {
                CERROR("Failed to allocate TX payload\n");
                goto failed;
        }

        LIBCFS_ALLOC(tx->tx_frags, sizeof(*tx->tx_frags));
        if (tx->tx_frags == NULL) {
                CERROR("Failed to allocate TX frags\n");
                goto failed;
        }

        return tx;

 failed:
        kptllnd_free_tx(tx);
        return NULL;
}
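/* Prime the idle pool with *kptllnd_tunables.kptl_ntx descriptors at
 * startup so that, under normal load, sends never have to allocate. */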
int
kptllnd_setup_tx_descs()
{
        int       n = *kptllnd_tunables.kptl_ntx;
        int       i;

        for (i = 0; i < n; i++) {
                kptl_tx_t *tx = kptllnd_alloc_tx();

                if (tx == NULL)
                        return -ENOMEM;

                cfs_spin_lock(&kptllnd_data.kptl_tx_lock);
                cfs_list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
                cfs_spin_unlock(&kptllnd_data.kptl_tx_lock);
        }

        return 0;
}
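/* Drain and free the idle pool at shutdown.  Every tx must already have
 * completed, so the descriptor count must reach zero here. */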
void
kptllnd_cleanup_tx_descs()
{
        kptl_tx_t *tx;

        /* No locking; single threaded now */
        LASSERT (kptllnd_data.kptl_shutdown == 2);

        while (!cfs_list_empty(&kptllnd_data.kptl_idle_txs)) {
                tx = cfs_list_entry(kptllnd_data.kptl_idle_txs.next,
                                    kptl_tx_t, tx_list);

                cfs_list_del(&tx->tx_list);
                kptllnd_free_tx(tx);
        }

        LASSERT (cfs_atomic_read(&kptllnd_data.kptl_ntx) == 0);
}
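/* Take a tx from the idle pool (allocating a fresh one if the pool is
 * empty) and initialise it for a send of the given type.  The FAIL_TX*
 * simulation hooks let fault-injection testing force allocation failures
 * for specific message types. */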
kptl_tx_t *
kptllnd_get_idle_tx(enum kptl_tx_type type)
{
        kptl_tx_t *tx = NULL;

        if (IS_SIMULATION_ENABLED(FAIL_TX_PUT_ALLOC) &&
            type == TX_TYPE_PUT_REQUEST) {
                CERROR("FAIL_TX_PUT_ALLOC SIMULATION triggered\n");
                return NULL;
        }

        if (IS_SIMULATION_ENABLED(FAIL_TX_GET_ALLOC) &&
            type == TX_TYPE_GET_REQUEST) {
                CERROR("FAIL_TX_GET_ALLOC SIMULATION triggered\n");
                return NULL;
        }

        if (IS_SIMULATION_ENABLED(FAIL_TX)) {
                CERROR("FAIL_TX SIMULATION triggered\n");
                return NULL;
        }

        cfs_spin_lock(&kptllnd_data.kptl_tx_lock);

        if (cfs_list_empty(&kptllnd_data.kptl_idle_txs)) {
                cfs_spin_unlock(&kptllnd_data.kptl_tx_lock);

                tx = kptllnd_alloc_tx();
                if (tx == NULL)
                        return NULL;
        } else {
                tx = cfs_list_entry(kptllnd_data.kptl_idle_txs.next,
                                    kptl_tx_t, tx_list);
                cfs_list_del(&tx->tx_list);

                cfs_spin_unlock(&kptllnd_data.kptl_tx_lock);
        }

        LASSERT (cfs_atomic_read(&tx->tx_refcount) == 0);
        LASSERT (tx->tx_idle);
        LASSERT (!tx->tx_active);
        LASSERT (tx->tx_lnet_msg == NULL);
        LASSERT (tx->tx_lnet_replymsg == NULL);
        LASSERT (tx->tx_peer == NULL);
        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));

        tx->tx_type = type;
        cfs_atomic_set(&tx->tx_refcount, 1);
        tx->tx_status = 0;
        tx->tx_idle = 0;
        tx->tx_tposted = 0;
        tx->tx_acked = *kptllnd_tunables.kptl_ack_puts;

        CDEBUG(D_NET, "tx=%p\n", tx);
        return tx;
}
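/* Two implementations of kptllnd_tx_abort_netio() follow.  With
 * LUSTRE_PORTALS_UNLINK_SEMANTICS, PtlMDUnlink() on a busy MD is
 * guaranteed to generate a completion (or unlink) event, so an aborted tx
 * can simply be parked on its peer's active queue until that event
 * arrives. */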
#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
int
kptllnd_tx_abort_netio(kptl_tx_t *tx)
{
        kptl_peer_t     *peer = tx->tx_peer;
        ptl_handle_md_t  msg_mdh;
        ptl_handle_md_t  rdma_mdh;
        unsigned long    flags;

        LASSERT (cfs_atomic_read(&tx->tx_refcount) == 0);
        LASSERT (!tx->tx_active);

        cfs_spin_lock_irqsave(&peer->peer_lock, flags);

        msg_mdh = tx->tx_msg_mdh;
        rdma_mdh = tx->tx_rdma_mdh;

        if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
            PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
                cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
                return 0;
        }

        /* Uncompleted comms: there must have been some error and it must be
         * propagated to LNET... */
        LASSERT (tx->tx_status != 0 ||
                 (tx->tx_lnet_msg == NULL &&
                  tx->tx_lnet_replymsg == NULL));

        /* stash the tx on its peer until it completes */
        cfs_atomic_set(&tx->tx_refcount, 1);
        tx->tx_active = 1;
        cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);

        cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* These unlinks will ensure completion events (normal or unlink) will
         * happen ASAP */
        if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE))
                PtlMDUnlink(msg_mdh);

        if (!PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE))
                PtlMDUnlink(rdma_mdh);

        return -EAGAIN;
}
#else
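/* Without unlink semantics, PtlMDUnlink() can race against the event
 * callback: the unlink may fail while an event is still in flight.  This
 * version re-checks the MD handles under peer_lock after unlinking and, if
 * either MD is still busy, parks the tx on the peer and closes the peer so
 * the watchdog thread will retry the abort. */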
int
kptllnd_tx_abort_netio(kptl_tx_t *tx)
{
        kptl_peer_t     *peer = tx->tx_peer;
        ptl_handle_md_t  msg_mdh;
        ptl_handle_md_t  rdma_mdh;
        unsigned long    flags;
        ptl_err_t        prc;

        LASSERT (cfs_atomic_read(&tx->tx_refcount) == 0);
        LASSERT (!tx->tx_active);

        cfs_spin_lock_irqsave(&peer->peer_lock, flags);

        msg_mdh = tx->tx_msg_mdh;
        rdma_mdh = tx->tx_rdma_mdh;

        if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
            PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
                cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
                return 0;
        }

        /* Uncompleted comms: there must have been some error and it must be
         * propagated to LNET... */
        LASSERT (tx->tx_status != 0 ||
                 (tx->tx_lnet_msg == NULL &&
                  tx->tx_lnet_replymsg == NULL));

        cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);

        if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE)) {
                prc = PtlMDUnlink(msg_mdh);
                if (prc == PTL_OK)
                        msg_mdh = PTL_INVALID_HANDLE;
        }

        if (!PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
                prc = PtlMDUnlink(rdma_mdh);
                if (prc == PTL_OK)
                        rdma_mdh = PTL_INVALID_HANDLE;
        }

        cfs_spin_lock_irqsave(&peer->peer_lock, flags);

        /* update tx_???_mdh if callback hasn't fired */
        if (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE))
                msg_mdh = PTL_INVALID_HANDLE;
        else
                tx->tx_msg_mdh = msg_mdh;

        if (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE))
                rdma_mdh = PTL_INVALID_HANDLE;
        else
                tx->tx_rdma_mdh = rdma_mdh;

        if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
            PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
                cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
                return 0;
        }

        /* stash the tx on its peer until it completes */
        cfs_atomic_set(&tx->tx_refcount, 1);
        tx->tx_active = 1;
        cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);

        kptllnd_peer_addref(peer);              /* extra ref for me... */

        cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* This will get the watchdog thread to try aborting all the peer's
         * comms again.  NB, this deems it fair that 1 failing tx which can't
         * be aborted immediately (i.e. its MDs are still busy) is valid cause
         * to nuke everything to the same peer! */
        kptllnd_peer_close(peer, tx->tx_status);

        kptllnd_peer_decref(peer);

        return -EAGAIN;
}
#endif
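/* Final completion of a tx: abort any outstanding network I/O, detach the
 * tx from LNET and its peer, and return it to the idle pool.  Called with
 * the tx refcount already at zero, never in interrupt context. */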
void
kptllnd_tx_fini (kptl_tx_t *tx)
{
        lnet_msg_t     *replymsg = tx->tx_lnet_replymsg;
        lnet_msg_t     *msg      = tx->tx_lnet_msg;
        kptl_peer_t    *peer     = tx->tx_peer;
        int             status   = tx->tx_status;
        int             rc;

        LASSERT (!cfs_in_interrupt());
        LASSERT (cfs_atomic_read(&tx->tx_refcount) == 0);
        LASSERT (!tx->tx_idle);
        LASSERT (!tx->tx_active);

        /* TX has completed or failed */

        if (peer != NULL) {
                rc = kptllnd_tx_abort_netio(tx);
                if (rc != 0)
                        return;
        }

        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));

        tx->tx_lnet_msg = tx->tx_lnet_replymsg = NULL;
        tx->tx_peer = NULL;
        tx->tx_idle = 1;

        cfs_spin_lock(&kptllnd_data.kptl_tx_lock);
        cfs_list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
        cfs_spin_unlock(&kptllnd_data.kptl_tx_lock);

        /* Must finalize AFTER freeing 'tx' */
        if (msg != NULL)
                lnet_finalize(NULL, msg, (replymsg == NULL) ? status : 0);

        if (replymsg != NULL)
                lnet_finalize(NULL, replymsg, status);

        if (peer != NULL)
                kptllnd_peer_decref(peer);
}
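/* Human-readable tx type for debug and console messages. */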
const char *
kptllnd_tx_typestr(int type)
{
        switch (type) {
        default:
                return "<TYPE UNKNOWN>";

        case TX_TYPE_SMALL_MESSAGE:
                return "msg";

        case TX_TYPE_PUT_REQUEST:
                return "put_req";

        case TX_TYPE_GET_REQUEST:
                return "get_req";

        case TX_TYPE_PUT_RESPONSE:
                return "put_rsp";

        case TX_TYPE_GET_RESPONSE:
                return "get_rsp";
        }
}
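/* Portals event callback for both the message MD and the RDMA MD of a tx;
 * eva_type in md.user_ptr says which.  It sanity-checks the event against
 * the tx type, records failures, and on unlink of the last MD drops the
 * peer's tx ref; the final ref queues the tx for the scheduler thread,
 * since lnet_finalize() must not be called in interrupt context. */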
void
kptllnd_tx_callback(ptl_event_t *ev)
{
        kptl_eventarg_t *eva = ev->md.user_ptr;
        int              ismsg = (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG);
        kptl_tx_t       *tx = kptllnd_eventarg2obj(eva);
        kptl_peer_t     *peer = tx->tx_peer;
        int              ok = (ev->ni_fail_type == PTL_OK);
        int              unlinked;
        unsigned long    flags;

        LASSERT (peer != NULL);
        LASSERT (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG ||
                 eva->eva_type == PTLLND_EVENTARG_TYPE_RDMA);
        LASSERT (!ismsg || !PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
        LASSERT (ismsg || !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));

#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
        unlinked = ev->unlinked;
#else
        unlinked = (ev->type == PTL_EVENT_UNLINK);
#endif

        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",
               libcfs_id2str(peer->peer_id), peer->peer_credits,
               peer->peer_outstanding_credits, peer->peer_sent_credits,
               kptllnd_evtype2str(ev->type), ev->type,
               tx, kptllnd_errtype2str(ev->ni_fail_type),
               ev->ni_fail_type, unlinked);

        switch (tx->tx_type) {
        default:
                LBUG();

        case TX_TYPE_SMALL_MESSAGE:
                LASSERT (ismsg);
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         ev->type == PTL_EVENT_SEND_END ||
                         (ev->type == PTL_EVENT_ACK && tx->tx_acked));
                break;

        case TX_TYPE_PUT_REQUEST:
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         (ismsg && ev->type == PTL_EVENT_SEND_END) ||
                         (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||
                         (!ismsg && ev->type == PTL_EVENT_GET_END));
                break;

        case TX_TYPE_GET_REQUEST:
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         (ismsg && ev->type == PTL_EVENT_SEND_END) ||
                         (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||
                         (!ismsg && ev->type == PTL_EVENT_PUT_END));

                if (!ismsg && ok && ev->type == PTL_EVENT_PUT_END) {
                        if (ev->hdr_data == PTLLND_RDMA_OK) {
                                lnet_set_reply_msg_len(NULL,
                                        tx->tx_lnet_replymsg,
                                        ev->mlength);
                        } else {
                                /* no match at peer */
                                tx->tx_status = -EIO;
                        }
                }
                break;

        case TX_TYPE_PUT_RESPONSE:
                LASSERT (!ismsg);
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         ev->type == PTL_EVENT_SEND_END ||
                         ev->type == PTL_EVENT_REPLY_END);
                break;

        case TX_TYPE_GET_RESPONSE:
                LASSERT (!ismsg);
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         ev->type == PTL_EVENT_SEND_END ||
                         (ev->type == PTL_EVENT_ACK && tx->tx_acked));
                break;
        }

        if (ok) {
                kptllnd_peer_alive(peer);
        } else {
                CERROR("Portals error to %s: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_evtype2str(ev->type), ev->type,
                       tx, kptllnd_errtype2str(ev->ni_fail_type),
                       ev->ni_fail_type, unlinked);
                tx->tx_status = -EIO;
                kptllnd_peer_close(peer, -EIO);
                kptllnd_schedule_ptltrace_dump();
        }

        if (!unlinked)
                return;

        cfs_spin_lock_irqsave(&peer->peer_lock, flags);

        if (ismsg)
                tx->tx_msg_mdh = PTL_INVALID_HANDLE;
        else
                tx->tx_rdma_mdh = PTL_INVALID_HANDLE;

        if (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ||
            !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ||
            !tx->tx_active) {
                cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
                return;
        }

        cfs_list_del(&tx->tx_list);
        tx->tx_active = 0;

        cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* drop peer's ref, but if it was the last one... */
        if (cfs_atomic_dec_and_test(&tx->tx_refcount)) {
                /* ...finalize it in thread context! */
                cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);

                cfs_list_add_tail(&tx->tx_list, &kptllnd_data.kptl_sched_txq);
                cfs_waitq_signal(&kptllnd_data.kptl_sched_waitq);

                cfs_spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
                                           flags);
        }
}