Whamcloud - gitweb
Severity : major
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_tx.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: PJ Kirner <pjkirner@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   This file is confidential source code owned by Cluster File Systems.
11  *   No viewing, modification, compilation, redistribution, or any other
12  *   form of use is permitted except through a signed license agreement.
13  *
14  *   If you have not signed such an agreement, then you have no rights to
15  *   this file.  Please destroy it immediately and contact CFS.
16  *
17  */
18
19  #include "ptllnd.h"
20
21 void
22 kptllnd_free_tx(kptl_tx_t *tx)
23 {
24         if (tx->tx_msg != NULL)
25                 LIBCFS_FREE(tx->tx_msg, sizeof(*tx->tx_msg));
26                         
27         if (tx->tx_frags != NULL)
28                 LIBCFS_FREE(tx->tx_frags, sizeof(*tx->tx_frags));
29
30         LIBCFS_FREE(tx, sizeof(*tx));
31
32         atomic_dec(&kptllnd_data.kptl_ntx);
33
34         /* Keep the tunable in step for visibility */
35         *kptllnd_tunables.kptl_ntx = atomic_read(&kptllnd_data.kptl_ntx);
36 }
37
38 kptl_tx_t *
39 kptllnd_alloc_tx(void)
40 {
41         kptl_tx_t       *tx;
42
43         LIBCFS_ALLOC(tx, sizeof(*tx));
44         if (tx == NULL) {
45                 CERROR("Failed to allocate TX\n");
46                 return NULL;
47         }
48
49         atomic_inc(&kptllnd_data.kptl_ntx);
50
51         /* Keep the tunable in step for visibility */
52         *kptllnd_tunables.kptl_ntx = atomic_read(&kptllnd_data.kptl_ntx);
53
54         tx->tx_idle = 1;
55         tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
56         tx->tx_msg_mdh = PTL_INVALID_HANDLE;
57         tx->tx_rdma_eventarg.eva_type = PTLLND_EVENTARG_TYPE_RDMA;
58         tx->tx_msg_eventarg.eva_type = PTLLND_EVENTARG_TYPE_MSG;
59         tx->tx_msg = NULL;
60         tx->tx_frags = NULL;
61                 
62         LIBCFS_ALLOC(tx->tx_msg, sizeof(*tx->tx_msg));
63         if (tx->tx_msg == NULL) {
64                 CERROR("Failed to allocate TX payload\n");
65                 goto failed;
66         }
67
68         LIBCFS_ALLOC(tx->tx_frags, sizeof(*tx->tx_frags));
69         if (tx->tx_frags == NULL) {
70                 CERROR("Failed to allocate TX frags\n");
71                 goto failed;
72         }
73
74         return tx;
75
76  failed:
77         kptllnd_free_tx(tx);
78         return NULL;
79 }
80
81 int
82 kptllnd_setup_tx_descs()
83 {
84         int       n = *kptllnd_tunables.kptl_ntx;
85         int       i;
86         
87         for (i = 0; i < n; i++) {
88                 kptl_tx_t *tx = kptllnd_alloc_tx();
89                 
90                 if (tx == NULL)
91                         return -ENOMEM;
92                 
93                 spin_lock(&kptllnd_data.kptl_tx_lock);
94                 
95                 list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
96                 
97                 spin_unlock(&kptllnd_data.kptl_tx_lock);
98         }
99         
100         return 0;
101 }
102
103 void
104 kptllnd_cleanup_tx_descs()
105 {
106         kptl_tx_t       *tx;
107
108         /* No locking; single threaded now */
109         LASSERT (kptllnd_data.kptl_shutdown == 2);
110
111         while (!list_empty(&kptllnd_data.kptl_idle_txs)) {
112                 tx = list_entry(kptllnd_data.kptl_idle_txs.next,
113                                 kptl_tx_t, tx_list);
114                 
115                 list_del(&tx->tx_list);
116                 kptllnd_free_tx(tx);
117         }
118
119         LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0);
120 }
121
122 kptl_tx_t *
123 kptllnd_get_idle_tx(enum kptl_tx_type type)
124 {
125         kptl_tx_t      *tx = NULL;
126
127         if (IS_SIMULATION_ENABLED(FAIL_TX_PUT_ALLOC) && 
128             type == TX_TYPE_PUT_REQUEST) {
129                 CERROR("FAIL_TX_PUT_ALLOC SIMULATION triggered\n");
130                 return NULL;
131         }
132
133         if (IS_SIMULATION_ENABLED(FAIL_TX_GET_ALLOC) && 
134             type == TX_TYPE_GET_REQUEST) {
135                 CERROR ("FAIL_TX_GET_ALLOC SIMULATION triggered\n");
136                 return NULL;
137         }
138
139         if (IS_SIMULATION_ENABLED(FAIL_TX)) {
140                 CERROR ("FAIL_TX SIMULATION triggered\n");
141                 return NULL;
142         }
143
144         spin_lock(&kptllnd_data.kptl_tx_lock);
145
146         if (list_empty (&kptllnd_data.kptl_idle_txs)) {
147                 spin_unlock(&kptllnd_data.kptl_tx_lock);
148
149                 tx = kptllnd_alloc_tx();
150                 if (tx == NULL)
151                         return NULL;
152         } else {
153                 tx = list_entry(kptllnd_data.kptl_idle_txs.next, 
154                                 kptl_tx_t, tx_list);
155                 list_del(&tx->tx_list);
156
157                 spin_unlock(&kptllnd_data.kptl_tx_lock);
158         }
159
160         LASSERT (atomic_read(&tx->tx_refcount)== 0);
161         LASSERT (tx->tx_idle);
162         LASSERT (!tx->tx_active);
163         LASSERT (tx->tx_lnet_msg == NULL);
164         LASSERT (tx->tx_lnet_replymsg == NULL);
165         LASSERT (tx->tx_peer == NULL);
166         LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
167         LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
168         
169         tx->tx_type = type;
170         atomic_set(&tx->tx_refcount, 1);
171         tx->tx_status = 0;
172         tx->tx_idle = 0;
173         tx->tx_tposted = 0;
174         tx->tx_acked = *kptllnd_tunables.kptl_ack_puts;
175
176         CDEBUG(D_NET, "tx=%p\n", tx);
177         return tx;
178 }
179
180 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
181 int
182 kptllnd_tx_abort_netio(kptl_tx_t *tx)
183 {
184         kptl_peer_t     *peer = tx->tx_peer;
185         ptl_handle_md_t  msg_mdh;
186         ptl_handle_md_t  rdma_mdh;
187         unsigned long    flags;
188
189         LASSERT (atomic_read(&tx->tx_refcount) == 0);
190         LASSERT (!tx->tx_active);
191
192         spin_lock_irqsave(&peer->peer_lock, flags);
193
194         msg_mdh = tx->tx_msg_mdh;
195         rdma_mdh = tx->tx_rdma_mdh;
196
197         if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
198             PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
199                 spin_unlock_irqrestore(&peer->peer_lock, flags);
200                 return 0;
201         }
202         
203         /* Uncompleted comms: there must have been some error and it must be
204          * propagated to LNET... */
205         LASSERT (tx->tx_status != 0 ||
206                  (tx->tx_lnet_msg == NULL && 
207                   tx->tx_lnet_replymsg == NULL));
208
209         /* stash the tx on its peer until it completes */
210         atomic_set(&tx->tx_refcount, 1);
211         tx->tx_active = 1;
212         list_add_tail(&tx->tx_list, &peer->peer_activeq);
213         
214         spin_unlock_irqrestore(&peer->peer_lock, flags);
215
216         /* These unlinks will ensure completion events (normal or unlink) will
217          * happen ASAP */
218
219         if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE))
220                 PtlMDUnlink(msg_mdh);
221         
222         if (!PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE))
223                 PtlMDUnlink(rdma_mdh);
224
225         return -EAGAIN;
226 }
227 #else
228 int
229 kptllnd_tx_abort_netio(kptl_tx_t *tx)
230 {
231         ptl_peer_t      *peer = tx->tx_peer;
232         ptl_handle_md_t  msg_mdh;
233         ptl_handle_md_t  rdma_mdh;
234         unsigned long    flags;
235         ptl_err_t        prc;
236
237         LASSERT (atomic_read(&tx->tx_refcount) == 0);
238         LASSERT (!tx->tx_active);
239
240         spin_lock_irqsave(&peer->peer_lock, flags);
241
242         msg_mdh = tx->tx_msg_mdh;
243         rdma_mdh = tx->tx_rdma_mdh;
244
245         if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
246             PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
247                 spin_unlock_irqrestore(&peer->peer_lock, flags);
248                 return 0;
249         }
250         
251         /* Uncompleted comms: there must have been some error and it must be
252          * propagated to LNET... */
253         LASSERT (tx->tx_status != 0 ||
254                  (tx->tx_lnet_msg == NULL && 
255                   tx->tx_replymsg == NULL));
256
257         spin_unlock_irqrestore(&peer->peer_lock, flags);
258
259         if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE)) {
260                 prc = PtlMDUnlink(msg_mdh);
261                 if (prc == PTL_OK)
262                         msg_mdh = PTL_INVALID_HANDLE;
263         }
264
265         if (!PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
266                 prc = PtlMDUnlink(rdma_mdh);
267                 if (prc == PTL_OK)
268                         rdma_mdh = PTL_INVALID_HANDLE;
269         }
270
271         spin_lock_irqsave(&peer->peer_lock, flags);
272
273         /* update tx_???_mdh if callback hasn't fired */
274         if (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE))
275                 msg_mdh = PTL_INVALID_HANDLE;
276         else
277                 tx->tx_msg_mdh = msg_mdh;
278         
279         if (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE))
280                 rdma_mdh = PTL_INVALID_HANDLE;
281         else
282                 tx->tx_rdma_mdh = rdma_mdh;
283
284         if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
285             PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
286                 spin_unlock_irqrestore(&peer->peer_lock, flags);
287                 return 0;
288         }
289
290         /* stash the tx on its peer until it completes */
291         atomic_set(&tx->tx_refcount, 1);
292         tx->tx_active = 1;
293         list_add_tail(&tx->tx_list, &peer->peer_activeq);
294
295         kptllnd_peer_addref(peer);              /* extra ref for me... */
296
297         spin_unlock_irqrestore(&peer->peer_lock, flags);
298
299         /* This will get the watchdog thread to try aborting all the peer's
300          * comms again.  NB, this deems it fair that 1 failing tx which can't
301          * be aborted immediately (i.e. its MDs are still busy) is valid cause
302          * to nuke everything to the same peer! */
303         kptllnd_peer_close(peer, tx->tx_status);
304
305         kptllnd_peer_decref(peer);
306
307         return -EAGAIN;
308 }
309 #endif
310
311 void
312 kptllnd_tx_fini (kptl_tx_t *tx)
313 {
314         lnet_msg_t     *replymsg = tx->tx_lnet_replymsg;
315         lnet_msg_t     *msg      = tx->tx_lnet_msg;
316         kptl_peer_t    *peer     = tx->tx_peer;
317         int             status   = tx->tx_status;
318         int             rc;
319
320         LASSERT (!in_interrupt());
321         LASSERT (atomic_read(&tx->tx_refcount) == 0);
322         LASSERT (!tx->tx_idle);
323         LASSERT (!tx->tx_active);
324
325         /* TX has completed or failed */
326
327         if (peer != NULL) {
328                 rc = kptllnd_tx_abort_netio(tx);
329                 if (rc != 0)
330                         return;
331         }
332
333         LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
334         LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
335
336         tx->tx_lnet_msg = tx->tx_lnet_replymsg = NULL;
337         tx->tx_peer = NULL;
338         tx->tx_idle = 1;
339
340         spin_lock(&kptllnd_data.kptl_tx_lock);
341         list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
342         spin_unlock(&kptllnd_data.kptl_tx_lock);
343
344         /* Must finalize AFTER freeing 'tx' */
345         if (msg != NULL)
346                 lnet_finalize(kptllnd_data.kptl_ni, msg,
347                               (replymsg == NULL) ? status : 0);
348
349         if (replymsg != NULL)
350                 lnet_finalize(kptllnd_data.kptl_ni, replymsg, status);
351
352         if (peer != NULL)
353                 kptllnd_peer_decref(peer);
354 }
355
356 const char *
357 kptllnd_tx_typestr(int type)
358 {
359         switch (type) {
360         default:
361                 return "<TYPE UNKNOWN>";
362                 
363         case TX_TYPE_SMALL_MESSAGE:
364                 return "msg";
365
366         case TX_TYPE_PUT_REQUEST:
367                 return "put_req";
368
369         case TX_TYPE_GET_REQUEST:
370                 return "get_req";
371                 break;
372
373         case TX_TYPE_PUT_RESPONSE:
374                 return "put_rsp";
375                 break;
376
377         case TX_TYPE_GET_RESPONSE:
378                 return "get_rsp";
379         }
380 }
381
382 void
383 kptllnd_tx_callback(ptl_event_t *ev)
384 {
385         kptl_eventarg_t *eva = ev->md.user_ptr;
386         int              ismsg = (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG);
387         kptl_tx_t       *tx = kptllnd_eventarg2obj(eva);
388         kptl_peer_t     *peer = tx->tx_peer;
389         int              ok = (ev->ni_fail_type == PTL_OK);
390         int              unlinked;
391         unsigned long    flags;
392
393         LASSERT (peer != NULL);
394         LASSERT (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG ||
395                  eva->eva_type == PTLLND_EVENTARG_TYPE_RDMA);
396         LASSERT (!ismsg || !PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
397         LASSERT (ismsg || !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
398
399 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
400         unlinked = ev->unlinked;
401 #else
402         unlinked = (ev->type == PTL_EVENT_UNLINK);
403 #endif
404         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",
405                libcfs_id2str(peer->peer_id), peer->peer_credits,
406                peer->peer_outstanding_credits, peer->peer_sent_credits,
407                kptllnd_evtype2str(ev->type), ev->type, 
408                tx, kptllnd_errtype2str(ev->ni_fail_type),
409                ev->ni_fail_type, unlinked);
410
411         switch (tx->tx_type) {
412         default:
413                 LBUG();
414                 
415         case TX_TYPE_SMALL_MESSAGE:
416                 LASSERT (ismsg);
417                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
418                          ev->type == PTL_EVENT_SEND_END ||
419                          (ev->type == PTL_EVENT_ACK && tx->tx_acked));
420                 break;
421
422         case TX_TYPE_PUT_REQUEST:
423                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
424                          (ismsg && ev->type == PTL_EVENT_SEND_END) ||
425                          (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||
426                          (!ismsg && ev->type == PTL_EVENT_GET_END));
427                 break;
428
429         case TX_TYPE_GET_REQUEST:
430                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
431                          (ismsg && ev->type == PTL_EVENT_SEND_END) ||
432                          (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||
433                          (!ismsg && ev->type == PTL_EVENT_PUT_END));
434
435                 if (!ismsg && ok && ev->type == PTL_EVENT_PUT_END) {
436                         if (ev->hdr_data == PTLLND_RDMA_OK) {
437                                 lnet_set_reply_msg_len(
438                                         kptllnd_data.kptl_ni,
439                                         tx->tx_lnet_replymsg,
440                                         ev->mlength);
441                         } else {
442                                 /* no match at peer */
443                                 tx->tx_status = -EIO;
444                         }
445                 }
446                 break;
447
448         case TX_TYPE_PUT_RESPONSE:
449                 LASSERT (!ismsg);
450                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
451                          ev->type == PTL_EVENT_SEND_END ||
452                          ev->type == PTL_EVENT_REPLY_END);
453                 break;
454
455         case TX_TYPE_GET_RESPONSE:
456                 LASSERT (!ismsg);
457                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
458                          ev->type == PTL_EVENT_SEND_END ||
459                          (ev->type == PTL_EVENT_ACK && tx->tx_acked));
460                 break;
461         }
462
463         if (ok) {
464                 kptllnd_peer_alive(peer);
465         } else {
466                 CERROR("Portals error to %s: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",
467                        libcfs_id2str(peer->peer_id),
468                        kptllnd_evtype2str(ev->type), ev->type, 
469                        tx, kptllnd_errtype2str(ev->ni_fail_type),
470                        ev->ni_fail_type, unlinked);
471                 tx->tx_status = -EIO; 
472                 kptllnd_peer_close(peer, -EIO);
473         }
474
475         if (!unlinked)
476                 return;
477
478         spin_lock_irqsave(&peer->peer_lock, flags);
479
480         if (ismsg)
481                 tx->tx_msg_mdh = PTL_INVALID_HANDLE;
482         else
483                 tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
484
485         if (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ||
486             !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ||
487             !tx->tx_active) {
488                 spin_unlock_irqrestore(&peer->peer_lock, flags);
489                 return;
490         }
491
492         list_del(&tx->tx_list);
493         tx->tx_active = 0;
494
495         spin_unlock_irqrestore(&peer->peer_lock, flags);
496
497         /* drop peer's ref, but if it was the last one... */
498         if (atomic_dec_and_test(&tx->tx_refcount)) {
499                 /* ...finalize it in thread context! */
500                 spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
501
502                 list_add_tail(&tx->tx_list, &kptllnd_data.kptl_sched_txq);
503                 wake_up(&kptllnd_data.kptl_sched_waitq);
504
505                 spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
506         }
507 }