Whamcloud - gitweb
LU-936 Remove LUSTRE_KERNEL_VERSION
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_tx.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lnet/klnds/ptllnd/ptllnd_tx.c
35  *
36  * Author: PJ Kirner <pjkirner@clusterfs.com>
37  */
38
39  #include "ptllnd.h"
40
41 void
42 kptllnd_free_tx(kptl_tx_t *tx)
43 {
44         if (tx->tx_msg != NULL)
45                 LIBCFS_FREE(tx->tx_msg, sizeof(*tx->tx_msg));
46                         
47         if (tx->tx_frags != NULL)
48                 LIBCFS_FREE(tx->tx_frags, sizeof(*tx->tx_frags));
49
50         LIBCFS_FREE(tx, sizeof(*tx));
51
52         cfs_atomic_dec(&kptllnd_data.kptl_ntx);
53
54         /* Keep the tunable in step for visibility */
55         *kptllnd_tunables.kptl_ntx = cfs_atomic_read(&kptllnd_data.kptl_ntx);
56 }
57
58 kptl_tx_t *
59 kptllnd_alloc_tx(void)
60 {
61         kptl_tx_t       *tx;
62
63         LIBCFS_ALLOC(tx, sizeof(*tx));
64         if (tx == NULL) {
65                 CERROR("Failed to allocate TX\n");
66                 return NULL;
67         }
68
69         cfs_atomic_inc(&kptllnd_data.kptl_ntx);
70
71         /* Keep the tunable in step for visibility */
72         *kptllnd_tunables.kptl_ntx = cfs_atomic_read(&kptllnd_data.kptl_ntx);
73
74         tx->tx_idle = 1;
75         tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
76         tx->tx_msg_mdh = PTL_INVALID_HANDLE;
77         tx->tx_rdma_eventarg.eva_type = PTLLND_EVENTARG_TYPE_RDMA;
78         tx->tx_msg_eventarg.eva_type = PTLLND_EVENTARG_TYPE_MSG;
79         tx->tx_msg = NULL;
80         tx->tx_peer = NULL;
81         tx->tx_frags = NULL;
82                 
83         LIBCFS_ALLOC(tx->tx_msg, sizeof(*tx->tx_msg));
84         if (tx->tx_msg == NULL) {
85                 CERROR("Failed to allocate TX payload\n");
86                 goto failed;
87         }
88
89         LIBCFS_ALLOC(tx->tx_frags, sizeof(*tx->tx_frags));
90         if (tx->tx_frags == NULL) {
91                 CERROR("Failed to allocate TX frags\n");
92                 goto failed;
93         }
94
95         return tx;
96
97  failed:
98         kptllnd_free_tx(tx);
99         return NULL;
100 }
101
102 int
103 kptllnd_setup_tx_descs()
104 {
105         int       n = *kptllnd_tunables.kptl_ntx;
106         int       i;
107
108         for (i = 0; i < n; i++) {
109                 kptl_tx_t *tx = kptllnd_alloc_tx();
110                 if (tx == NULL)
111                         return -ENOMEM;
112
113                 cfs_spin_lock(&kptllnd_data.kptl_tx_lock);
114                 cfs_list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
115                 cfs_spin_unlock(&kptllnd_data.kptl_tx_lock);
116         }
117
118         return 0;
119 }
120
121 void
122 kptllnd_cleanup_tx_descs()
123 {
124         kptl_tx_t       *tx;
125
126         /* No locking; single threaded now */
127         LASSERT (kptllnd_data.kptl_shutdown == 2);
128
129         while (!cfs_list_empty(&kptllnd_data.kptl_idle_txs)) {
130                 tx = cfs_list_entry(kptllnd_data.kptl_idle_txs.next,
131                                     kptl_tx_t, tx_list);
132
133                 cfs_list_del(&tx->tx_list);
134                 kptllnd_free_tx(tx);
135         }
136
137         LASSERT (cfs_atomic_read(&kptllnd_data.kptl_ntx) == 0);
138 }
139
140 kptl_tx_t *
141 kptllnd_get_idle_tx(enum kptl_tx_type type)
142 {
143         kptl_tx_t      *tx = NULL;
144
145         if (IS_SIMULATION_ENABLED(FAIL_TX_PUT_ALLOC) &&
146             type == TX_TYPE_PUT_REQUEST) {
147                 CERROR("FAIL_TX_PUT_ALLOC SIMULATION triggered\n");
148                 return NULL;
149         }
150
151         if (IS_SIMULATION_ENABLED(FAIL_TX_GET_ALLOC) &&
152             type == TX_TYPE_GET_REQUEST) {
153                 CERROR ("FAIL_TX_GET_ALLOC SIMULATION triggered\n");
154                 return NULL;
155         }
156
157         if (IS_SIMULATION_ENABLED(FAIL_TX)) {
158                 CERROR ("FAIL_TX SIMULATION triggered\n");
159                 return NULL;
160         }
161
162         cfs_spin_lock(&kptllnd_data.kptl_tx_lock);
163
164         if (cfs_list_empty (&kptllnd_data.kptl_idle_txs)) {
165                 cfs_spin_unlock(&kptllnd_data.kptl_tx_lock);
166
167                 tx = kptllnd_alloc_tx();
168                 if (tx == NULL)
169                         return NULL;
170         } else {
171                 tx = cfs_list_entry(kptllnd_data.kptl_idle_txs.next, 
172                                     kptl_tx_t, tx_list);
173                 cfs_list_del(&tx->tx_list);
174
175                 cfs_spin_unlock(&kptllnd_data.kptl_tx_lock);
176         }
177
178         LASSERT (cfs_atomic_read(&tx->tx_refcount)== 0);
179         LASSERT (tx->tx_idle);
180         LASSERT (!tx->tx_active);
181         LASSERT (tx->tx_lnet_msg == NULL);
182         LASSERT (tx->tx_lnet_replymsg == NULL);
183         LASSERT (tx->tx_peer == NULL);
184         LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
185         LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
186         
187         tx->tx_type = type;
188         cfs_atomic_set(&tx->tx_refcount, 1);
189         tx->tx_status = 0;
190         tx->tx_idle = 0;
191         tx->tx_tposted = 0;
192         tx->tx_acked = *kptllnd_tunables.kptl_ack_puts;
193
194         CDEBUG(D_NET, "tx=%p\n", tx);
195         return tx;
196 }
197
198 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
199 int
200 kptllnd_tx_abort_netio(kptl_tx_t *tx)
201 {
202         kptl_peer_t     *peer = tx->tx_peer;
203         ptl_handle_md_t  msg_mdh;
204         ptl_handle_md_t  rdma_mdh;
205         unsigned long    flags;
206
207         LASSERT (cfs_atomic_read(&tx->tx_refcount) == 0);
208         LASSERT (!tx->tx_active);
209
210         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
211
212         msg_mdh = tx->tx_msg_mdh;
213         rdma_mdh = tx->tx_rdma_mdh;
214
215         if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
216             PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
217                 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
218                 return 0;
219         }
220         
221         /* Uncompleted comms: there must have been some error and it must be
222          * propagated to LNET... */
223         LASSERT (tx->tx_status != 0 ||
224                  (tx->tx_lnet_msg == NULL && 
225                   tx->tx_lnet_replymsg == NULL));
226
227         /* stash the tx on its peer until it completes */
228         cfs_atomic_set(&tx->tx_refcount, 1);
229         tx->tx_active = 1;
230         cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);
231         
232         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
233
234         /* These unlinks will ensure completion events (normal or unlink) will
235          * happen ASAP */
236
237         if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE))
238                 PtlMDUnlink(msg_mdh);
239         
240         if (!PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE))
241                 PtlMDUnlink(rdma_mdh);
242
243         return -EAGAIN;
244 }
245 #else
246 int
247 kptllnd_tx_abort_netio(kptl_tx_t *tx)
248 {
249         ptl_peer_t      *peer = tx->tx_peer;
250         ptl_handle_md_t  msg_mdh;
251         ptl_handle_md_t  rdma_mdh;
252         unsigned long    flags;
253         ptl_err_t        prc;
254
255         LASSERT (cfs_atomic_read(&tx->tx_refcount) == 0);
256         LASSERT (!tx->tx_active);
257
258         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
259
260         msg_mdh = tx->tx_msg_mdh;
261         rdma_mdh = tx->tx_rdma_mdh;
262
263         if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
264             PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
265                 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
266                 return 0;
267         }
268         
269         /* Uncompleted comms: there must have been some error and it must be
270          * propagated to LNET... */
271         LASSERT (tx->tx_status != 0 ||
272                  (tx->tx_lnet_msg == NULL && 
273                   tx->tx_replymsg == NULL));
274
275         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
276
277         if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE)) {
278                 prc = PtlMDUnlink(msg_mdh);
279                 if (prc == PTL_OK)
280                         msg_mdh = PTL_INVALID_HANDLE;
281         }
282
283         if (!PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
284                 prc = PtlMDUnlink(rdma_mdh);
285                 if (prc == PTL_OK)
286                         rdma_mdh = PTL_INVALID_HANDLE;
287         }
288
289         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
290
291         /* update tx_???_mdh if callback hasn't fired */
292         if (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE))
293                 msg_mdh = PTL_INVALID_HANDLE;
294         else
295                 tx->tx_msg_mdh = msg_mdh;
296         
297         if (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE))
298                 rdma_mdh = PTL_INVALID_HANDLE;
299         else
300                 tx->tx_rdma_mdh = rdma_mdh;
301
302         if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
303             PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
304                 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
305                 return 0;
306         }
307
308         /* stash the tx on its peer until it completes */
309         cfs_atomic_set(&tx->tx_refcount, 1);
310         tx->tx_active = 1;
311         cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);
312
313         kptllnd_peer_addref(peer);              /* extra ref for me... */
314
315         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
316
317         /* This will get the watchdog thread to try aborting all the peer's
318          * comms again.  NB, this deems it fair that 1 failing tx which can't
319          * be aborted immediately (i.e. its MDs are still busy) is valid cause
320          * to nuke everything to the same peer! */
321         kptllnd_peer_close(peer, tx->tx_status);
322
323         kptllnd_peer_decref(peer);
324
325         return -EAGAIN;
326 }
327 #endif
328
329 void
330 kptllnd_tx_fini (kptl_tx_t *tx)
331 {
332         lnet_msg_t     *replymsg = tx->tx_lnet_replymsg;
333         lnet_msg_t     *msg      = tx->tx_lnet_msg;
334         kptl_peer_t    *peer     = tx->tx_peer;
335         int             status   = tx->tx_status;
336         int             rc;
337
338         LASSERT (!cfs_in_interrupt());
339         LASSERT (cfs_atomic_read(&tx->tx_refcount) == 0);
340         LASSERT (!tx->tx_idle);
341         LASSERT (!tx->tx_active);
342
343         /* TX has completed or failed */
344
345         if (peer != NULL) {
346                 rc = kptllnd_tx_abort_netio(tx);
347                 if (rc != 0)
348                         return;
349         }
350
351         LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
352         LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
353
354         tx->tx_lnet_msg = tx->tx_lnet_replymsg = NULL;
355         tx->tx_peer = NULL;
356         tx->tx_idle = 1;
357
358         cfs_spin_lock(&kptllnd_data.kptl_tx_lock);
359         cfs_list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
360         cfs_spin_unlock(&kptllnd_data.kptl_tx_lock);
361
362         /* Must finalize AFTER freeing 'tx' */
363         if (msg != NULL)
364                 lnet_finalize(NULL, msg, (replymsg == NULL) ? status : 0);
365
366         if (replymsg != NULL)
367                 lnet_finalize(NULL, replymsg, status);
368
369         if (peer != NULL)
370                 kptllnd_peer_decref(peer);
371 }
372
373 const char *
374 kptllnd_tx_typestr(int type)
375 {
376         switch (type) {
377         default:
378                 return "<TYPE UNKNOWN>";
379                 
380         case TX_TYPE_SMALL_MESSAGE:
381                 return "msg";
382
383         case TX_TYPE_PUT_REQUEST:
384                 return "put_req";
385
386         case TX_TYPE_GET_REQUEST:
387                 return "get_req";
388                 break;
389
390         case TX_TYPE_PUT_RESPONSE:
391                 return "put_rsp";
392                 break;
393
394         case TX_TYPE_GET_RESPONSE:
395                 return "get_rsp";
396         }
397 }
398
399 void
400 kptllnd_tx_callback(ptl_event_t *ev)
401 {
402         kptl_eventarg_t *eva = ev->md.user_ptr;
403         int              ismsg = (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG);
404         kptl_tx_t       *tx = kptllnd_eventarg2obj(eva);
405         kptl_peer_t     *peer = tx->tx_peer;
406         int              ok = (ev->ni_fail_type == PTL_OK);
407         int              unlinked;
408         unsigned long    flags;
409
410         LASSERT (peer != NULL);
411         LASSERT (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG ||
412                  eva->eva_type == PTLLND_EVENTARG_TYPE_RDMA);
413         LASSERT (!ismsg || !PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
414         LASSERT (ismsg || !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
415
416 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
417         unlinked = ev->unlinked;
418 #else
419         unlinked = (ev->type == PTL_EVENT_UNLINK);
420 #endif
421         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",
422                libcfs_id2str(peer->peer_id), peer->peer_credits,
423                peer->peer_outstanding_credits, peer->peer_sent_credits,
424                kptllnd_evtype2str(ev->type), ev->type, 
425                tx, kptllnd_errtype2str(ev->ni_fail_type),
426                ev->ni_fail_type, unlinked);
427
428         switch (tx->tx_type) {
429         default:
430                 LBUG();
431                 
432         case TX_TYPE_SMALL_MESSAGE:
433                 LASSERT (ismsg);
434                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
435                          ev->type == PTL_EVENT_SEND_END ||
436                          (ev->type == PTL_EVENT_ACK && tx->tx_acked));
437                 break;
438
439         case TX_TYPE_PUT_REQUEST:
440                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
441                          (ismsg && ev->type == PTL_EVENT_SEND_END) ||
442                          (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||
443                          (!ismsg && ev->type == PTL_EVENT_GET_END));
444                 break;
445
446         case TX_TYPE_GET_REQUEST:
447                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
448                          (ismsg && ev->type == PTL_EVENT_SEND_END) ||
449                          (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||
450                          (!ismsg && ev->type == PTL_EVENT_PUT_END));
451
452                 if (!ismsg && ok && ev->type == PTL_EVENT_PUT_END) {
453                         if (ev->hdr_data == PTLLND_RDMA_OK) {
454                                 lnet_set_reply_msg_len(NULL,
455                                         tx->tx_lnet_replymsg,
456                                         ev->mlength);
457                         } else {
458                                 /* no match at peer */
459                                 tx->tx_status = -EIO;
460                         }
461                 }
462                 break;
463
464         case TX_TYPE_PUT_RESPONSE:
465                 LASSERT (!ismsg);
466                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
467                          ev->type == PTL_EVENT_SEND_END ||
468                          ev->type == PTL_EVENT_REPLY_END);
469                 break;
470
471         case TX_TYPE_GET_RESPONSE:
472                 LASSERT (!ismsg);
473                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
474                          ev->type == PTL_EVENT_SEND_END ||
475                          (ev->type == PTL_EVENT_ACK && tx->tx_acked));
476                 break;
477         }
478
479         if (ok) {
480                 kptllnd_peer_alive(peer);
481         } else {
482                 CERROR("Portals error to %s: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",
483                        libcfs_id2str(peer->peer_id),
484                        kptllnd_evtype2str(ev->type), ev->type, 
485                        tx, kptllnd_errtype2str(ev->ni_fail_type),
486                        ev->ni_fail_type, unlinked);
487                 tx->tx_status = -EIO; 
488                 kptllnd_peer_close(peer, -EIO);
489                 kptllnd_schedule_ptltrace_dump();
490         }
491
492         if (!unlinked)
493                 return;
494
495         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
496
497         if (ismsg)
498                 tx->tx_msg_mdh = PTL_INVALID_HANDLE;
499         else
500                 tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
501
502         if (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ||
503             !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ||
504             !tx->tx_active) {
505                 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
506                 return;
507         }
508
509         cfs_list_del(&tx->tx_list);
510         tx->tx_active = 0;
511
512         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
513
514         /* drop peer's ref, but if it was the last one... */
515         if (cfs_atomic_dec_and_test(&tx->tx_refcount)) {
516                 /* ...finalize it in thread context! */
517                 cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
518
519                 cfs_list_add_tail(&tx->tx_list, &kptllnd_data.kptl_sched_txq);
520                 cfs_waitq_signal(&kptllnd_data.kptl_sched_waitq);
521
522                 cfs_spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
523                                            flags);
524         }
525 }