Whamcloud - gitweb
b=16034,i=nic:
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_tx.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/ptllnd/ptllnd_tx.c
37  *
38  * Author: PJ Kirner <pjkirner@clusterfs.com>
39  */
40
41  #include "ptllnd.h"
42
43 void
44 kptllnd_free_tx(kptl_tx_t *tx)
45 {
46         if (tx->tx_msg != NULL)
47                 LIBCFS_FREE(tx->tx_msg, sizeof(*tx->tx_msg));
48                         
49         if (tx->tx_frags != NULL)
50                 LIBCFS_FREE(tx->tx_frags, sizeof(*tx->tx_frags));
51
52         LIBCFS_FREE(tx, sizeof(*tx));
53
54         atomic_dec(&kptllnd_data.kptl_ntx);
55
56         /* Keep the tunable in step for visibility */
57         *kptllnd_tunables.kptl_ntx = atomic_read(&kptllnd_data.kptl_ntx);
58 }
59
60 kptl_tx_t *
61 kptllnd_alloc_tx(void)
62 {
63         kptl_tx_t       *tx;
64
65         LIBCFS_ALLOC(tx, sizeof(*tx));
66         if (tx == NULL) {
67                 CERROR("Failed to allocate TX\n");
68                 return NULL;
69         }
70
71         atomic_inc(&kptllnd_data.kptl_ntx);
72
73         /* Keep the tunable in step for visibility */
74         *kptllnd_tunables.kptl_ntx = atomic_read(&kptllnd_data.kptl_ntx);
75
76         tx->tx_idle = 1;
77         tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
78         tx->tx_msg_mdh = PTL_INVALID_HANDLE;
79         tx->tx_rdma_eventarg.eva_type = PTLLND_EVENTARG_TYPE_RDMA;
80         tx->tx_msg_eventarg.eva_type = PTLLND_EVENTARG_TYPE_MSG;
81         tx->tx_msg = NULL;
82         tx->tx_frags = NULL;
83                 
84         LIBCFS_ALLOC(tx->tx_msg, sizeof(*tx->tx_msg));
85         if (tx->tx_msg == NULL) {
86                 CERROR("Failed to allocate TX payload\n");
87                 goto failed;
88         }
89
90         LIBCFS_ALLOC(tx->tx_frags, sizeof(*tx->tx_frags));
91         if (tx->tx_frags == NULL) {
92                 CERROR("Failed to allocate TX frags\n");
93                 goto failed;
94         }
95
96         return tx;
97
98  failed:
99         kptllnd_free_tx(tx);
100         return NULL;
101 }
102
103 int
104 kptllnd_setup_tx_descs()
105 {
106         int       n = *kptllnd_tunables.kptl_ntx;
107         int       i;
108         
109         for (i = 0; i < n; i++) {
110                 kptl_tx_t *tx = kptllnd_alloc_tx();
111                 
112                 if (tx == NULL)
113                         return -ENOMEM;
114                 
115                 spin_lock(&kptllnd_data.kptl_tx_lock);
116                 
117                 list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
118                 
119                 spin_unlock(&kptllnd_data.kptl_tx_lock);
120         }
121         
122         return 0;
123 }
124
125 void
126 kptllnd_cleanup_tx_descs()
127 {
128         kptl_tx_t       *tx;
129
130         /* No locking; single threaded now */
131         LASSERT (kptllnd_data.kptl_shutdown == 2);
132
133         while (!list_empty(&kptllnd_data.kptl_idle_txs)) {
134                 tx = list_entry(kptllnd_data.kptl_idle_txs.next,
135                                 kptl_tx_t, tx_list);
136                 
137                 list_del(&tx->tx_list);
138                 kptllnd_free_tx(tx);
139         }
140
141         LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0);
142 }
143
144 kptl_tx_t *
145 kptllnd_get_idle_tx(enum kptl_tx_type type)
146 {
147         kptl_tx_t      *tx = NULL;
148
149         if (IS_SIMULATION_ENABLED(FAIL_TX_PUT_ALLOC) && 
150             type == TX_TYPE_PUT_REQUEST) {
151                 CERROR("FAIL_TX_PUT_ALLOC SIMULATION triggered\n");
152                 return NULL;
153         }
154
155         if (IS_SIMULATION_ENABLED(FAIL_TX_GET_ALLOC) && 
156             type == TX_TYPE_GET_REQUEST) {
157                 CERROR ("FAIL_TX_GET_ALLOC SIMULATION triggered\n");
158                 return NULL;
159         }
160
161         if (IS_SIMULATION_ENABLED(FAIL_TX)) {
162                 CERROR ("FAIL_TX SIMULATION triggered\n");
163                 return NULL;
164         }
165
166         spin_lock(&kptllnd_data.kptl_tx_lock);
167
168         if (list_empty (&kptllnd_data.kptl_idle_txs)) {
169                 spin_unlock(&kptllnd_data.kptl_tx_lock);
170
171                 tx = kptllnd_alloc_tx();
172                 if (tx == NULL)
173                         return NULL;
174         } else {
175                 tx = list_entry(kptllnd_data.kptl_idle_txs.next, 
176                                 kptl_tx_t, tx_list);
177                 list_del(&tx->tx_list);
178
179                 spin_unlock(&kptllnd_data.kptl_tx_lock);
180         }
181
182         LASSERT (atomic_read(&tx->tx_refcount)== 0);
183         LASSERT (tx->tx_idle);
184         LASSERT (!tx->tx_active);
185         LASSERT (tx->tx_lnet_msg == NULL);
186         LASSERT (tx->tx_lnet_replymsg == NULL);
187         LASSERT (tx->tx_peer == NULL);
188         LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
189         LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
190         
191         tx->tx_type = type;
192         atomic_set(&tx->tx_refcount, 1);
193         tx->tx_status = 0;
194         tx->tx_idle = 0;
195         tx->tx_tposted = 0;
196         tx->tx_acked = *kptllnd_tunables.kptl_ack_puts;
197
198         CDEBUG(D_NET, "tx=%p\n", tx);
199         return tx;
200 }
201
202 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
203 int
204 kptllnd_tx_abort_netio(kptl_tx_t *tx)
205 {
206         kptl_peer_t     *peer = tx->tx_peer;
207         ptl_handle_md_t  msg_mdh;
208         ptl_handle_md_t  rdma_mdh;
209         unsigned long    flags;
210
211         LASSERT (atomic_read(&tx->tx_refcount) == 0);
212         LASSERT (!tx->tx_active);
213
214         spin_lock_irqsave(&peer->peer_lock, flags);
215
216         msg_mdh = tx->tx_msg_mdh;
217         rdma_mdh = tx->tx_rdma_mdh;
218
219         if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
220             PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
221                 spin_unlock_irqrestore(&peer->peer_lock, flags);
222                 return 0;
223         }
224         
225         /* Uncompleted comms: there must have been some error and it must be
226          * propagated to LNET... */
227         LASSERT (tx->tx_status != 0 ||
228                  (tx->tx_lnet_msg == NULL && 
229                   tx->tx_lnet_replymsg == NULL));
230
231         /* stash the tx on its peer until it completes */
232         atomic_set(&tx->tx_refcount, 1);
233         tx->tx_active = 1;
234         list_add_tail(&tx->tx_list, &peer->peer_activeq);
235         
236         spin_unlock_irqrestore(&peer->peer_lock, flags);
237
238         /* These unlinks will ensure completion events (normal or unlink) will
239          * happen ASAP */
240
241         if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE))
242                 PtlMDUnlink(msg_mdh);
243         
244         if (!PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE))
245                 PtlMDUnlink(rdma_mdh);
246
247         return -EAGAIN;
248 }
249 #else
250 int
251 kptllnd_tx_abort_netio(kptl_tx_t *tx)
252 {
253         ptl_peer_t      *peer = tx->tx_peer;
254         ptl_handle_md_t  msg_mdh;
255         ptl_handle_md_t  rdma_mdh;
256         unsigned long    flags;
257         ptl_err_t        prc;
258
259         LASSERT (atomic_read(&tx->tx_refcount) == 0);
260         LASSERT (!tx->tx_active);
261
262         spin_lock_irqsave(&peer->peer_lock, flags);
263
264         msg_mdh = tx->tx_msg_mdh;
265         rdma_mdh = tx->tx_rdma_mdh;
266
267         if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
268             PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
269                 spin_unlock_irqrestore(&peer->peer_lock, flags);
270                 return 0;
271         }
272         
273         /* Uncompleted comms: there must have been some error and it must be
274          * propagated to LNET... */
275         LASSERT (tx->tx_status != 0 ||
276                  (tx->tx_lnet_msg == NULL && 
277                   tx->tx_replymsg == NULL));
278
279         spin_unlock_irqrestore(&peer->peer_lock, flags);
280
281         if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE)) {
282                 prc = PtlMDUnlink(msg_mdh);
283                 if (prc == PTL_OK)
284                         msg_mdh = PTL_INVALID_HANDLE;
285         }
286
287         if (!PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
288                 prc = PtlMDUnlink(rdma_mdh);
289                 if (prc == PTL_OK)
290                         rdma_mdh = PTL_INVALID_HANDLE;
291         }
292
293         spin_lock_irqsave(&peer->peer_lock, flags);
294
295         /* update tx_???_mdh if callback hasn't fired */
296         if (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE))
297                 msg_mdh = PTL_INVALID_HANDLE;
298         else
299                 tx->tx_msg_mdh = msg_mdh;
300         
301         if (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE))
302                 rdma_mdh = PTL_INVALID_HANDLE;
303         else
304                 tx->tx_rdma_mdh = rdma_mdh;
305
306         if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
307             PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
308                 spin_unlock_irqrestore(&peer->peer_lock, flags);
309                 return 0;
310         }
311
312         /* stash the tx on its peer until it completes */
313         atomic_set(&tx->tx_refcount, 1);
314         tx->tx_active = 1;
315         list_add_tail(&tx->tx_list, &peer->peer_activeq);
316
317         kptllnd_peer_addref(peer);              /* extra ref for me... */
318
319         spin_unlock_irqrestore(&peer->peer_lock, flags);
320
321         /* This will get the watchdog thread to try aborting all the peer's
322          * comms again.  NB, this deems it fair that 1 failing tx which can't
323          * be aborted immediately (i.e. its MDs are still busy) is valid cause
324          * to nuke everything to the same peer! */
325         kptllnd_peer_close(peer, tx->tx_status);
326
327         kptllnd_peer_decref(peer);
328
329         return -EAGAIN;
330 }
331 #endif
332
333 void
334 kptllnd_tx_fini (kptl_tx_t *tx)
335 {
336         lnet_msg_t     *replymsg = tx->tx_lnet_replymsg;
337         lnet_msg_t     *msg      = tx->tx_lnet_msg;
338         kptl_peer_t    *peer     = tx->tx_peer;
339         int             status   = tx->tx_status;
340         int             rc;
341
342         LASSERT (!in_interrupt());
343         LASSERT (atomic_read(&tx->tx_refcount) == 0);
344         LASSERT (!tx->tx_idle);
345         LASSERT (!tx->tx_active);
346
347         /* TX has completed or failed */
348
349         if (peer != NULL) {
350                 rc = kptllnd_tx_abort_netio(tx);
351                 if (rc != 0)
352                         return;
353         }
354
355         LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
356         LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
357
358         tx->tx_lnet_msg = tx->tx_lnet_replymsg = NULL;
359         tx->tx_peer = NULL;
360         tx->tx_idle = 1;
361
362         spin_lock(&kptllnd_data.kptl_tx_lock);
363         list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
364         spin_unlock(&kptllnd_data.kptl_tx_lock);
365
366         /* Must finalize AFTER freeing 'tx' */
367         if (msg != NULL)
368                 lnet_finalize(kptllnd_data.kptl_ni, msg,
369                               (replymsg == NULL) ? status : 0);
370
371         if (replymsg != NULL)
372                 lnet_finalize(kptllnd_data.kptl_ni, replymsg, status);
373
374         if (peer != NULL)
375                 kptllnd_peer_decref(peer);
376 }
377
378 const char *
379 kptllnd_tx_typestr(int type)
380 {
381         switch (type) {
382         default:
383                 return "<TYPE UNKNOWN>";
384                 
385         case TX_TYPE_SMALL_MESSAGE:
386                 return "msg";
387
388         case TX_TYPE_PUT_REQUEST:
389                 return "put_req";
390
391         case TX_TYPE_GET_REQUEST:
392                 return "get_req";
393                 break;
394
395         case TX_TYPE_PUT_RESPONSE:
396                 return "put_rsp";
397                 break;
398
399         case TX_TYPE_GET_RESPONSE:
400                 return "get_rsp";
401         }
402 }
403
404 void
405 kptllnd_tx_callback(ptl_event_t *ev)
406 {
407         kptl_eventarg_t *eva = ev->md.user_ptr;
408         int              ismsg = (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG);
409         kptl_tx_t       *tx = kptllnd_eventarg2obj(eva);
410         kptl_peer_t     *peer = tx->tx_peer;
411         int              ok = (ev->ni_fail_type == PTL_OK);
412         int              unlinked;
413         unsigned long    flags;
414
415         LASSERT (peer != NULL);
416         LASSERT (eva->eva_type == PTLLND_EVENTARG_TYPE_MSG ||
417                  eva->eva_type == PTLLND_EVENTARG_TYPE_RDMA);
418         LASSERT (!ismsg || !PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
419         LASSERT (ismsg || !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
420
421 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
422         unlinked = ev->unlinked;
423 #else
424         unlinked = (ev->type == PTL_EVENT_UNLINK);
425 #endif
426         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",
427                libcfs_id2str(peer->peer_id), peer->peer_credits,
428                peer->peer_outstanding_credits, peer->peer_sent_credits,
429                kptllnd_evtype2str(ev->type), ev->type, 
430                tx, kptllnd_errtype2str(ev->ni_fail_type),
431                ev->ni_fail_type, unlinked);
432
433         switch (tx->tx_type) {
434         default:
435                 LBUG();
436                 
437         case TX_TYPE_SMALL_MESSAGE:
438                 LASSERT (ismsg);
439                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
440                          ev->type == PTL_EVENT_SEND_END ||
441                          (ev->type == PTL_EVENT_ACK && tx->tx_acked));
442                 break;
443
444         case TX_TYPE_PUT_REQUEST:
445                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
446                          (ismsg && ev->type == PTL_EVENT_SEND_END) ||
447                          (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||
448                          (!ismsg && ev->type == PTL_EVENT_GET_END));
449                 break;
450
451         case TX_TYPE_GET_REQUEST:
452                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
453                          (ismsg && ev->type == PTL_EVENT_SEND_END) ||
454                          (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||
455                          (!ismsg && ev->type == PTL_EVENT_PUT_END));
456
457                 if (!ismsg && ok && ev->type == PTL_EVENT_PUT_END) {
458                         if (ev->hdr_data == PTLLND_RDMA_OK) {
459                                 lnet_set_reply_msg_len(
460                                         kptllnd_data.kptl_ni,
461                                         tx->tx_lnet_replymsg,
462                                         ev->mlength);
463                         } else {
464                                 /* no match at peer */
465                                 tx->tx_status = -EIO;
466                         }
467                 }
468                 break;
469
470         case TX_TYPE_PUT_RESPONSE:
471                 LASSERT (!ismsg);
472                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
473                          ev->type == PTL_EVENT_SEND_END ||
474                          ev->type == PTL_EVENT_REPLY_END);
475                 break;
476
477         case TX_TYPE_GET_RESPONSE:
478                 LASSERT (!ismsg);
479                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
480                          ev->type == PTL_EVENT_SEND_END ||
481                          (ev->type == PTL_EVENT_ACK && tx->tx_acked));
482                 break;
483         }
484
485         if (ok) {
486                 kptllnd_peer_alive(peer);
487         } else {
488                 CERROR("Portals error to %s: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",
489                        libcfs_id2str(peer->peer_id),
490                        kptllnd_evtype2str(ev->type), ev->type, 
491                        tx, kptllnd_errtype2str(ev->ni_fail_type),
492                        ev->ni_fail_type, unlinked);
493                 tx->tx_status = -EIO; 
494                 kptllnd_peer_close(peer, -EIO);
495                 kptllnd_schedule_ptltrace_dump();
496         }
497
498         if (!unlinked)
499                 return;
500
501         spin_lock_irqsave(&peer->peer_lock, flags);
502
503         if (ismsg)
504                 tx->tx_msg_mdh = PTL_INVALID_HANDLE;
505         else
506                 tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
507
508         if (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ||
509             !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ||
510             !tx->tx_active) {
511                 spin_unlock_irqrestore(&peer->peer_lock, flags);
512                 return;
513         }
514
515         list_del(&tx->tx_list);
516         tx->tx_active = 0;
517
518         spin_unlock_irqrestore(&peer->peer_lock, flags);
519
520         /* drop peer's ref, but if it was the last one... */
521         if (atomic_dec_and_test(&tx->tx_refcount)) {
522                 /* ...finalize it in thread context! */
523                 spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
524
525                 list_add_tail(&tx->tx_list, &kptllnd_data.kptl_sched_txq);
526                 wake_up(&kptllnd_data.kptl_sched_waitq);
527
528                 spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
529         }
530 }