Whamcloud - gitweb
LU-1346 gnilnd: remove libcfs abstractions
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_tx.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/ptllnd/ptllnd_tx.c
37  *
38  * Author: PJ Kirner <pjkirner@clusterfs.com>
39  */
40
41  #include "ptllnd.h"
42
43 void
44 kptllnd_free_tx(kptl_tx_t *tx)
45 {
46         if (tx->tx_msg != NULL)
47                 LIBCFS_FREE(tx->tx_msg, sizeof(*tx->tx_msg));
48                         
49         if (tx->tx_frags != NULL)
50                 LIBCFS_FREE(tx->tx_frags, sizeof(*tx->tx_frags));
51
52         LIBCFS_FREE(tx, sizeof(*tx));
53
54         cfs_atomic_dec(&kptllnd_data.kptl_ntx);
55
56         /* Keep the tunable in step for visibility */
57         *kptllnd_tunables.kptl_ntx = cfs_atomic_read(&kptllnd_data.kptl_ntx);
58 }
59
60 kptl_tx_t *
61 kptllnd_alloc_tx(void)
62 {
63         kptl_tx_t       *tx;
64
65         LIBCFS_ALLOC(tx, sizeof(*tx));
66         if (tx == NULL) {
67                 CERROR("Failed to allocate TX\n");
68                 return NULL;
69         }
70
71         cfs_atomic_inc(&kptllnd_data.kptl_ntx);
72
73         /* Keep the tunable in step for visibility */
74         *kptllnd_tunables.kptl_ntx = cfs_atomic_read(&kptllnd_data.kptl_ntx);
75
76         tx->tx_idle = 1;
77         tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
78         tx->tx_msg_mdh = PTL_INVALID_HANDLE;
79         tx->tx_rdma_eventarg.eva_type = PTLLND_EVENTARG_TYPE_RDMA;
80         tx->tx_msg_eventarg.eva_type = PTLLND_EVENTARG_TYPE_MSG;
81         tx->tx_msg = NULL;
82         tx->tx_peer = NULL;
83         tx->tx_frags = NULL;
84                 
85         LIBCFS_ALLOC(tx->tx_msg, sizeof(*tx->tx_msg));
86         if (tx->tx_msg == NULL) {
87                 CERROR("Failed to allocate TX payload\n");
88                 goto failed;
89         }
90
91         LIBCFS_ALLOC(tx->tx_frags, sizeof(*tx->tx_frags));
92         if (tx->tx_frags == NULL) {
93                 CERROR("Failed to allocate TX frags\n");
94                 goto failed;
95         }
96
97         return tx;
98
99  failed:
100         kptllnd_free_tx(tx);
101         return NULL;
102 }
103
104 int
105 kptllnd_setup_tx_descs()
106 {
107         int       n = *kptllnd_tunables.kptl_ntx;
108         int       i;
109
110         for (i = 0; i < n; i++) {
111                 kptl_tx_t *tx = kptllnd_alloc_tx();
112                 if (tx == NULL)
113                         return -ENOMEM;
114
115                 spin_lock(&kptllnd_data.kptl_tx_lock);
116                 cfs_list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
117                 spin_unlock(&kptllnd_data.kptl_tx_lock);
118         }
119
120         return 0;
121 }
122
123 void
124 kptllnd_cleanup_tx_descs()
125 {
126         kptl_tx_t       *tx;
127
128         /* No locking; single threaded now */
129         LASSERT (kptllnd_data.kptl_shutdown == 2);
130
131         while (!cfs_list_empty(&kptllnd_data.kptl_idle_txs)) {
132                 tx = cfs_list_entry(kptllnd_data.kptl_idle_txs.next,
133                                     kptl_tx_t, tx_list);
134
135                 cfs_list_del(&tx->tx_list);
136                 kptllnd_free_tx(tx);
137         }
138
139         LASSERT (cfs_atomic_read(&kptllnd_data.kptl_ntx) == 0);
140 }
141
142 kptl_tx_t *
143 kptllnd_get_idle_tx(enum kptl_tx_type type)
144 {
145         kptl_tx_t      *tx = NULL;
146
147         if (IS_SIMULATION_ENABLED(FAIL_TX_PUT_ALLOC) &&
148             type == TX_TYPE_PUT_REQUEST) {
149                 CERROR("FAIL_TX_PUT_ALLOC SIMULATION triggered\n");
150                 return NULL;
151         }
152
153         if (IS_SIMULATION_ENABLED(FAIL_TX_GET_ALLOC) &&
154             type == TX_TYPE_GET_REQUEST) {
155                 CERROR ("FAIL_TX_GET_ALLOC SIMULATION triggered\n");
156                 return NULL;
157         }
158
159         if (IS_SIMULATION_ENABLED(FAIL_TX)) {
160                 CERROR ("FAIL_TX SIMULATION triggered\n");
161                 return NULL;
162         }
163
164         spin_lock(&kptllnd_data.kptl_tx_lock);
165
166         if (cfs_list_empty (&kptllnd_data.kptl_idle_txs)) {
167                 spin_unlock(&kptllnd_data.kptl_tx_lock);
168
169                 tx = kptllnd_alloc_tx();
170                 if (tx == NULL)
171                         return NULL;
172         } else {
173                 tx = cfs_list_entry(kptllnd_data.kptl_idle_txs.next, 
174                                     kptl_tx_t, tx_list);
175                 cfs_list_del(&tx->tx_list);
176
177                 spin_unlock(&kptllnd_data.kptl_tx_lock);
178         }
179
180         LASSERT (cfs_atomic_read(&tx->tx_refcount)== 0);
181         LASSERT (tx->tx_idle);
182         LASSERT (!tx->tx_active);
183         LASSERT (tx->tx_lnet_msg == NULL);
184         LASSERT (tx->tx_lnet_replymsg == NULL);
185         LASSERT (tx->tx_peer == NULL);
186         LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
187         LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
188         
189         tx->tx_type = type;
190         cfs_atomic_set(&tx->tx_refcount, 1);
191         tx->tx_status = 0;
192         tx->tx_idle = 0;
193         tx->tx_tposted = 0;
194         tx->tx_acked = *kptllnd_tunables.kptl_ack_puts;
195
196         CDEBUG(D_NET, "tx=%p\n", tx);
197         return tx;
198 }
199
200 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
201 int
202 kptllnd_tx_abort_netio(kptl_tx_t *tx)
203 {
204         kptl_peer_t     *peer = tx->tx_peer;
205         ptl_handle_md_t  msg_mdh;
206         ptl_handle_md_t  rdma_mdh;
207         unsigned long    flags;
208
209         LASSERT (cfs_atomic_read(&tx->tx_refcount) == 0);
210         LASSERT (!tx->tx_active);
211
212         spin_lock_irqsave(&peer->peer_lock, flags);
213
214         msg_mdh = tx->tx_msg_mdh;
215         rdma_mdh = tx->tx_rdma_mdh;
216
217         if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
218             PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
219                 spin_unlock_irqrestore(&peer->peer_lock, flags);
220                 return 0;
221         }
222         
223         /* Uncompleted comms: there must have been some error and it must be
224          * propagated to LNET... */
225         LASSERT (tx->tx_status != 0 ||
226                  (tx->tx_lnet_msg == NULL && 
227                   tx->tx_lnet_replymsg == NULL));
228
229         /* stash the tx on its peer until it completes */
230         cfs_atomic_set(&tx->tx_refcount, 1);
231         tx->tx_active = 1;
232         cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);
233         
234         spin_unlock_irqrestore(&peer->peer_lock, flags);
235
236         /* These unlinks will ensure completion events (normal or unlink) will
237          * happen ASAP */
238
239         if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE))
240                 PtlMDUnlink(msg_mdh);
241         
242         if (!PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE))
243                 PtlMDUnlink(rdma_mdh);
244
245         return -EAGAIN;
246 }
247 #else
248 int
249 kptllnd_tx_abort_netio(kptl_tx_t *tx)
250 {
251         ptl_peer_t      *peer = tx->tx_peer;
252         ptl_handle_md_t  msg_mdh;
253         ptl_handle_md_t  rdma_mdh;
254         unsigned long    flags;
255         ptl_err_t        prc;
256
257         LASSERT (cfs_atomic_read(&tx->tx_refcount) == 0);
258         LASSERT (!tx->tx_active);
259
260         spin_lock_irqsave(&peer->peer_lock, flags);
261
262         msg_mdh = tx->tx_msg_mdh;
263         rdma_mdh = tx->tx_rdma_mdh;
264
265         if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
266             PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
267                 spin_unlock_irqrestore(&peer->peer_lock, flags);
268                 return 0;
269         }
270         
271         /* Uncompleted comms: there must have been some error and it must be
272          * propagated to LNET... */
273         LASSERT (tx->tx_status != 0 ||
274                  (tx->tx_lnet_msg == NULL && 
275                   tx->tx_replymsg == NULL));
276
277         spin_unlock_irqrestore(&peer->peer_lock, flags);
278
279         if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE)) {
280                 prc = PtlMDUnlink(msg_mdh);
281                 if (prc == PTL_OK)
282                         msg_mdh = PTL_INVALID_HANDLE;
283         }
284
285         if (!PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
286                 prc = PtlMDUnlink(rdma_mdh);
287                 if (prc == PTL_OK)
288                         rdma_mdh = PTL_INVALID_HANDLE;
289         }
290
291         spin_lock_irqsave(&peer->peer_lock, flags);
292
293         /* update tx_???_mdh if callback hasn't fired */
294         if (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE))
295                 msg_mdh = PTL_INVALID_HANDLE;
296         else
297                 tx->tx_msg_mdh = msg_mdh;
298         
299         if (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE))
300                 rdma_mdh = PTL_INVALID_HANDLE;
301         else
302                 tx->tx_rdma_mdh = rdma_mdh;
303
304         if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
305             PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
306                 spin_unlock_irqrestore(&peer->peer_lock, flags);
307                 return 0;
308         }
309
310         /* stash the tx on its peer until it completes */
311         cfs_atomic_set(&tx->tx_refcount, 1);
312         tx->tx_active = 1;
313         cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);
314
315         kptllnd_peer_addref(peer);              /* extra ref for me... */
316
317         spin_unlock_irqrestore(&peer->peer_lock, flags);
318
319         /* This will get the watchdog thread to try aborting all the peer's
320          * comms again.  NB, this deems it fair that 1 failing tx which can't
321          * be aborted immediately (i.e. its MDs are still busy) is valid cause
322          * to nuke everything to the same peer! */
323         kptllnd_peer_close(peer, tx->tx_status);
324
325         kptllnd_peer_decref(peer);
326
327         return -EAGAIN;
328 }
329 #endif
330
331 void
332 kptllnd_tx_fini (kptl_tx_t *tx)
333 {
334         lnet_msg_t     *replymsg = tx->tx_lnet_replymsg;
335         lnet_msg_t     *msg      = tx->tx_lnet_msg;
336         kptl_peer_t    *peer     = tx->tx_peer;
337         int             status   = tx->tx_status;
338         int             rc;
339
340         LASSERT (!cfs_in_interrupt());
341         LASSERT (cfs_atomic_read(&tx->tx_refcount) == 0);
342         LASSERT (!tx->tx_idle);
343         LASSERT (!tx->tx_active);
344
345         /* TX has completed or failed */
346
347         if (peer != NULL) {
348                 rc = kptllnd_tx_abort_netio(tx);
349                 if (rc != 0)
350                         return;
351         }
352
353         LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
354         LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
355
356         tx->tx_lnet_msg = tx->tx_lnet_replymsg = NULL;
357         tx->tx_peer = NULL;
358         tx->tx_idle = 1;
359
360         spin_lock(&kptllnd_data.kptl_tx_lock);
361         cfs_list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
362         spin_unlock(&kptllnd_data.kptl_tx_lock);
363
364         /* Must finalize AFTER freeing 'tx' */
365         if (msg != NULL)
366                 lnet_finalize(NULL, msg, (replymsg == NULL) ? status : 0);
367
368         if (replymsg != NULL)
369                 lnet_finalize(NULL, replymsg, status);
370
371         if (peer != NULL)
372                 kptllnd_peer_decref(peer);
373 }
374
375 const char *
376 kptllnd_tx_typestr(int type)
377 {
378         switch (type) {
379         default:
380                 return "<TYPE UNKNOWN>";
381                 
382         case TX_TYPE_SMALL_MESSAGE:
383                 return "msg";
384
385         case TX_TYPE_PUT_REQUEST:
386                 return "put_req";
387
388         case TX_TYPE_GET_REQUEST:
389                 return "get_req";
390                 break;
391
392         case TX_TYPE_PUT_RESPONSE:
393                 return "put_rsp";
394                 break;
395
396         case TX_TYPE_GET_RESPONSE:
397                 return "get_rsp";
398         }
399 }
400
401 void
402 kptllnd_tx_callback(ptl_event_t *ev)
403 {
404         kptl_eventarg_t *eva = ev->md.user_ptr;
405         int              ismsg = (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG);
406         kptl_tx_t       *tx = kptllnd_eventarg2obj(eva);
407         kptl_peer_t     *peer = tx->tx_peer;
408         int              ok = (ev->ni_fail_type == PTL_OK);
409         int              unlinked;
410         unsigned long    flags;
411
412         LASSERT (peer != NULL);
413         LASSERT (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG ||
414                  eva->eva_type == PTLLND_EVENTARG_TYPE_RDMA);
415         LASSERT (!ismsg || !PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
416         LASSERT (ismsg || !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
417
418 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
419         unlinked = ev->unlinked;
420 #else
421         unlinked = (ev->type == PTL_EVENT_UNLINK);
422 #endif
423         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",
424                libcfs_id2str(peer->peer_id), peer->peer_credits,
425                peer->peer_outstanding_credits, peer->peer_sent_credits,
426                kptllnd_evtype2str(ev->type), ev->type, 
427                tx, kptllnd_errtype2str(ev->ni_fail_type),
428                ev->ni_fail_type, unlinked);
429
430         switch (tx->tx_type) {
431         default:
432                 LBUG();
433                 
434         case TX_TYPE_SMALL_MESSAGE:
435                 LASSERT (ismsg);
436                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
437                          ev->type == PTL_EVENT_SEND_END ||
438                          (ev->type == PTL_EVENT_ACK && tx->tx_acked));
439                 break;
440
441         case TX_TYPE_PUT_REQUEST:
442                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
443                          (ismsg && ev->type == PTL_EVENT_SEND_END) ||
444                          (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||
445                          (!ismsg && ev->type == PTL_EVENT_GET_END));
446                 break;
447
448         case TX_TYPE_GET_REQUEST:
449                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
450                          (ismsg && ev->type == PTL_EVENT_SEND_END) ||
451                          (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||
452                          (!ismsg && ev->type == PTL_EVENT_PUT_END));
453
454                 if (!ismsg && ok && ev->type == PTL_EVENT_PUT_END) {
455                         if (ev->hdr_data == PTLLND_RDMA_OK) {
456                                 lnet_set_reply_msg_len(NULL,
457                                         tx->tx_lnet_replymsg,
458                                         ev->mlength);
459                         } else {
460                                 /* no match at peer */
461                                 tx->tx_status = -EIO;
462                         }
463                 }
464                 break;
465
466         case TX_TYPE_PUT_RESPONSE:
467                 LASSERT (!ismsg);
468                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
469                          ev->type == PTL_EVENT_SEND_END ||
470                          ev->type == PTL_EVENT_REPLY_END);
471                 break;
472
473         case TX_TYPE_GET_RESPONSE:
474                 LASSERT (!ismsg);
475                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
476                          ev->type == PTL_EVENT_SEND_END ||
477                          (ev->type == PTL_EVENT_ACK && tx->tx_acked));
478                 break;
479         }
480
481         if (ok) {
482                 kptllnd_peer_alive(peer);
483         } else {
484                 CERROR("Portals error to %s: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",
485                        libcfs_id2str(peer->peer_id),
486                        kptllnd_evtype2str(ev->type), ev->type, 
487                        tx, kptllnd_errtype2str(ev->ni_fail_type),
488                        ev->ni_fail_type, unlinked);
489                 tx->tx_status = -EIO; 
490                 kptllnd_peer_close(peer, -EIO);
491         }
492
493         if (!unlinked)
494                 return;
495
496         spin_lock_irqsave(&peer->peer_lock, flags);
497
498         if (ismsg)
499                 tx->tx_msg_mdh = PTL_INVALID_HANDLE;
500         else
501                 tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
502
503         if (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ||
504             !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ||
505             !tx->tx_active) {
506                 spin_unlock_irqrestore(&peer->peer_lock, flags);
507                 return;
508         }
509
510         cfs_list_del(&tx->tx_list);
511         tx->tx_active = 0;
512
513         spin_unlock_irqrestore(&peer->peer_lock, flags);
514
515         /* drop peer's ref, but if it was the last one... */
516         if (cfs_atomic_dec_and_test(&tx->tx_refcount)) {
517                 /* ...finalize it in thread context! */
518                 spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
519
520                 cfs_list_add_tail(&tx->tx_list, &kptllnd_data.kptl_sched_txq);
521                 wake_up(&kptllnd_data.kptl_sched_waitq);
522
523                 spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
524                                            flags);
525         }
526 }