/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
 *   Author: PJ Kirner <pjkirner@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   This file is confidential source code owned by Cluster File Systems.
 *   No viewing, modification, compilation, redistribution, or any other
 *   form of use is permitted except through a signed license agreement.
 *
 *   If you have not signed such an agreement, then you have no rights to
 *   this file.  Please destroy it immediately and contact CFS.
 */

#include "ptllnd.h"
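/* Free a TX descriptor along with its message buffer and RDMA fragment
 * array, and keep the global TX count in step. */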
void
kptllnd_free_tx(kptl_tx_t *tx)
{
        if (tx->tx_msg != NULL)
                LIBCFS_FREE(tx->tx_msg,
                            *kptllnd_tunables.kptl_max_msg_size);

        if (tx->tx_rdma_frags != NULL)
                LIBCFS_FREE(tx->tx_rdma_frags,
                            sizeof(*tx->tx_rdma_frags));

        LIBCFS_FREE(tx, sizeof(*tx));

        atomic_dec(&kptllnd_data.kptl_ntx);

        /* Keep the tunable in step for visibility */
        *kptllnd_tunables.kptl_ntx = atomic_read(&kptllnd_data.kptl_ntx);
}
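/* Allocate and initialise a new TX descriptor, including its message
 * buffer and RDMA fragment array; on failure any partial allocation is
 * cleaned up and NULL is returned. */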
kptl_tx_t *
kptllnd_alloc_tx(void)
{
        kptl_tx_t *tx;

        LIBCFS_ALLOC(tx, sizeof(*tx));
        if (tx == NULL) {
                CERROR("Failed to allocate TX\n");
                return NULL;
        }

        atomic_inc(&kptllnd_data.kptl_ntx);
        /* Keep the tunable in step for visibility */
        *kptllnd_tunables.kptl_ntx = atomic_read(&kptllnd_data.kptl_ntx);

        tx->tx_idle = 1;
        tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
        tx->tx_msg_mdh = PTL_INVALID_HANDLE;
        tx->tx_rdma_eventarg.eva_type = PTLLND_EVENTARG_TYPE_RDMA;
        tx->tx_msg_eventarg.eva_type = PTLLND_EVENTARG_TYPE_MSG;
        tx->tx_msg = NULL;
        tx->tx_rdma_frags = NULL;

        LIBCFS_ALLOC(tx->tx_msg, *kptllnd_tunables.kptl_max_msg_size);
        if (tx->tx_msg == NULL) {
                CERROR("Failed to allocate TX payload\n");
                goto failed;
        }

        LIBCFS_ALLOC(tx->tx_rdma_frags, sizeof(*tx->tx_rdma_frags));
        if (tx->tx_rdma_frags == NULL) {
                CERROR("Failed to allocate TX frags\n");
                goto failed;
        }

        return tx;

 failed:
        kptllnd_free_tx(tx);
        return NULL;
}
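/* Pre-allocate the configured number of TX descriptors at startup and
 * park them on the idle list. */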
int
kptllnd_setup_tx_descs(void)
{
        int n = *kptllnd_tunables.kptl_ntx;
        int i;

        for (i = 0; i < n; i++) {
                kptl_tx_t *tx = kptllnd_alloc_tx();
                if (tx == NULL)
                        return -ENOMEM;

                spin_lock(&kptllnd_data.kptl_tx_lock);
                list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
                spin_unlock(&kptllnd_data.kptl_tx_lock);
        }
        return 0;
}
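/* Release all idle TX descriptors at shutdown; every TX must be idle by
 * now, so the count must drop to zero. */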
void
kptllnd_cleanup_tx_descs(void)
{
        kptl_tx_t *tx;

        /* No locking; single threaded now */
        LASSERT (kptllnd_data.kptl_shutdown == 2);

        while (!list_empty(&kptllnd_data.kptl_idle_txs)) {
                tx = list_entry(kptllnd_data.kptl_idle_txs.next,
                                kptl_tx_t, tx_list);
                list_del(&tx->tx_list);
                kptllnd_free_tx(tx);
        }

        LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0);
}
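/* Grab a TX descriptor from the idle list, allocating a fresh one if the
 * list is empty.  Failure-simulation hooks come first so error paths can
 * be exercised per TX type. */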
kptl_tx_t *
kptllnd_get_idle_tx(enum kptl_tx_type type)
{
        kptl_tx_t *tx = NULL;

        if (IS_SIMULATION_ENABLED(FAIL_TX_PUT_ALLOC) &&
            type == TX_TYPE_PUT_REQUEST) {
                CERROR("FAIL_TX_PUT_ALLOC SIMULATION triggered\n");
                return NULL;
        }

        if (IS_SIMULATION_ENABLED(FAIL_TX_GET_ALLOC) &&
            type == TX_TYPE_GET_REQUEST) {
                CERROR("FAIL_TX_GET_ALLOC SIMULATION triggered\n");
                return NULL;
        }

        if (IS_SIMULATION_ENABLED(FAIL_TX)) {
                CERROR("FAIL_TX SIMULATION triggered\n");
                return NULL;
        }

        spin_lock(&kptllnd_data.kptl_tx_lock);

        if (list_empty(&kptllnd_data.kptl_idle_txs)) {
                spin_unlock(&kptllnd_data.kptl_tx_lock);

                tx = kptllnd_alloc_tx();
                if (tx == NULL)
                        return NULL;
        } else {
                tx = list_entry(kptllnd_data.kptl_idle_txs.next,
                                kptl_tx_t, tx_list);
                list_del(&tx->tx_list);
                spin_unlock(&kptllnd_data.kptl_tx_lock);
        }

        LASSERT (atomic_read(&tx->tx_refcount) == 0);
        LASSERT (tx->tx_idle);
        LASSERT (!tx->tx_active);
        LASSERT (tx->tx_lnet_msg == NULL);
        LASSERT (tx->tx_lnet_replymsg == NULL);
        LASSERT (tx->tx_peer == NULL);
        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));

        tx->tx_type = type;
        atomic_set(&tx->tx_refcount, 1);
        tx->tx_status = 0;
        tx->tx_idle = 0;

        CDEBUG(D_NET, "tx=%p\n", tx);
        return tx;
}
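/* Abort a TX's outstanding network I/O by unlinking its MDs.  Returns 0 if
 * the TX is already complete, or -EAGAIN after stashing it on its peer's
 * active queue to await outstanding completion (or unlink) events.  There
 * are two implementations: one for Portals libraries with unlink semantics
 * (PtlMDUnlink guarantees an event) and one without (unlink can race with
 * the event callback, so the MD handles must be re-checked under lock). */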
#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
int
kptllnd_tx_abort_netio(kptl_tx_t *tx)
{
        kptl_peer_t     *peer = tx->tx_peer;
        ptl_handle_md_t  msg_mdh;
        ptl_handle_md_t  rdma_mdh;
        unsigned long    flags;

        LASSERT (atomic_read(&tx->tx_refcount) == 0);
        LASSERT (!tx->tx_active);

        spin_lock_irqsave(&peer->peer_lock, flags);

        msg_mdh = tx->tx_msg_mdh;
        rdma_mdh = tx->tx_rdma_mdh;

        if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
            PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
                spin_unlock_irqrestore(&peer->peer_lock, flags);
                return 0;
        }

        /* Uncompleted comms: there must have been some error and it must be
         * propagated to LNET... */
        LASSERT (tx->tx_status != 0 ||
                 (tx->tx_lnet_msg == NULL &&
                  tx->tx_lnet_replymsg == NULL));

        /* stash the tx on its peer until it completes */
        atomic_set(&tx->tx_refcount, 1);
        tx->tx_active = 1;
        list_add_tail(&tx->tx_list, &peer->peer_activeq);

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* These unlinks will ensure completion events (normal or unlink)
         * will happen ASAP */
        if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE))
                PtlMDUnlink(msg_mdh);

        if (!PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE))
                PtlMDUnlink(rdma_mdh);

        return -EAGAIN;
}
#else
int
kptllnd_tx_abort_netio(kptl_tx_t *tx)
{
        kptl_peer_t     *peer = tx->tx_peer;
        ptl_handle_md_t  msg_mdh;
        ptl_handle_md_t  rdma_mdh;
        unsigned long    flags;
        ptl_err_t        prc;

        LASSERT (atomic_read(&tx->tx_refcount) == 0);
        LASSERT (!tx->tx_active);

        spin_lock_irqsave(&peer->peer_lock, flags);

        msg_mdh = tx->tx_msg_mdh;
        rdma_mdh = tx->tx_rdma_mdh;

        if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
            PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
                spin_unlock_irqrestore(&peer->peer_lock, flags);
                return 0;
        }

        /* Uncompleted comms: there must have been some error and it must be
         * propagated to LNET... */
        LASSERT (tx->tx_status != 0 ||
                 (tx->tx_lnet_msg == NULL &&
                  tx->tx_lnet_replymsg == NULL));

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE)) {
                prc = PtlMDUnlink(msg_mdh);
                if (prc == PTL_OK)
                        msg_mdh = PTL_INVALID_HANDLE;
        }

        if (!PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
                prc = PtlMDUnlink(rdma_mdh);
                if (prc == PTL_OK)
                        rdma_mdh = PTL_INVALID_HANDLE;
        }

        spin_lock_irqsave(&peer->peer_lock, flags);

        /* update tx_msg_mdh/tx_rdma_mdh if the callback hasn't fired */
        if (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE))
                msg_mdh = PTL_INVALID_HANDLE;
        else
                tx->tx_msg_mdh = msg_mdh;

        if (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE))
                rdma_mdh = PTL_INVALID_HANDLE;
        else
                tx->tx_rdma_mdh = rdma_mdh;

        if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
            PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
                spin_unlock_irqrestore(&peer->peer_lock, flags);
                return 0;
        }

        /* stash the tx on its peer until it completes */
        atomic_set(&tx->tx_refcount, 1);
        tx->tx_active = 1;
        list_add_tail(&tx->tx_list, &peer->peer_activeq);

        kptllnd_peer_addref(peer);              /* extra ref for me... */

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* This will get the watchdog thread to try aborting all the peer's
         * comms again.  NB, this deems it fair that 1 failing tx which can't
         * be aborted immediately (i.e. its MDs are still busy) is valid cause
         * to nuke everything to the same peer! */
        kptllnd_peer_close(peer, tx->tx_status);

        kptllnd_peer_decref(peer);

        return -EAGAIN;
}
#endif
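/* Completion of a TX whose last reference has been dropped: abort any
 * outstanding network I/O, return the descriptor to the idle pool, then
 * finalize the LNET message(s) it was carrying. */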
void
kptllnd_tx_fini (kptl_tx_t *tx)
{
        lnet_msg_t   *replymsg = tx->tx_lnet_replymsg;
        lnet_msg_t   *msg      = tx->tx_lnet_msg;
        kptl_peer_t  *peer     = tx->tx_peer;
        int           status   = tx->tx_status;
        int           rc;

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&tx->tx_refcount) == 0);
        LASSERT (!tx->tx_idle);
        LASSERT (!tx->tx_active);

        /* TX has completed or failed */

        if (peer != NULL) {
                rc = kptllnd_tx_abort_netio(tx);
                if (rc != 0)
                        return;
        }

        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));

        tx->tx_lnet_msg = tx->tx_lnet_replymsg = NULL;
        tx->tx_peer = NULL;
        tx->tx_idle = 1;

        spin_lock(&kptllnd_data.kptl_tx_lock);
        list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
        spin_unlock(&kptllnd_data.kptl_tx_lock);

        /* Must finalize AFTER freeing 'tx' */
        if (msg != NULL)
                lnet_finalize(kptllnd_data.kptl_ni, msg,
                              (replymsg == NULL) ? status : 0);

        if (replymsg != NULL)
                lnet_finalize(kptllnd_data.kptl_ni, replymsg, status);

        if (peer != NULL)
                kptllnd_peer_decref(peer);
}
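/* Human-readable TX type name for debug logging. */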
const char *
kptllnd_tx_typestr(int type)
{
        switch (type) {
        default:
                return "<TYPE UNKNOWN>";
        case TX_TYPE_SMALL_MESSAGE:
                return "msg";
        case TX_TYPE_PUT_REQUEST:
                return "put_req";
        case TX_TYPE_GET_REQUEST:
                return "get_req";
        case TX_TYPE_PUT_RESPONSE:
                return "put_rsp";
        case TX_TYPE_GET_RESPONSE:
                return "get_rsp";
        }
}
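/* Event callback, shared by the message and RDMA MDs of a TX (told apart
 * by the eventarg the MD carries in user_ptr).  Validates the event
 * against the TX type, updates peer liveness, and once both MDs are
 * unlinked queues the TX on the scheduler to be finalized in thread
 * context. */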
void
kptllnd_tx_callback(ptl_event_t *ev)
{
        kptl_eventarg_t *eva = ev->md.user_ptr;
        int              ismsg = (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG);
        kptl_tx_t       *tx = kptllnd_eventarg2obj(eva);
        kptl_peer_t     *peer = tx->tx_peer;
        int              ok = (ev->ni_fail_type == PTL_OK);
        int              unlinked;
        unsigned long    flags;

        LASSERT (peer != NULL);
        LASSERT (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG ||
                 eva->eva_type == PTLLND_EVENTARG_TYPE_RDMA);
        LASSERT (!ismsg || !PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
        LASSERT (ismsg || !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));

#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
        unlinked = ev->unlinked;
#else
        unlinked = (ev->type == PTL_EVENT_UNLINK);
#endif

        CDEBUG(D_NET, "%s(%d) tx=%p(%s) fail=%d unlinked=%d\n",
               kptllnd_evtype2str(ev->type), ev->type,
               tx, libcfs_id2str(peer->peer_id),
               ev->ni_fail_type, unlinked);

        switch (tx->tx_type) {
        default:
                LBUG();

        case TX_TYPE_SMALL_MESSAGE:
                LASSERT (ismsg);
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         ev->type == PTL_EVENT_SEND_END);
                break;

        case TX_TYPE_PUT_REQUEST:
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         (ismsg && ev->type == PTL_EVENT_SEND_END) ||
                         (!ismsg && ev->type == PTL_EVENT_GET_END));
                break;

        case TX_TYPE_GET_REQUEST:
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         (ismsg && ev->type == PTL_EVENT_SEND_END) ||
                         (!ismsg && ev->type == PTL_EVENT_PUT_END));

                if (!ismsg && ok && ev->type == PTL_EVENT_PUT_END) {
                        if (ev->hdr_data == PTLLND_RDMA_OK) {
                                lnet_set_reply_msg_len(
                                        kptllnd_data.kptl_ni,
                                        tx->tx_lnet_replymsg,
                                        ev->mlength);
                        } else {
                                /* no match at peer */
                                tx->tx_status = -EIO;
                        }
                }
                break;

        case TX_TYPE_PUT_RESPONSE:
                LASSERT (!ismsg);
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         ev->type == PTL_EVENT_SEND_END ||
                         ev->type == PTL_EVENT_REPLY_END);
                break;

        case TX_TYPE_GET_RESPONSE:
                LASSERT (!ismsg);
                LASSERT (ev->type == PTL_EVENT_UNLINK ||
                         ev->type == PTL_EVENT_SEND_END);
                break;
        }

        if (!ok)
                kptllnd_peer_close(peer, -EIO);
        else
                kptllnd_peer_alive(peer);

        if (!unlinked)
                return;

        spin_lock_irqsave(&peer->peer_lock, flags);

        if (ismsg)
                tx->tx_msg_mdh = PTL_INVALID_HANDLE;
        else
                tx->tx_rdma_mdh = PTL_INVALID_HANDLE;

        if (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ||
            !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ||
            !tx->tx_active) {
                spin_unlock_irqrestore(&peer->peer_lock, flags);
                return;
        }

        /* both MDs are now gone; no more events expected for this TX */
        list_del(&tx->tx_list);
        tx->tx_active = 0;

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* drop peer's ref, but if it was the last one... */
        if (atomic_dec_and_test(&tx->tx_refcount)) {
                /* ...finalize it in thread context! */
                spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);

                list_add_tail(&tx->tx_list, &kptllnd_data.kptl_sched_txq);
                wake_up(&kptllnd_data.kptl_sched_waitq);

                spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
        }
}