/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/ptllnd/ptllnd_tx.c
 *
 * Author: PJ Kirner <pjkirner@clusterfs.com>
 */

#include "ptllnd.h"
void
kptllnd_free_tx(kptl_tx_t *tx)
{
        if (tx->tx_msg != NULL)
                LIBCFS_FREE(tx->tx_msg, sizeof(*tx->tx_msg));

        if (tx->tx_frags != NULL)
                LIBCFS_FREE(tx->tx_frags, sizeof(*tx->tx_frags));

        LIBCFS_FREE(tx, sizeof(*tx));

        atomic_dec(&kptllnd_data.kptl_ntx);

        /* Keep the tunable in step for visibility */
        *kptllnd_tunables.kptl_ntx = atomic_read(&kptllnd_data.kptl_ntx);
}
kptl_tx_t *
kptllnd_alloc_tx(void)
{
        kptl_tx_t *tx;

        LIBCFS_ALLOC(tx, sizeof(*tx));
        if (tx == NULL) {
                CERROR("Failed to allocate TX\n");
                return NULL;
        }

        atomic_inc(&kptllnd_data.kptl_ntx);

        /* Keep the tunable in step for visibility */
        *kptllnd_tunables.kptl_ntx = atomic_read(&kptllnd_data.kptl_ntx);

        tx->tx_idle = 1;
        tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
        tx->tx_msg_mdh = PTL_INVALID_HANDLE;
        tx->tx_rdma_eventarg.eva_type = PTLLND_EVENTARG_TYPE_RDMA;
        tx->tx_msg_eventarg.eva_type = PTLLND_EVENTARG_TYPE_MSG;
        tx->tx_msg = NULL;
        tx->tx_frags = NULL;

        LIBCFS_ALLOC(tx->tx_msg, sizeof(*tx->tx_msg));
        if (tx->tx_msg == NULL) {
                CERROR("Failed to allocate TX payload\n");
                goto failed;
        }

        LIBCFS_ALLOC(tx->tx_frags, sizeof(*tx->tx_frags));
        if (tx->tx_frags == NULL) {
                CERROR("Failed to allocate TX frags\n");
                goto failed;
        }

        return tx;

 failed:
        kptllnd_free_tx(tx);
        return NULL;
}
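/* Note on the "keep the tunable in step" lines above (added commentary):
 * kptl_ntx is both the module parameter that sizes the pool in
 * kptllnd_setup_tx_descs() and a readable mirror of the live descriptor
 * count; copying the atomic back into the tunable on every alloc/free
 * lets the current TX count be observed through the module-parameter
 * interface without extra instrumentation. */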
int
kptllnd_setup_tx_descs()
{
        int       n = *kptllnd_tunables.kptl_ntx;
        int       i;

        for (i = 0; i < n; i++) {
                kptl_tx_t *tx = kptllnd_alloc_tx();

                if (tx == NULL)
                        return -ENOMEM;

                spin_lock(&kptllnd_data.kptl_tx_lock);
                list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
                spin_unlock(&kptllnd_data.kptl_tx_lock);
        }

        return 0;
}
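/* Locking note (added commentary): kptl_tx_lock is only ever taken from
 * thread context (here, in kptllnd_get_idle_tx() and kptllnd_tx_fini()),
 * so the plain spin_lock() variant suffices.  peer_lock, by contrast, is
 * also taken from the Portals event callback, which is why the code
 * below consistently uses the spin_lock_irqsave() variants on it. */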
void
kptllnd_cleanup_tx_descs()
{
        kptl_tx_t *tx;

        /* No locking; single threaded now */
        LASSERT (kptllnd_data.kptl_shutdown == 2);

        while (!list_empty(&kptllnd_data.kptl_idle_txs)) {
                tx = list_entry(kptllnd_data.kptl_idle_txs.next,
                                kptl_tx_t, tx_list);

                list_del(&tx->tx_list);
                kptllnd_free_tx(tx);
        }

        LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0);
}
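/* list_entry() above is the kernel's container_of() idiom: given a
 * pointer to a struct list_head embedded in a kptl_tx_t, it recovers the
 * enclosing kptl_tx_t by subtracting the member offset.  A minimal
 * sketch of that arithmetic, under a hypothetical name so it cannot
 * clash with the real kernel macro: */
#define KPTLLND_EXAMPLE_ENTRY(ptr, type, member) \
        ((type *)((char *)(ptr) - offsetof(type, member)))
/* e.g. KPTLLND_EXAMPLE_ENTRY(kptllnd_data.kptl_idle_txs.next,
 * kptl_tx_t, tx_list) names the same tx as the list_entry() call in
 * kptllnd_cleanup_tx_descs() above. */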
kptl_tx_t *
kptllnd_get_idle_tx(enum kptl_tx_type type)
{
        kptl_tx_t *tx = NULL;

        if (IS_SIMULATION_ENABLED(FAIL_TX_PUT_ALLOC) &&
            type == TX_TYPE_PUT_REQUEST) {
                CERROR("FAIL_TX_PUT_ALLOC SIMULATION triggered\n");
                return NULL;
        }

        if (IS_SIMULATION_ENABLED(FAIL_TX_GET_ALLOC) &&
            type == TX_TYPE_GET_REQUEST) {
                CERROR("FAIL_TX_GET_ALLOC SIMULATION triggered\n");
                return NULL;
        }

        if (IS_SIMULATION_ENABLED(FAIL_TX)) {
                CERROR("FAIL_TX SIMULATION triggered\n");
                return NULL;
        }

        spin_lock(&kptllnd_data.kptl_tx_lock);

        if (list_empty(&kptllnd_data.kptl_idle_txs)) {
                spin_unlock(&kptllnd_data.kptl_tx_lock);

                tx = kptllnd_alloc_tx();
                if (tx == NULL)
                        return NULL;
        } else {
                tx = list_entry(kptllnd_data.kptl_idle_txs.next,
                                kptl_tx_t, tx_list);
                list_del(&tx->tx_list);

                spin_unlock(&kptllnd_data.kptl_tx_lock);
        }

        LASSERT (atomic_read(&tx->tx_refcount) == 0);
        LASSERT (tx->tx_idle);
        LASSERT (!tx->tx_active);
        LASSERT (tx->tx_lnet_msg == NULL);
        LASSERT (tx->tx_lnet_replymsg == NULL);
        LASSERT (tx->tx_peer == NULL);
        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));

        tx->tx_type = type;
        atomic_set(&tx->tx_refcount, 1);
        tx->tx_status = 0;
        tx->tx_idle = 0;
        tx->tx_acked = *kptllnd_tunables.kptl_ack_puts;

        CDEBUG(D_NET, "tx=%p\n", tx);
        return tx;
}
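/* Usage sketch (illustration only, not part of the driver): callers pair
 * kptllnd_get_idle_tx() with a reference drop once the tx is posted or
 * abandoned.  This assumes kptllnd_tx_decref() from ptllnd.h, which
 * releases tx_refcount and finalizes the tx on the last reference; the
 * function name below is hypothetical. */
static inline int
kptllnd_example_send_path(void)
{
        kptl_tx_t *tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);

        if (tx == NULL)         /* pool empty and allocation failed */
                return -ENOMEM;

        /* ... fill tx->tx_msg and post it via Portals here ... */

        kptllnd_tx_decref(tx);  /* drop the ref taken by get_idle_tx */
        return 0;
}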
#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
int
kptllnd_tx_abort_netio(kptl_tx_t *tx)
{
        kptl_peer_t     *peer = tx->tx_peer;
        ptl_handle_md_t  msg_mdh;
        ptl_handle_md_t  rdma_mdh;
        unsigned long    flags;

        LASSERT (atomic_read(&tx->tx_refcount) == 0);
        LASSERT (!tx->tx_active);

        spin_lock_irqsave(&peer->peer_lock, flags);

        msg_mdh = tx->tx_msg_mdh;
        rdma_mdh = tx->tx_rdma_mdh;

        if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
            PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
                spin_unlock_irqrestore(&peer->peer_lock, flags);
                return 0;
        }

        /* Uncompleted comms: there must have been some error and it must be
         * propagated to LNET... */
        LASSERT (tx->tx_status != 0 ||
                 (tx->tx_lnet_msg == NULL &&
                  tx->tx_lnet_replymsg == NULL));

        /* stash the tx on its peer until it completes */
        atomic_set(&tx->tx_refcount, 1);
        tx->tx_active = 1;
        list_add_tail(&tx->tx_list, &peer->peer_activeq);

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* These unlinks will ensure completion events (normal or unlink)
         * happen ASAP */
        if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE))
                PtlMDUnlink(msg_mdh);

        if (!PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE))
                PtlMDUnlink(rdma_mdh);

        return -EAGAIN;
}

#else
int
kptllnd_tx_abort_netio(kptl_tx_t *tx)
{
        kptl_peer_t     *peer = tx->tx_peer;
        ptl_handle_md_t  msg_mdh;
        ptl_handle_md_t  rdma_mdh;
        unsigned long    flags;
        ptl_err_t        prc;

        LASSERT (atomic_read(&tx->tx_refcount) == 0);
        LASSERT (!tx->tx_active);

        spin_lock_irqsave(&peer->peer_lock, flags);

        msg_mdh = tx->tx_msg_mdh;
        rdma_mdh = tx->tx_rdma_mdh;

        if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
            PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
                spin_unlock_irqrestore(&peer->peer_lock, flags);
                return 0;
        }

        /* Uncompleted comms: there must have been some error and it must be
         * propagated to LNET... */
        LASSERT (tx->tx_status != 0 ||
                 (tx->tx_lnet_msg == NULL &&
                  tx->tx_lnet_replymsg == NULL));

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE)) {
                prc = PtlMDUnlink(msg_mdh);
                if (prc == PTL_OK)
                        msg_mdh = PTL_INVALID_HANDLE;
        }

        if (!PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
                prc = PtlMDUnlink(rdma_mdh);
                if (prc == PTL_OK)
                        rdma_mdh = PTL_INVALID_HANDLE;
        }

        spin_lock_irqsave(&peer->peer_lock, flags);

        /* update tx_???_mdh if callback hasn't fired */
        if (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE))
                msg_mdh = PTL_INVALID_HANDLE;
        else
                tx->tx_msg_mdh = msg_mdh;

        if (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE))
                rdma_mdh = PTL_INVALID_HANDLE;
        else
                tx->tx_rdma_mdh = rdma_mdh;

        if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
            PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
                spin_unlock_irqrestore(&peer->peer_lock, flags);
                return 0;
        }

        /* stash the tx on its peer until it completes */
        atomic_set(&tx->tx_refcount, 1);
        tx->tx_active = 1;
        list_add_tail(&tx->tx_list, &peer->peer_activeq);

        kptllnd_peer_addref(peer);              /* extra ref for me... */

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* This will get the watchdog thread to try aborting all the peer's
         * comms again.  NB, this deems it fair that 1 failing tx which can't
         * be aborted immediately (i.e. its MDs are still busy) is valid cause
         * to nuke everything to the same peer! */
        kptllnd_peer_close(peer, tx->tx_status);

        kptllnd_peer_decref(peer);

        return -EAGAIN;
}
#endif
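/* The two kptllnd_tx_abort_netio() variants above differ only in unlink
 * semantics (added commentary): with LUSTRE_PORTALS_UNLINK_SEMANTICS,
 * PtlMDUnlink() still delivers a completion/unlink event, so the tx can
 * simply be stashed on its peer and reaped by the event callback.
 * Without it, PTL_OK from PtlMDUnlink() means no further event will ever
 * fire for that MD, so the cached handle must be invalidated by hand,
 * exactly as the #else version does.  A minimal sketch of that manual
 * pattern (hypothetical helper name): */
static inline void
kptllnd_example_sync_unlink(ptl_handle_md_t *mdh)
{
        if (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE) &&
            PtlMDUnlink(*mdh) == PTL_OK)
                *mdh = PTL_INVALID_HANDLE;      /* no event will follow */
}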
void
kptllnd_tx_fini (kptl_tx_t *tx)
{
        lnet_msg_t     *replymsg = tx->tx_lnet_replymsg;
        lnet_msg_t     *msg      = tx->tx_lnet_msg;
        kptl_peer_t    *peer     = tx->tx_peer;
        int             status   = tx->tx_status;
        int             rc;

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&tx->tx_refcount) == 0);
        LASSERT (!tx->tx_idle);
        LASSERT (!tx->tx_active);

        /* TX has completed or failed */

        if (peer != NULL) {
                rc = kptllnd_tx_abort_netio(tx);
                if (rc != 0)
                        return;
        }

        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));

        tx->tx_lnet_msg = tx->tx_lnet_replymsg = NULL;
        tx->tx_peer = NULL;
        tx->tx_idle = 1;

        spin_lock(&kptllnd_data.kptl_tx_lock);
        list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
        spin_unlock(&kptllnd_data.kptl_tx_lock);

        /* Must finalize AFTER freeing 'tx' */
        if (msg != NULL)
                lnet_finalize(kptllnd_data.kptl_ni, msg,
                              (replymsg == NULL) ? status : 0);

        if (replymsg != NULL)
                lnet_finalize(kptllnd_data.kptl_ni, replymsg, status);

        if (peer != NULL)
                kptllnd_peer_decref(peer);
}
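/* Ordering note (added commentary, one plausible reading of the "Must
 * finalize AFTER freeing 'tx'" comment above): lnet_finalize() may
 * re-enter this LND and immediately ask kptllnd_get_idle_tx() for a
 * descriptor, so the tx is returned to kptl_idle_txs first; "freeing"
 * here means returning the tx to the idle pool, not LIBCFS_FREE(). */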
const char *
kptllnd_tx_typestr(int type)
{
        switch (type) {
        default:
                return "<TYPE UNKNOWN>";

        case TX_TYPE_SMALL_MESSAGE:
                return "msg";

        case TX_TYPE_PUT_REQUEST:
                return "put_req";

        case TX_TYPE_GET_REQUEST:
                return "get_req";

        case TX_TYPE_PUT_RESPONSE:
                return "put_rsp";

        case TX_TYPE_GET_RESPONSE:
                return "get_rsp";
        }
}
void
kptllnd_tx_callback(ptl_event_t *ev)
{
        kptl_eventarg_t *eva = ev->md.user_ptr;
        int              ismsg = (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG);
        kptl_tx_t       *tx = kptllnd_eventarg2obj(eva);
        kptl_peer_t     *peer = tx->tx_peer;
        int              ok = (ev->ni_fail_type == PTL_NI_OK);
        int              unlinked;
        unsigned long    flags;

        LASSERT (peer != NULL);
        LASSERT (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG ||
                 eva->eva_type == PTLLND_EVENTARG_TYPE_RDMA);
        LASSERT (!ismsg || !PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
        LASSERT (ismsg || !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));

#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
        unlinked = ev->unlinked;
#else
        unlinked = (ev->type == PTL_EVENT_UNLINK);
#endif

        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",
               libcfs_id2str(peer->peer_id), peer->peer_credits,
               peer->peer_outstanding_credits, peer->peer_sent_credits,
               kptllnd_evtype2str(ev->type), ev->type,
               tx, kptllnd_errtype2str(ev->ni_fail_type),
               ev->ni_fail_type, unlinked);

        switch (tx->tx_type) {
        default:
                LBUG();

        case TX_TYPE_SMALL_MESSAGE:
                LASSERT (ismsg);
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         ev->type == PTL_EVENT_SEND_END ||
                         (ev->type == PTL_EVENT_ACK && tx->tx_acked));
                break;

        case TX_TYPE_PUT_REQUEST:
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         (ismsg && ev->type == PTL_EVENT_SEND_END) ||
                         (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||
                         (!ismsg && ev->type == PTL_EVENT_GET_END));
                break;

        case TX_TYPE_GET_REQUEST:
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         (ismsg && ev->type == PTL_EVENT_SEND_END) ||
                         (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||
                         (!ismsg && ev->type == PTL_EVENT_PUT_END));

                if (!ismsg && ok && ev->type == PTL_EVENT_PUT_END) {
                        if (ev->hdr_data == PTLLND_RDMA_OK) {
                                lnet_set_reply_msg_len(
                                        kptllnd_data.kptl_ni,
                                        tx->tx_lnet_replymsg,
                                        ev->mlength);
                        } else {
                                /* no match at peer */
                                tx->tx_status = -EIO;
                        }
                }
                break;

        case TX_TYPE_PUT_RESPONSE:
                LASSERT (!ismsg);
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         ev->type == PTL_EVENT_SEND_END ||
                         ev->type == PTL_EVENT_REPLY_END);
                break;

        case TX_TYPE_GET_RESPONSE:
                LASSERT (!ismsg);
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         ev->type == PTL_EVENT_SEND_END ||
                         (ev->type == PTL_EVENT_ACK && tx->tx_acked));
                break;
        }

        if (ok)
                kptllnd_peer_alive(peer);
        else {
                CERROR("Portals error to %s: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_evtype2str(ev->type), ev->type,
                       tx, kptllnd_errtype2str(ev->ni_fail_type),
                       ev->ni_fail_type, unlinked);
                tx->tx_status = -EIO;
                kptllnd_peer_close(peer, -EIO);
        }

        if (!unlinked)
                return;

        spin_lock_irqsave(&peer->peer_lock, flags);

        if (ismsg)
                tx->tx_msg_mdh = PTL_INVALID_HANDLE;
        else
                tx->tx_rdma_mdh = PTL_INVALID_HANDLE;

        if (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ||
            !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ||
            !tx->tx_active) {
                spin_unlock_irqrestore(&peer->peer_lock, flags);
                return;
        }

        /* both MDs are gone; take the tx off the peer's activeq */
        list_del(&tx->tx_list);
        tx->tx_active = 0;

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* drop peer's ref, but if it was the last one... */
        if (atomic_dec_and_test(&tx->tx_refcount)) {
                /* ...finalize it in thread context! */
                spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);

                list_add_tail(&tx->tx_list, &kptllnd_data.kptl_sched_txq);
                wake_up(&kptllnd_data.kptl_sched_waitq);

                spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
        }
}
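/* How ev->md.user_ptr finds its tx (illustration): when the MDs are
 * attached, user_ptr is pointed at one of the kptl_eventarg_t fields
 * embedded in the tx, so kptllnd_eventarg2obj() can recover the owning
 * object with the container_of idiom.  A sketch of what that lookup is
 * assumed to do for TX events (hypothetical name; the real helper lives
 * in ptllnd.h and also handles receive buffers): */
static inline kptl_tx_t *
kptllnd_example_eventarg2tx(kptl_eventarg_t *eva)
{
        switch (eva->eva_type) {
        case PTLLND_EVENTARG_TYPE_MSG:
                return list_entry(eva, kptl_tx_t, tx_msg_eventarg);
        case PTLLND_EVENTARG_TYPE_RDMA:
                return list_entry(eva, kptl_tx_t, tx_rdma_eventarg);
        default:
                LBUG();         /* not a TX eventarg */
                return NULL;
        }
}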