Whamcloud - gitweb
Severity : enhancement
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_tx.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: PJ Kirner <pjkirner@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   This file is confidential source code owned by Cluster File Systems.
11  *   No viewing, modification, compilation, redistribution, or any other
12  *   form of use is permitted except through a signed license agreement.
13  *
14  *   If you have not signed such an agreement, then you have no rights to
15  *   this file.  Please destroy it immediately and contact CFS.
16  *
17  */
18
19  #include "ptllnd.h"
20
21 void
22 kptllnd_free_tx(kptl_tx_t *tx)
23 {
24         if (tx->tx_msg != NULL)
25                 LIBCFS_FREE(tx->tx_msg, 
26                             *kptllnd_tunables.kptl_max_msg_size);
27                         
28         if (tx->tx_rdma_frags != NULL)
29                 LIBCFS_FREE(tx->tx_rdma_frags, 
30                             sizeof(*tx->tx_rdma_frags));
31
32         LIBCFS_FREE(tx, sizeof(*tx));
33
34         atomic_dec(&kptllnd_data.kptl_ntx);
35
36         /* Keep the tunable in step for visibility */
37         *kptllnd_tunables.kptl_ntx = atomic_read(&kptllnd_data.kptl_ntx);
38 }
39
40 kptl_tx_t *
41 kptllnd_alloc_tx(void)
42 {
43         kptl_tx_t       *tx;
44
45         LIBCFS_ALLOC(tx, sizeof(*tx));
46         if (tx == NULL) {
47                 CERROR("Failed to allocate TX\n");
48                 return NULL;
49         }
50
51         atomic_inc(&kptllnd_data.kptl_ntx);
52
53         /* Keep the tunable in step for visibility */
54         *kptllnd_tunables.kptl_ntx = atomic_read(&kptllnd_data.kptl_ntx);
55
56         tx->tx_idle = 1;
57         tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
58         tx->tx_msg_mdh = PTL_INVALID_HANDLE;
59         tx->tx_rdma_eventarg.eva_type = PTLLND_EVENTARG_TYPE_RDMA;
60         tx->tx_msg_eventarg.eva_type = PTLLND_EVENTARG_TYPE_MSG;
61         tx->tx_msg = NULL;
62         tx->tx_rdma_frags = NULL;
63                 
64         LIBCFS_ALLOC(tx->tx_msg, *kptllnd_tunables.kptl_max_msg_size);
65         if (tx->tx_msg == NULL) {
66                 CERROR("Failed to allocate TX payload\n");
67                 goto failed;
68         }
69
70         LIBCFS_ALLOC(tx->tx_rdma_frags, sizeof(*tx->tx_rdma_frags));
71         if (tx->tx_rdma_frags == NULL) {
72                 CERROR("Failed to allocate TX frags\n");
73                 goto failed;
74         }
75
76         return tx;
77
78  failed:
79         kptllnd_free_tx(tx);
80         return NULL;
81 }
82
83 int
84 kptllnd_setup_tx_descs()
85 {
86         int       n = *kptllnd_tunables.kptl_ntx;
87         int       i;
88         
89         for (i = 0; i < n; i++) {
90                 kptl_tx_t *tx = kptllnd_alloc_tx();
91                 
92                 if (tx == NULL)
93                         return -ENOMEM;
94                 
95                 spin_lock(&kptllnd_data.kptl_tx_lock);
96                 
97                 list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
98                 
99                 spin_unlock(&kptllnd_data.kptl_tx_lock);
100         }
101         
102         return 0;
103 }
104
105 void
106 kptllnd_cleanup_tx_descs()
107 {
108         kptl_tx_t       *tx;
109
110         /* No locking; single threaded now */
111         LASSERT (kptllnd_data.kptl_shutdown == 2);
112
113         while (!list_empty(&kptllnd_data.kptl_idle_txs)) {
114                 tx = list_entry(kptllnd_data.kptl_idle_txs.next,
115                                 kptl_tx_t, tx_list);
116                 
117                 list_del(&tx->tx_list);
118                 kptllnd_free_tx(tx);
119         }
120
121         LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0);
122 }
123
124 kptl_tx_t *
125 kptllnd_get_idle_tx(enum kptl_tx_type type)
126 {
127         kptl_tx_t      *tx = NULL;
128
129         if (IS_SIMULATION_ENABLED(FAIL_TX_PUT_ALLOC) && 
130             type == TX_TYPE_PUT_REQUEST) {
131                 CERROR("FAIL_TX_PUT_ALLOC SIMULATION triggered\n");
132                 return NULL;
133         }
134
135         if (IS_SIMULATION_ENABLED(FAIL_TX_GET_ALLOC) && 
136             type == TX_TYPE_GET_REQUEST) {
137                 CERROR ("FAIL_TX_GET_ALLOC SIMULATION triggered\n");
138                 return NULL;
139         }
140
141         if (IS_SIMULATION_ENABLED(FAIL_TX)) {
142                 CERROR ("FAIL_TX SIMULATION triggered\n");
143                 return NULL;
144         }
145
146         spin_lock(&kptllnd_data.kptl_tx_lock);
147
148         if (list_empty (&kptllnd_data.kptl_idle_txs)) {
149                 spin_unlock(&kptllnd_data.kptl_tx_lock);
150
151                 tx = kptllnd_alloc_tx();
152                 if (tx == NULL)
153                         return NULL;
154         } else {
155                 tx = list_entry(kptllnd_data.kptl_idle_txs.next, 
156                                 kptl_tx_t, tx_list);
157                 list_del(&tx->tx_list);
158
159                 spin_unlock(&kptllnd_data.kptl_tx_lock);
160         }
161
162         LASSERT (atomic_read(&tx->tx_refcount)== 0);
163         LASSERT (tx->tx_idle);
164         LASSERT (!tx->tx_active);
165         LASSERT (tx->tx_lnet_msg == NULL);
166         LASSERT (tx->tx_lnet_replymsg == NULL);
167         LASSERT (tx->tx_peer == NULL);
168         LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
169         LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
170         
171         tx->tx_type = type;
172         atomic_set(&tx->tx_refcount, 1);
173         tx->tx_status = 0;
174         tx->tx_idle = 0;
175
176         CDEBUG(D_NET, "tx=%p\n", tx);
177         return tx;
178 }
179
180 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
181 int
182 kptllnd_tx_abort_netio(kptl_tx_t *tx)
183 {
184         kptl_peer_t     *peer = tx->tx_peer;
185         ptl_handle_md_t  msg_mdh;
186         ptl_handle_md_t  rdma_mdh;
187         unsigned long    flags;
188
189         LASSERT (atomic_read(&tx->tx_refcount) == 0);
190         LASSERT (!tx->tx_active);
191
192         spin_lock_irqsave(&peer->peer_lock, flags);
193
194         msg_mdh = tx->tx_msg_mdh;
195         rdma_mdh = tx->tx_rdma_mdh;
196
197         if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
198             PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
199                 spin_unlock_irqrestore(&peer->peer_lock, flags);
200                 return 0;
201         }
202         
203         /* Uncompleted comms: there must have been some error and it must be
204          * propagated to LNET... */
205         LASSERT (tx->tx_status != 0 ||
206                  (tx->tx_lnet_msg == NULL && 
207                   tx->tx_lnet_replymsg == NULL));
208
209         /* stash the tx on its peer until it completes */
210         atomic_set(&tx->tx_refcount, 1);
211         tx->tx_active = 1;
212         list_add_tail(&tx->tx_list, &peer->peer_activeq);
213         
214         spin_unlock_irqrestore(&peer->peer_lock, flags);
215
216         /* These unlinks will ensure completion events (normal or unlink) will
217          * happen ASAP */
218
219         if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE))
220                 PtlMDUnlink(msg_mdh);
221         
222         if (!PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE))
223                 PtlMDUnlink(rdma_mdh);
224
225         return -EAGAIN;
226 }
227 #else
228 int
229 kptllnd_tx_abort_netio(kptl_tx_t *tx)
230 {
231         ptl_peer_t      *peer = tx->tx_peer;
232         ptl_handle_md_t  msg_mdh;
233         ptl_handle_md_t  rdma_mdh;
234         unsigned long    flags;
235         ptl_err_t        prc;
236
237         LASSERT (atomic_read(&tx->tx_refcount) == 0);
238         LASSERT (!tx->tx_active);
239
240         spin_lock_irqsave(&peer->peer_lock, flags);
241
242         msg_mdh = tx->tx_msg_mdh;
243         rdma_mdh = tx->tx_rdma_mdh;
244
245         if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
246             PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
247                 spin_unlock_irqrestore(&peer->peer_lock, flags);
248                 return 0;
249         }
250         
251         /* Uncompleted comms: there must have been some error and it must be
252          * propagated to LNET... */
253         LASSERT (tx->tx_status != 0 ||
254                  (tx->tx_lnet_msg == NULL && 
255                   tx->tx_replymsg == NULL));
256
257         spin_unlock_irqrestore(&peer->peer_lock, flags);
258
259         if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE)) {
260                 prc = PtlMDUnlink(msg_mdh);
261                 if (prc == PTL_OK)
262                         msg_mdh = PTL_INVALID_HANDLE;
263         }
264
265         if (!PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
266                 prc = PtlMDUnlink(rdma_mdh);
267                 if (prc == PTL_OK)
268                         rdma_mdh = PTL_INVALID_HANDLE;
269         }
270
271         spin_lock_irqsave(&peer->peer_lock, flags);
272
273         /* update tx_???_mdh if callback hasn't fired */
274         if (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE))
275                 msg_mdh = PTL_INVALID_HANDLE;
276         else
277                 tx->tx_msg_mdh = msg_mdh;
278         
279         if (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE))
280                 rdma_mdh = PTL_INVALID_HANDLE;
281         else
282                 tx->tx_rdma_mdh = rdma_mdh;
283
284         if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
285             PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
286                 spin_unlock_irqrestore(&peer->peer_lock, flags);
287                 return 0;
288         }
289
290         /* stash the tx on its peer until it completes */
291         atomic_set(&tx->tx_refcount, 1);
292         tx->tx_active = 1;
293         list_add_tail(&tx->tx_list, &peer->peer_activeq);
294
295         kptllnd_peer_addref(peer);              /* extra ref for me... */
296
297         spin_unlock_irqrestore(&peer->peer_lock, flags);
298
299         /* This will get the watchdog thread to try aborting all the peer's
300          * comms again.  NB, this deems it fair that 1 failing tx which can't
301          * be aborted immediately (i.e. its MDs are still busy) is valid cause
302          * to nuke everything to the same peer! */
303         kptllnd_peer_close(peer, tx->tx_status);
304
305         kptllnd_peer_decref(peer);
306
307         return -EAGAIN;
308 }
309 #endif
310
311 void
312 kptllnd_tx_fini (kptl_tx_t *tx)
313 {
314         lnet_msg_t     *replymsg = tx->tx_lnet_replymsg;
315         lnet_msg_t     *msg      = tx->tx_lnet_msg;
316         kptl_peer_t    *peer     = tx->tx_peer;
317         int             status   = tx->tx_status;
318         int             rc;
319
320         LASSERT (!in_interrupt());
321         LASSERT (atomic_read(&tx->tx_refcount) == 0);
322         LASSERT (!tx->tx_idle);
323         LASSERT (!tx->tx_active);
324
325         /* TX has completed or failed */
326
327         if (peer != NULL) {
328                 rc = kptllnd_tx_abort_netio(tx);
329                 if (rc != 0)
330                         return;
331         }
332
333         LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
334         LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
335
336         tx->tx_lnet_msg = tx->tx_lnet_replymsg = NULL;
337         tx->tx_peer = NULL;
338         tx->tx_idle = 1;
339
340         spin_lock(&kptllnd_data.kptl_tx_lock);
341         list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
342         spin_unlock(&kptllnd_data.kptl_tx_lock);
343
344         /* Must finalize AFTER freeing 'tx' */
345         if (msg != NULL)
346                 lnet_finalize(kptllnd_data.kptl_ni, msg,
347                               (replymsg == NULL) ? status : 0);
348
349         if (replymsg != NULL)
350                 lnet_finalize(kptllnd_data.kptl_ni, replymsg, status);
351
352         if (peer != NULL)
353                 kptllnd_peer_decref(peer);
354 }
355
356 const char *
357 kptllnd_tx_typestr(int type)
358 {
359         switch (type) {
360         default:
361                 return "<TYPE UNKNOWN>";
362                 
363         case TX_TYPE_SMALL_MESSAGE:
364                 return "msg";
365
366         case TX_TYPE_PUT_REQUEST:
367                 return "put_req";
368
369         case TX_TYPE_GET_REQUEST:
370                 return "get_req";
371                 break;
372
373         case TX_TYPE_PUT_RESPONSE:
374                 return "put_rsp";
375                 break;
376
377         case TX_TYPE_GET_RESPONSE:
378                 return "get_rsp";
379         }
380 }
381
382 void
383 kptllnd_tx_callback(ptl_event_t *ev)
384 {
385         kptl_eventarg_t *eva = ev->md.user_ptr;
386         int              ismsg = (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG);
387         kptl_tx_t       *tx = kptllnd_eventarg2obj(eva);
388         kptl_peer_t     *peer = tx->tx_peer;
389         int              ok = (ev->ni_fail_type == PTL_OK);
390         int              unlinked;
391         unsigned long    flags;
392
393         LASSERT (peer != NULL);
394         LASSERT (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG ||
395                  eva->eva_type == PTLLND_EVENTARG_TYPE_RDMA);
396         LASSERT (!ismsg || !PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
397         LASSERT (ismsg || !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
398
399 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
400         unlinked = ev->unlinked;
401 #else
402         unlinked = (ev->type == PTL_EVENT_UNLINK);
403 #endif
404         CDEBUG(D_NETTRACE, "%s[%d/%d]: %s(%d) tx=%p fail=%d unlinked=%d\n",
405                libcfs_id2str(peer->peer_id),
406                peer->peer_credits, peer->peer_outstanding_credits,
407                kptllnd_evtype2str(ev->type), ev->type, 
408                tx, ev->ni_fail_type, unlinked);
409
410         switch (tx->tx_type) {
411         default:
412                 LBUG();
413                 
414         case TX_TYPE_SMALL_MESSAGE:
415                 LASSERT (ismsg);
416                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
417                          ev->type == PTL_EVENT_SEND_END);
418                 break;
419
420         case TX_TYPE_PUT_REQUEST:
421                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
422                          (ismsg && ev->type == PTL_EVENT_SEND_END) ||
423                          (!ismsg && ev->type == PTL_EVENT_GET_END));
424                 break;
425
426         case TX_TYPE_GET_REQUEST:
427                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
428                          (ismsg && ev->type == PTL_EVENT_SEND_END) ||
429                          (!ismsg && ev->type == PTL_EVENT_PUT_END));
430
431                 if (!ismsg && ok && ev->type == PTL_EVENT_PUT_END) {
432                         if (ev->hdr_data == PTLLND_RDMA_OK) {
433                                 lnet_set_reply_msg_len(
434                                         kptllnd_data.kptl_ni,
435                                         tx->tx_lnet_replymsg,
436                                         ev->mlength);
437                         } else {
438                                 /* no match at peer */
439                                 tx->tx_status = -EIO;
440                         }
441                 }
442                 break;
443
444         case TX_TYPE_PUT_RESPONSE:
445                 LASSERT (!ismsg);
446                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
447                          ev->type == PTL_EVENT_SEND_END ||
448                          ev->type == PTL_EVENT_REPLY_END);
449                 break;
450
451         case TX_TYPE_GET_RESPONSE:
452                 LASSERT (!ismsg);
453                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
454                          ev->type == PTL_EVENT_SEND_END);
455                 break;
456         }
457
458         if (!ok)
459                 kptllnd_peer_close(peer, -EIO);
460         else
461                 kptllnd_peer_alive(peer);
462
463         if (!unlinked)
464                 return;
465
466         spin_lock_irqsave(&peer->peer_lock, flags);
467
468         if (ismsg)
469                 tx->tx_msg_mdh = PTL_INVALID_HANDLE;
470         else
471                 tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
472
473         if (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ||
474             !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ||
475             !tx->tx_active) {
476                 spin_unlock_irqrestore(&peer->peer_lock, flags);
477                 return;
478         }
479
480         list_del(&tx->tx_list);
481         tx->tx_active = 0;
482
483         spin_unlock_irqrestore(&peer->peer_lock, flags);
484
485         /* drop peer's ref, but if it was the last one... */
486         if (atomic_dec_and_test(&tx->tx_refcount)) {
487                 /* ...finalize it in thread context! */
488                 spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
489
490                 list_add_tail(&tx->tx_list, &kptllnd_data.kptl_sched_txq);
491                 wake_up(&kptllnd_data.kptl_sched_waitq);
492
493                 spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
494         }
495 }