/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
 *   Author: PJ Kirner <pjkirner@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   This file is confidential source code owned by Cluster File Systems.
 *   No viewing, modification, compilation, redistribution, or any other
 *   form of use is permitted except through a signed license agreement.
 *
 *   If you have not signed such an agreement, then you have no rights to
 *   this file.  Please destroy it immediately and contact CFS.
 */

#include "ptllnd.h"
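/* Free one TX descriptor: release the immediate message buffer and the
 * fragment array if they were allocated, free the descriptor itself, and
 * keep the global TX count (and its tunable mirror) in step. */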
void
kptllnd_free_tx(kptl_tx_t *tx)
{
        if (tx->tx_msg != NULL)
                LIBCFS_FREE(tx->tx_msg, sizeof(*tx->tx_msg));

        if (tx->tx_frags != NULL)
                LIBCFS_FREE(tx->tx_frags, sizeof(*tx->tx_frags));

        LIBCFS_FREE(tx, sizeof(*tx));

        atomic_dec(&kptllnd_data.kptl_ntx);

        /* Keep the tunable in step for visibility */
        *kptllnd_tunables.kptl_ntx = atomic_read(&kptllnd_data.kptl_ntx);
}
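/* Allocate and initialise one TX descriptor, including its immediate
 * message buffer and fragment array.  Returns NULL (after cleaning up
 * any partial allocation) on failure. */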
kptl_tx_t *
kptllnd_alloc_tx(void)
{
        kptl_tx_t      *tx;

        LIBCFS_ALLOC(tx, sizeof(*tx));
        if (tx == NULL) {
                CERROR("Failed to allocate TX\n");
                return NULL;
        }

        atomic_inc(&kptllnd_data.kptl_ntx);

        /* Keep the tunable in step for visibility */
        *kptllnd_tunables.kptl_ntx = atomic_read(&kptllnd_data.kptl_ntx);

        tx->tx_idle = 1;
        tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
        tx->tx_msg_mdh = PTL_INVALID_HANDLE;
        tx->tx_rdma_eventarg.eva_type = PTLLND_EVENTARG_TYPE_RDMA;
        tx->tx_msg_eventarg.eva_type = PTLLND_EVENTARG_TYPE_MSG;

        /* LIBCFS_ALLOC zeroes the descriptor, so tx_msg/tx_frags start out
         * NULL and kptllnd_free_tx() is safe on a partial allocation */
        LIBCFS_ALLOC(tx->tx_msg, sizeof(*tx->tx_msg));
        if (tx->tx_msg == NULL) {
                CERROR("Failed to allocate TX payload\n");
                goto failed;
        }

        LIBCFS_ALLOC(tx->tx_frags, sizeof(*tx->tx_frags));
        if (tx->tx_frags == NULL) {
                CERROR("Failed to allocate TX frags\n");
                goto failed;
        }

        return tx;

 failed:
        kptllnd_free_tx(tx);
        return NULL;
}
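/* Pre-allocate the configured number of TX descriptors onto the idle
 * list at startup. */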
int
kptllnd_setup_tx_descs()
{
        int       n = *kptllnd_tunables.kptl_ntx;
        int       i;

        for (i = 0; i < n; i++) {
                kptl_tx_t *tx = kptllnd_alloc_tx();

                if (tx == NULL)
                        return -ENOMEM;

                spin_lock(&kptllnd_data.kptl_tx_lock);
                list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
                spin_unlock(&kptllnd_data.kptl_tx_lock);
        }

        return 0;
}
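/* Free every TX descriptor remaining on the idle list at shutdown; by
 * this point all txs must have completed and been returned, so the
 * global count drops to zero. */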
void
kptllnd_cleanup_tx_descs()
{
        kptl_tx_t       *tx;

        /* No locking; single threaded now */
        LASSERT (kptllnd_data.kptl_shutdown == 2);

        while (!list_empty(&kptllnd_data.kptl_idle_txs)) {
                tx = list_entry(kptllnd_data.kptl_idle_txs.next,
                                kptl_tx_t, tx_list);

                list_del(&tx->tx_list);
                kptllnd_free_tx(tx);
        }

        LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0);
}
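/* Grab a tx from the idle list, or allocate a fresh one if the list is
 * empty.  The IS_SIMULATION_ENABLED(FAIL_TX*) checks inject allocation
 * failures for testing.  Typical caller pattern (a sketch, not code from
 * this file):
 *
 *      tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
 *      if (tx == NULL)
 *              return -ENOMEM;
 *      ...fill in and post tx; the event callback drops the last ref
 *      and kptllnd_tx_fini() returns it to the idle list... */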
kptl_tx_t *
kptllnd_get_idle_tx(enum kptl_tx_type type)
{
        kptl_tx_t      *tx = NULL;

        if (IS_SIMULATION_ENABLED(FAIL_TX_PUT_ALLOC) &&
            type == TX_TYPE_PUT_REQUEST) {
                CERROR("FAIL_TX_PUT_ALLOC SIMULATION triggered\n");
                return NULL;
        }

        if (IS_SIMULATION_ENABLED(FAIL_TX_GET_ALLOC) &&
            type == TX_TYPE_GET_REQUEST) {
                CERROR("FAIL_TX_GET_ALLOC SIMULATION triggered\n");
                return NULL;
        }

        if (IS_SIMULATION_ENABLED(FAIL_TX)) {
                CERROR("FAIL_TX SIMULATION triggered\n");
                return NULL;
        }

        spin_lock(&kptllnd_data.kptl_tx_lock);

        if (list_empty (&kptllnd_data.kptl_idle_txs)) {
                spin_unlock(&kptllnd_data.kptl_tx_lock);

                tx = kptllnd_alloc_tx();
                if (tx == NULL)
                        return NULL;
        } else {
                tx = list_entry(kptllnd_data.kptl_idle_txs.next,
                                kptl_tx_t, tx_list);
                list_del(&tx->tx_list);

                spin_unlock(&kptllnd_data.kptl_tx_lock);
        }

        LASSERT (atomic_read(&tx->tx_refcount)== 0);
        LASSERT (tx->tx_idle);
        LASSERT (!tx->tx_active);
        LASSERT (tx->tx_lnet_msg == NULL);
        LASSERT (tx->tx_lnet_replymsg == NULL);
        LASSERT (tx->tx_peer == NULL);
        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));

        tx->tx_type = type;
        atomic_set(&tx->tx_refcount, 1);
        tx->tx_status = 0;
        tx->tx_idle = 0;
        tx->tx_acked = *kptllnd_tunables.kptl_ack_puts;

        CDEBUG(D_NET, "tx=%p\n", tx);
        return tx;
}
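/* Abort any network I/O still outstanding on 'tx'.  Returns 0 if both MD
 * handles are already invalid (nothing left to wait for) and non-zero if
 * the tx had to be stashed on its peer's active queue to wait for unlink
 * events.  Two variants: with LUSTRE_PORTALS_UNLINK_SEMANTICS,
 * PtlMDUnlink() on a busy MD is guaranteed to deliver an event, so the tx
 * can simply be parked and the unlinks fired.  Without it, the unlink may
 * complete synchronously, so the handles must be re-checked under the
 * peer lock to see whether a callback got there first. */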
#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
int
kptllnd_tx_abort_netio(kptl_tx_t *tx)
{
        kptl_peer_t     *peer = tx->tx_peer;
        ptl_handle_md_t  msg_mdh;
        ptl_handle_md_t  rdma_mdh;
        unsigned long    flags;

        LASSERT (atomic_read(&tx->tx_refcount) == 0);
        LASSERT (!tx->tx_active);

        spin_lock_irqsave(&peer->peer_lock, flags);

        msg_mdh = tx->tx_msg_mdh;
        rdma_mdh = tx->tx_rdma_mdh;

        if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
            PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
                spin_unlock_irqrestore(&peer->peer_lock, flags);
                return 0;
        }

        /* Uncompleted comms: there must have been some error and it must be
         * propagated to LNET... */
        LASSERT (tx->tx_status != 0 ||
                 (tx->tx_lnet_msg == NULL &&
                  tx->tx_lnet_replymsg == NULL));

        /* stash the tx on its peer until it completes */
        atomic_set(&tx->tx_refcount, 1);
        tx->tx_active = 1;
        list_add_tail(&tx->tx_list, &peer->peer_activeq);

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* These unlinks will ensure completion events (normal or unlink)
         * will happen ASAP */
        if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE))
                PtlMDUnlink(msg_mdh);

        if (!PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE))
                PtlMDUnlink(rdma_mdh);

        return -EAGAIN;
}
#else
int
kptllnd_tx_abort_netio(kptl_tx_t *tx)
{
        kptl_peer_t     *peer = tx->tx_peer;
        ptl_handle_md_t  msg_mdh;
        ptl_handle_md_t  rdma_mdh;
        unsigned long    flags;
        ptl_err_t        prc;

        LASSERT (atomic_read(&tx->tx_refcount) == 0);
        LASSERT (!tx->tx_active);

        spin_lock_irqsave(&peer->peer_lock, flags);

        msg_mdh = tx->tx_msg_mdh;
        rdma_mdh = tx->tx_rdma_mdh;

        if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
            PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
                spin_unlock_irqrestore(&peer->peer_lock, flags);
                return 0;
        }

        /* Uncompleted comms: there must have been some error and it must be
         * propagated to LNET... */
        LASSERT (tx->tx_status != 0 ||
                 (tx->tx_lnet_msg == NULL &&
                  tx->tx_lnet_replymsg == NULL));

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE)) {
                prc = PtlMDUnlink(msg_mdh);
                if (prc == PTL_OK)      /* unlinked synchronously */
                        msg_mdh = PTL_INVALID_HANDLE;
        }

        if (!PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
                prc = PtlMDUnlink(rdma_mdh);
                if (prc == PTL_OK)
                        rdma_mdh = PTL_INVALID_HANDLE;
        }

        spin_lock_irqsave(&peer->peer_lock, flags);

        /* update tx_{msg,rdma}_mdh if the callback hasn't fired */
        if (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE))
                msg_mdh = PTL_INVALID_HANDLE;
        else
                tx->tx_msg_mdh = msg_mdh;

        if (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE))
                rdma_mdh = PTL_INVALID_HANDLE;
        else
                tx->tx_rdma_mdh = rdma_mdh;

        if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
            PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
                spin_unlock_irqrestore(&peer->peer_lock, flags);
                return 0;
        }

        /* stash the tx on its peer until it completes */
        atomic_set(&tx->tx_refcount, 1);
        tx->tx_active = 1;
        list_add_tail(&tx->tx_list, &peer->peer_activeq);

        kptllnd_peer_addref(peer);              /* extra ref for me... */

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* This will get the watchdog thread to try aborting all the peer's
         * comms again.  NB, this deems it fair that 1 failing tx which can't
         * be aborted immediately (i.e. its MDs are still busy) is valid cause
         * to nuke everything to the same peer! */
        kptllnd_peer_close(peer, tx->tx_status);

        kptllnd_peer_decref(peer);

        return -EAGAIN;
}
#endif
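/* Finish off a tx whose refcount has dropped to zero: abort any network
 * I/O still in flight (in which case completion is deferred until the
 * unlink events arrive), return the descriptor to the idle list, and
 * only then finalize the LNET messages against it. */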
void
kptllnd_tx_fini (kptl_tx_t *tx)
{
        lnet_msg_t     *replymsg = tx->tx_lnet_replymsg;
        lnet_msg_t     *msg      = tx->tx_lnet_msg;
        kptl_peer_t    *peer     = tx->tx_peer;
        int             status   = tx->tx_status;
        int             rc;

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&tx->tx_refcount) == 0);
        LASSERT (!tx->tx_idle);
        LASSERT (!tx->tx_active);

        /* TX has completed or failed */

        if (peer != NULL) {
                rc = kptllnd_tx_abort_netio(tx);
                if (rc != 0)            /* deferred: MDs still busy */
                        return;
        }

        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));

        tx->tx_lnet_msg = tx->tx_lnet_replymsg = NULL;
        tx->tx_peer = NULL;
        tx->tx_idle = 1;

        spin_lock(&kptllnd_data.kptl_tx_lock);
        list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
        spin_unlock(&kptllnd_data.kptl_tx_lock);

        /* Must finalize AFTER 'tx' is back on the idle list: it may be
         * reused immediately, so work only from the pointers stashed above */
        if (msg != NULL)
                lnet_finalize(kptllnd_data.kptl_ni, msg,
                              (replymsg == NULL) ? status : 0);

        if (replymsg != NULL)
                lnet_finalize(kptllnd_data.kptl_ni, replymsg, status);

        if (peer != NULL)
                kptllnd_peer_decref(peer);
}
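/* Human-readable name for a tx type, for logging. */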
const char *
kptllnd_tx_typestr(int type)
{
        switch (type) {
        default:
                return "<TYPE UNKNOWN>";

        case TX_TYPE_SMALL_MESSAGE:
                return "msg";

        case TX_TYPE_PUT_REQUEST:
                return "put_req";

        case TX_TYPE_GET_REQUEST:
                return "get_req";

        case TX_TYPE_PUT_RESPONSE:
                return "put_rsp";

        case TX_TYPE_GET_RESPONSE:
                return "get_rsp";
        }
}
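/* Event callback for both the message and RDMA MDs of a tx.  The MD's
 * user_ptr points at one of the two kptl_eventarg_t's embedded in the
 * tx; eva_type says which MD fired, and kptllnd_eventarg2obj() recovers
 * the enclosing tx. */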
void
kptllnd_tx_callback(ptl_event_t *ev)
{
        kptl_eventarg_t *eva = ev->md.user_ptr;
        int              ismsg = (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG);
        kptl_tx_t       *tx = kptllnd_eventarg2obj(eva);
        kptl_peer_t     *peer = tx->tx_peer;
        int              ok = (ev->ni_fail_type == PTL_OK);
        int              unlinked;
        unsigned long    flags;

        LASSERT (peer != NULL);
        LASSERT (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG ||
                 eva->eva_type == PTLLND_EVENTARG_TYPE_RDMA);
        LASSERT (!ismsg || !PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
        LASSERT (ismsg || !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));

#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
        unlinked = ev->unlinked;
#else
        unlinked = (ev->type == PTL_EVENT_UNLINK);
#endif

        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",
               libcfs_id2str(peer->peer_id), peer->peer_credits,
               peer->peer_outstanding_credits, peer->peer_sent_credits,
               kptllnd_evtype2str(ev->type), ev->type,
               tx, kptllnd_errtype2str(ev->ni_fail_type),
               ev->ni_fail_type, unlinked);
        /* Sanity check the event against the tx type */
        switch (tx->tx_type) {
        default:
                LBUG();

        case TX_TYPE_SMALL_MESSAGE:
                LASSERT (ismsg);
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         ev->type == PTL_EVENT_SEND_END ||
                         (ev->type == PTL_EVENT_ACK && tx->tx_acked));
                break;

        case TX_TYPE_PUT_REQUEST:
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         (ismsg && ev->type == PTL_EVENT_SEND_END) ||
                         (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||
                         (!ismsg && ev->type == PTL_EVENT_GET_END));
                break;

        case TX_TYPE_GET_REQUEST:
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         (ismsg && ev->type == PTL_EVENT_SEND_END) ||
                         (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||
                         (!ismsg && ev->type == PTL_EVENT_PUT_END));

                if (!ismsg && ok && ev->type == PTL_EVENT_PUT_END) {
                        if (ev->hdr_data == PTLLND_RDMA_OK) {
                                lnet_set_reply_msg_len(
                                        kptllnd_data.kptl_ni,
                                        tx->tx_lnet_replymsg,
                                        ev->mlength);
                        } else {
                                /* no match at peer */
                                tx->tx_status = -EIO;
                        }
                }
                break;

        case TX_TYPE_PUT_RESPONSE:
                LASSERT (!ismsg);
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         ev->type == PTL_EVENT_SEND_END ||
                         ev->type == PTL_EVENT_REPLY_END);
                break;

        case TX_TYPE_GET_RESPONSE:
                LASSERT (ismsg);
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         ev->type == PTL_EVENT_SEND_END ||
                         (ev->type == PTL_EVENT_ACK && tx->tx_acked));
                break;
        }
        if (ok) {
                kptllnd_peer_alive(peer);
        } else {
                CERROR("Portals error to %s: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_evtype2str(ev->type), ev->type,
                       tx, kptllnd_errtype2str(ev->ni_fail_type),
                       ev->ni_fail_type, unlinked);
                tx->tx_status = -EIO;
                kptllnd_peer_close(peer, -EIO);
        }
        if (!unlinked)
                return;

        spin_lock_irqsave(&peer->peer_lock, flags);

        if (ismsg)
                tx->tx_msg_mdh = PTL_INVALID_HANDLE;
        else
                tx->tx_rdma_mdh = PTL_INVALID_HANDLE;

        /* the tx is only done when both MDs are gone and it is still on
         * the peer's active queue */
        if (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ||
            !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ||
            !tx->tx_active) {
                spin_unlock_irqrestore(&peer->peer_lock, flags);
                return;
        }

        list_del(&tx->tx_list);
        tx->tx_active = 0;

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* drop peer's ref, but if it was the last one... */
        if (atomic_dec_and_test(&tx->tx_refcount)) {
                /* ...finalize it in thread context! */
                spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);

                list_add_tail(&tx->tx_list, &kptllnd_data.kptl_sched_txq);
                wake_up(&kptllnd_data.kptl_sched_waitq);

                spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
        }
}