1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/o2iblnd/o2iblnd.c
37  *
38  * Author: Eric Barton <eric@bartonsoftware.com>
39  */
40
41 #include "o2iblnd.h"
42
43 lnd_t the_kiblnd = {
44         .lnd_type       = O2IBLND,
45         .lnd_startup    = kiblnd_startup,
46         .lnd_shutdown   = kiblnd_shutdown,
47         .lnd_ctl        = kiblnd_ctl,
48         .lnd_send       = kiblnd_send,
49         .lnd_recv       = kiblnd_recv,
50 };
51
52 kib_data_t              kiblnd_data;
53
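/* Simple software checksum: rotate the running 32-bit sum left by one
 * bit and add each byte.  It is not cryptographic; it only guards
 * against corruption on the wire when the 'cksum' tunable is set. */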
54 __u32
55 kiblnd_cksum (void *ptr, int nob)
56 {
57         char  *c  = ptr;
58         __u32  sum = 0;
59
60         while (nob-- > 0)
61                 sum = ((sum << 1) | (sum >> 31)) + *c++;
62
63         /* ensure I don't return 0 (== no checksum) */
64         return (sum == 0) ? 1 : sum;
65 }
66
67 void
68 kiblnd_init_msg (kib_msg_t *msg, int type, int body_nob)
69 {
70         msg->ibm_type = type;
71         msg->ibm_nob  = offsetof(kib_msg_t, ibm_u) + body_nob;
72 }
73
74 void
75 kiblnd_pack_msg (lnet_ni_t *ni, kib_msg_t *msg,
76                  int credits, lnet_nid_t dstnid, __u64 dststamp)
77 {
78         kib_net_t *net = ni->ni_data;
79
80         /* CAVEAT EMPTOR! all message fields not set here should have been
81          * initialised previously. */
82         msg->ibm_magic    = IBLND_MSG_MAGIC;
83         msg->ibm_version  = IBLND_MSG_VERSION;
84         /*   ibm_type */
85         msg->ibm_credits  = credits;
86         /*   ibm_nob */
87         msg->ibm_cksum    = 0;
88         msg->ibm_srcnid   = ni->ni_nid;
89         msg->ibm_srcstamp = net->ibn_incarnation;
90         msg->ibm_dstnid   = dstnid;
91         msg->ibm_dststamp = dststamp;
92
93         if (*kiblnd_tunables.kib_cksum) {
94                 /* NB ibm_cksum zero while computing cksum */
95                 msg->ibm_cksum = kiblnd_cksum(msg, msg->ibm_nob);
96         }
97 }
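/* Usage sketch (hypothetical caller, for illustration only): a NOOP
 * would be initialised once and packed just before posting, e.g.
 *
 *      kiblnd_init_msg(tx->tx_msg, IBLND_MSG_NOOP, 0);
 *      kiblnd_pack_msg(ni, tx->tx_msg, conn->ibc_outstanding_credits,
 *                      conn->ibc_peer->ibp_nid, conn->ibc_incarnation);
 *
 * Packing must come last because it (optionally) checksums the whole
 * message, so any field set afterwards would invalidate ibm_cksum. */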
98
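/* Validate and byte-swap an incoming message in place.  The checks run
 * in order: magic, version, header size, ibm_nob, checksum (computed
 * before any flipping, with ibm_cksum zeroed), then the per-type body.
 * Returns 0 on success or -EPROTO on any protocol violation. */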
99 int
100 kiblnd_unpack_msg(kib_msg_t *msg, int nob)
101 {
102         const int hdr_size = offsetof(kib_msg_t, ibm_u);
103         __u32     msg_cksum;
104         int       flip;
105         int       msg_nob;
106 #if !IBLND_MAP_ON_DEMAND
107         int       i;
108         int       n;
109 #endif
110         /* 6 bytes are enough to have received magic + version */
111         if (nob < 6) {
112                 CERROR("Short message: %d\n", nob);
113                 return -EPROTO;
114         }
115
116         if (msg->ibm_magic == IBLND_MSG_MAGIC) {
117                 flip = 0;
118         } else if (msg->ibm_magic == __swab32(IBLND_MSG_MAGIC)) {
119                 flip = 1;
120         } else {
121                 CERROR("Bad magic: %08x\n", msg->ibm_magic);
122                 return -EPROTO;
123         }
124
125         if (msg->ibm_version !=
126             (flip ? __swab16(IBLND_MSG_VERSION) : IBLND_MSG_VERSION)) {
127                 CERROR("Bad version: %d\n", msg->ibm_version);
128                 return -EPROTO;
129         }
130
131         if (nob < hdr_size) {
132                 CERROR("Short message: %d\n", nob);
133                 return -EPROTO;
134         }
135
136         msg_nob = flip ? __swab32(msg->ibm_nob) : msg->ibm_nob;
137         if (msg_nob > nob) {
138                 CERROR("Short message: got %d, wanted %d\n", nob, msg_nob);
139                 return -EPROTO;
140         }
141
142         /* checksum must be computed with ibm_cksum zero and BEFORE anything
143          * gets flipped */
144         msg_cksum = flip ? __swab32(msg->ibm_cksum) : msg->ibm_cksum;
145         msg->ibm_cksum = 0;
146         if (msg_cksum != 0 &&
147             msg_cksum != kiblnd_cksum(msg, msg_nob)) {
148                 CERROR("Bad checksum\n");
149                 return -EPROTO;
150         }
151         msg->ibm_cksum = msg_cksum;
152
153         if (flip) {
154                 /* leave magic unflipped as a clue to peer endianness */
155                 __swab16s(&msg->ibm_version);
156                 CLASSERT (sizeof(msg->ibm_type) == 1);
157                 CLASSERT (sizeof(msg->ibm_credits) == 1);
158                 msg->ibm_nob = msg_nob;
159                 __swab64s(&msg->ibm_srcnid);
160                 __swab64s(&msg->ibm_srcstamp);
161                 __swab64s(&msg->ibm_dstnid);
162                 __swab64s(&msg->ibm_dststamp);
163         }
164
165         if (msg->ibm_srcnid == LNET_NID_ANY) {
166                 CERROR("Bad src nid: %s\n", libcfs_nid2str(msg->ibm_srcnid));
167                 return -EPROTO;
168         }
169
170         switch (msg->ibm_type) {
171         default:
172                 CERROR("Unknown message type %x\n", msg->ibm_type);
173                 return -EPROTO;
174
175         case IBLND_MSG_NOOP:
176                 break;
177
178         case IBLND_MSG_IMMEDIATE:
179                 if (msg_nob < offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0])) {
180                         CERROR("Short IMMEDIATE: %d(%d)\n", msg_nob,
181                                (int)offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0]));
182                         return -EPROTO;
183                 }
184                 break;
185
186         case IBLND_MSG_PUT_REQ:
187                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.putreq)) {
188                         CERROR("Short PUT_REQ: %d(%d)\n", msg_nob,
189                                (int)(hdr_size + sizeof(msg->ibm_u.putreq)));
190                         return -EPROTO;
191                 }
192                 break;
193
194         case IBLND_MSG_PUT_ACK:
195                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.putack)) {
196                         CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
197                                (int)(hdr_size + sizeof(msg->ibm_u.putack)));
198                         return -EPROTO;
199                 }
200 #if IBLND_MAP_ON_DEMAND
201                 if (flip) {
202                         __swab64s(&msg->ibm_u.putack.ibpam_rd.rd_addr);
203                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_nob);
204                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_key);
205                 }
206 #else
207                 if (flip) {
208                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_key);
209                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_nfrags);
210                 }
211
212                 n = msg->ibm_u.putack.ibpam_rd.rd_nfrags;
213                 if (n <= 0 || n > IBLND_MAX_RDMA_FRAGS) {
214                         CERROR("Bad PUT_ACK nfrags: %d, should be 0 < n <= %d\n", 
215                                n, IBLND_MAX_RDMA_FRAGS);
216                         return -EPROTO;
217                 }
218
219                 if (msg_nob < offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n])) {
220                         CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
221                                (int)offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n]));
222                         return -EPROTO;
223                 }
224
225                 if (flip) {
226                         for (i = 0; i < n; i++) {
227                                 __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_nob);
228                                 __swab64s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_addr);
229                         }
230                 }
231 #endif
232                 break;
233
234         case IBLND_MSG_GET_REQ:
235                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.get)) {
236                         CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
237                                (int)(hdr_size + sizeof(msg->ibm_u.get)));
238                         return -EPROTO;
239                 }
240 #if IBLND_MAP_ON_DEMAND
241                 if (flip) {
242                         __swab64s(&msg->ibm_u.get.ibgm_rd.rd_addr);
243                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_nob);
244                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_key);
245                 }
246 #else
247                 if (flip) {
248                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_key);
249                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_nfrags);
250                 }
251
252                 n = msg->ibm_u.get.ibgm_rd.rd_nfrags;
253                 if (n <= 0 || n > IBLND_MAX_RDMA_FRAGS) {
254                         CERROR("Bad GET_REQ nfrags: %d, should be 0 < n <= %d\n", 
255                                n, IBLND_MAX_RDMA_FRAGS);
256                         return -EPROTO;
257                 }
258                 
259                 if (msg_nob < offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n])) {
260                         CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
261                                (int)offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n]));
262                         return -EPROTO;
263                 }
264                 
265                 if (flip)
266                         for (i = 0; i < msg->ibm_u.get.ibgm_rd.rd_nfrags; i++) {
267                                 __swab32s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_nob);
268                                 __swab64s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_addr);
269                         }
270 #endif
271                 break;
272
273         case IBLND_MSG_PUT_NAK:
274         case IBLND_MSG_PUT_DONE:
275         case IBLND_MSG_GET_DONE:
276                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.completion)) {
277                         CERROR("Short RDMA completion: %d(%d)\n", msg_nob,
278                                (int)(hdr_size + sizeof(msg->ibm_u.completion)));
279                         return -EPROTO;
280                 }
281                 if (flip)
282                         __swab32s(&msg->ibm_u.completion.ibcm_status);
283                 break;
284
285         case IBLND_MSG_CONNREQ:
286         case IBLND_MSG_CONNACK:
287                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.connparams)) {
288                         CERROR("Short connreq/ack: %d(%d)\n", msg_nob,
289                                (int)(hdr_size + sizeof(msg->ibm_u.connparams)));
290                         return -EPROTO;
291                 }
292                 if (flip) {
293                         __swab16s(&msg->ibm_u.connparams.ibcp_queue_depth);
294                         __swab16s(&msg->ibm_u.connparams.ibcp_max_frags);
295                         __swab32s(&msg->ibm_u.connparams.ibcp_max_msg_size);
296                 }
297                 break;
298         }
299         return 0;
300 }
301
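/* Peer refcounting: the caller receives one ref on *peerp; the peer
 * table (see kiblnd_unlink_peer_locked) and each connection hold their
 * own refs.  The peer is freed by kiblnd_destroy_peer when the count
 * drops to zero. */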
302 int
303 kiblnd_create_peer (lnet_ni_t *ni, kib_peer_t **peerp, lnet_nid_t nid)
304 {
305         kib_peer_t     *peer;
306         kib_net_t      *net = ni->ni_data;
307         unsigned long   flags;
308
309         LASSERT (net != NULL);
310         LASSERT (nid != LNET_NID_ANY);
311
312         LIBCFS_ALLOC(peer, sizeof(*peer));
313         if (peer == NULL) {
314                 CERROR("Cannot allocate peer\n");
315                 return -ENOMEM;
316         }
317
318         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
319
320         peer->ibp_ni = ni;
321         peer->ibp_nid = nid;
322         peer->ibp_error = 0;
323         peer->ibp_last_alive = cfs_time_current();
324         atomic_set(&peer->ibp_refcount, 1);     /* 1 ref for caller */
325
326         INIT_LIST_HEAD(&peer->ibp_list);       /* not in the peer table yet */
327         INIT_LIST_HEAD(&peer->ibp_conns);
328         INIT_LIST_HEAD(&peer->ibp_tx_queue);
329
330         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
331
332         /* always called with a ref on ni, which prevents ni from being shut down */
333         LASSERT (net->ibn_shutdown == 0);
334
335         /* npeers only grows with the global lock held */
336         atomic_inc(&net->ibn_npeers);
337
338         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
339
340         *peerp = peer;
341         return 0;
342 }
343
344 void
345 kiblnd_destroy_peer (kib_peer_t *peer)
346 {
347         kib_net_t *net = peer->ibp_ni->ni_data;
348
349         LASSERT (net != NULL);
350         LASSERT (atomic_read(&peer->ibp_refcount) == 0);
351         LASSERT (!kiblnd_peer_active(peer));
352         LASSERT (peer->ibp_connecting == 0);
353         LASSERT (peer->ibp_accepting == 0);
354         LASSERT (list_empty(&peer->ibp_conns));
355         LASSERT (list_empty(&peer->ibp_tx_queue));
356
357         LIBCFS_FREE(peer, sizeof(*peer));
358
359         /* NB a peer's connections keep a reference on their peer until
360          * they are destroyed, so we can be assured that _all_ state to do
361          * with this peer has been cleaned up when its refcount drops to
362          * zero. */
363         atomic_dec(&net->ibn_npeers);
364 }
365
366 void
367 kiblnd_destroy_dev (kib_dev_t *dev)
368 {
369         LASSERT (dev->ibd_nnets == 0);
370
371         if (!list_empty(&dev->ibd_list)) /* on kib_devs? */
372                 list_del_init(&dev->ibd_list);
373
374         if (dev->ibd_mr != NULL)
375                 ib_dereg_mr(dev->ibd_mr);
376
377         if (dev->ibd_pd != NULL)
378                 ib_dealloc_pd(dev->ibd_pd);
379
380         if (dev->ibd_cmid != NULL)
381                 rdma_destroy_id(dev->ibd_cmid);
382
383         LIBCFS_FREE(dev, sizeof(*dev));
384 }
385
386 kib_peer_t *
387 kiblnd_find_peer_locked (lnet_nid_t nid)
388 {
389         /* the caller is responsible for accounting the additional reference
390          * that this creates */
391         struct list_head *peer_list = kiblnd_nid2peerlist(nid);
392         struct list_head *tmp;
393         kib_peer_t       *peer;
394
395         list_for_each (tmp, peer_list) {
396
397                 peer = list_entry(tmp, kib_peer_t, ibp_list);
398
399                 LASSERT (peer->ibp_connecting > 0 || /* creating conns */
400                          peer->ibp_accepting > 0 ||
401                          !list_empty(&peer->ibp_conns));  /* active conn */
402
403                 if (peer->ibp_nid != nid)
404                         continue;
405
406                 CDEBUG(D_NET, "got peer [%p] -> %s (%d)\n",
407                        peer, libcfs_nid2str(nid),
408                        atomic_read(&peer->ibp_refcount));
409                 return peer;
410         }
411         return NULL;
412 }
413
414 void
415 kiblnd_unlink_peer_locked (kib_peer_t *peer)
416 {
417         LASSERT (list_empty(&peer->ibp_conns));
418
419         LASSERT (kiblnd_peer_active(peer));
420         list_del_init(&peer->ibp_list);
421         /* lose peerlist's ref */
422         kiblnd_peer_decref(peer);
423 }
424
425 int
426 kiblnd_get_peer_info (lnet_ni_t *ni, int index, 
427                       lnet_nid_t *nidp, int *count)
428 {
429         kib_peer_t        *peer;
430         struct list_head  *ptmp;
431         int                i;
432         unsigned long      flags;
433
434         read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
435
436         for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++) {
437
438                 list_for_each (ptmp, &kiblnd_data.kib_peers[i]) {
439
440                         peer = list_entry(ptmp, kib_peer_t, ibp_list);
441                         LASSERT (peer->ibp_connecting > 0 ||
442                                  peer->ibp_accepting > 0 ||
443                                  !list_empty(&peer->ibp_conns));
444
445                         if (peer->ibp_ni != ni)
446                                 continue;
447
448                         if (index-- > 0)
449                                 continue;
450
451                         *nidp = peer->ibp_nid;
452                         *count = atomic_read(&peer->ibp_refcount);
453
454                         read_unlock_irqrestore(&kiblnd_data.kib_global_lock,
455                                                flags);
456                         return 0;
457                 }
458         }
459
460         read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
461         return -ENOENT;
462 }
463
464 void
465 kiblnd_del_peer_locked (kib_peer_t *peer)
466 {
467         struct list_head *ctmp;
468         struct list_head *cnxt;
469         kib_conn_t       *conn;
470
471         if (list_empty(&peer->ibp_conns)) {
472                 kiblnd_unlink_peer_locked(peer);
473         } else {
474                 list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
475                         conn = list_entry(ctmp, kib_conn_t, ibc_list);
476
477                         kiblnd_close_conn_locked(conn, 0);
478                 }
479                 /* NB closing peer's last conn unlinked it. */
480         }
481         /* NB peer now unlinked; might even be freed if the peer table had the
482          * last ref on it. */
483 }
484
485 int
486 kiblnd_del_peer (lnet_ni_t *ni, lnet_nid_t nid)
487 {
488         CFS_LIST_HEAD     (zombies);
489         struct list_head  *ptmp;
490         struct list_head  *pnxt;
491         kib_peer_t        *peer;
492         int                lo;
493         int                hi;
494         int                i;
495         unsigned long      flags;
496         int                rc = -ENOENT;
497
498         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
499
500         if (nid != LNET_NID_ANY) {
501                 lo = hi = kiblnd_nid2peerlist(nid) - kiblnd_data.kib_peers;
502         } else {
503                 lo = 0;
504                 hi = kiblnd_data.kib_peer_hash_size - 1;
505         }
506
507         for (i = lo; i <= hi; i++) {
508                 list_for_each_safe (ptmp, pnxt, &kiblnd_data.kib_peers[i]) {
509                         peer = list_entry(ptmp, kib_peer_t, ibp_list);
510                         LASSERT (peer->ibp_connecting > 0 ||
511                                  peer->ibp_accepting > 0 ||
512                                  !list_empty(&peer->ibp_conns));
513
514                         if (peer->ibp_ni != ni)
515                                 continue;
516
517                         if (!(nid == LNET_NID_ANY || peer->ibp_nid == nid))
518                                 continue;
519
520                         if (!list_empty(&peer->ibp_tx_queue)) {
521                                 LASSERT (list_empty(&peer->ibp_conns));
522
523                                 list_splice_init(&peer->ibp_tx_queue, &zombies);
524                         }
525
526                         kiblnd_del_peer_locked(peer);
527                         rc = 0;         /* matched something */
528                 }
529         }
530
531         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
532
533         kiblnd_txlist_done(ni, &zombies, -EIO);
534
535         return rc;
536 }
537
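/* Remove peer(s) matching 'nid' (or all peers on 'ni' if LNET_NID_ANY).
 * Any transmits still queued on a connectionless peer are collected on
 * a local 'zombies' list under the lock and completed with -EIO after
 * it is dropped, so no tx completion runs with the global lock held. */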
538 kib_conn_t *
539 kiblnd_get_conn_by_idx (lnet_ni_t *ni, int index)
540 {
541         kib_peer_t        *peer;
542         struct list_head  *ptmp;
543         kib_conn_t        *conn;
544         struct list_head  *ctmp;
545         int                i;
546         unsigned long      flags;
547
548         read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
549
550         for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++) {
551                 list_for_each (ptmp, &kiblnd_data.kib_peers[i]) {
552
553                         peer = list_entry(ptmp, kib_peer_t, ibp_list);
554                         LASSERT (peer->ibp_connecting > 0 ||
555                                  peer->ibp_accepting > 0 ||
556                                  !list_empty(&peer->ibp_conns));
557
558                         if (peer->ibp_ni != ni)
559                                 continue;
560
561                         list_for_each (ctmp, &peer->ibp_conns) {
562                                 if (index-- > 0)
563                                         continue;
564
565                                 conn = list_entry(ctmp, kib_conn_t, ibc_list);
566                                 kiblnd_conn_addref(conn);
567                                 read_unlock_irqrestore(&kiblnd_data.kib_global_lock,
568                                                        flags);
569                                 return conn;
570                         }
571                 }
572         }
573
574         read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
575         return NULL;
576 }
577
578 void
579 kiblnd_debug_rx (kib_rx_t *rx)
580 {
581         CDEBUG(D_CONSOLE, "      %p status %d msg_type %x cred %d\n",
582                rx, rx->rx_status, rx->rx_msg->ibm_type,
583                rx->rx_msg->ibm_credits);
584 }
585
586 void
587 kiblnd_debug_tx (kib_tx_t *tx)
588 {
589         CDEBUG(D_CONSOLE, "      %p snd %d q %d w %d rc %d dl %lx "
590                "cookie "LPX64" msg %s%s type %x cred %d\n",
591                tx, tx->tx_sending, tx->tx_queued, tx->tx_waiting,
592                tx->tx_status, tx->tx_deadline, tx->tx_cookie,
593                tx->tx_lntmsg[0] == NULL ? "-" : "!",
594                tx->tx_lntmsg[1] == NULL ? "-" : "!",
595                tx->tx_msg->ibm_type, tx->tx_msg->ibm_credits);
596 }
597
598 void
599 kiblnd_debug_conn (kib_conn_t *conn)
600 {
601         struct list_head *tmp;
602         int               i;
603
604         spin_lock(&conn->ibc_lock);
605
606         CDEBUG(D_CONSOLE, "conn[%d] %p -> %s: \n",
607                atomic_read(&conn->ibc_refcount), conn,
608                libcfs_nid2str(conn->ibc_peer->ibp_nid));
609         CDEBUG(D_CONSOLE, "   state %d nposted %d cred %d o_cred %d r_cred %d\n",
610                conn->ibc_state, conn->ibc_nsends_posted, conn->ibc_credits, 
611                conn->ibc_outstanding_credits, conn->ibc_reserved_credits);
612         CDEBUG(D_CONSOLE, "   comms_err %d\n", conn->ibc_comms_error);
613
614         CDEBUG(D_CONSOLE, "   early_rxs:\n");
615         list_for_each(tmp, &conn->ibc_early_rxs)
616                 kiblnd_debug_rx(list_entry(tmp, kib_rx_t, rx_list));
617
618         CDEBUG(D_CONSOLE, "   tx_noops:\n");
619         list_for_each(tmp, &conn->ibc_tx_noops)
620                 kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));
621
622         CDEBUG(D_CONSOLE, "   tx_queue_nocred:\n");
623         list_for_each(tmp, &conn->ibc_tx_queue_nocred)
624                 kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));
625
626         CDEBUG(D_CONSOLE, "   tx_queue_rsrvd:\n");
627         list_for_each(tmp, &conn->ibc_tx_queue_rsrvd)
628                 kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));
629
630         CDEBUG(D_CONSOLE, "   tx_queue:\n");
631         list_for_each(tmp, &conn->ibc_tx_queue)
632                 kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));
633
634         CDEBUG(D_CONSOLE, "   active_txs:\n");
635         list_for_each(tmp, &conn->ibc_active_txs)
636                 kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));
637
638         CDEBUG(D_CONSOLE, "   rxs:\n");
639         for (i = 0; i < IBLND_RX_MSGS; i++)
640                 kiblnd_debug_rx(&conn->ibc_rxs[i]);
641
642         spin_unlock(&conn->ibc_lock);
643 }
644
645 kib_conn_t *
646 kiblnd_create_conn (kib_peer_t *peer, struct rdma_cm_id *cmid, int state)
647 {
648         /* CAVEAT EMPTOR:
649          * If the new conn is created successfully it takes over the caller's
650          * ref on 'peer'.  It also "owns" 'cmid' and destroys it when it itself
651          * is destroyed.  On failure, the caller's ref on 'peer' remains and
652          * she must dispose of 'cmid'.  (Actually I'd block forever if I tried
653          * to destroy 'cmid' here since I'm called from the CM which still has
654          * its ref on 'cmid'). */
655         kib_conn_t             *conn;
656         kib_net_t              *net = peer->ibp_ni->ni_data;
657         int                     i;
658         int                     page_offset;
659         int                     ipage;
660         int                     rc;
661         struct ib_cq           *cq;
662         struct ib_qp_init_attr *init_qp_attr;
663         unsigned long           flags;
664
665         LASSERT (net != NULL);
666         LASSERT (!in_interrupt());
667
668         LIBCFS_ALLOC(init_qp_attr, sizeof(*init_qp_attr));
669         if (init_qp_attr == NULL) {
670                 CERROR("Can't allocate qp_attr for %s\n",
671                        libcfs_nid2str(peer->ibp_nid));
672                 goto failed_0;
673         }
674
675         LIBCFS_ALLOC(conn, sizeof(*conn));
676         if (conn == NULL) {
677                 CERROR("Can't allocate connection for %s\n",
678                        libcfs_nid2str(peer->ibp_nid));
679                 goto failed_1;
680         }
681
682         memset(conn, 0, sizeof(*conn)); /* zero flags, NULL pointers etc... */
683
684         conn->ibc_state = IBLND_CONN_INIT;
685         conn->ibc_peer = peer;                  /* I take the caller's ref */
686         cmid->context = conn;                   /* for future CM callbacks */
687         conn->ibc_cmid = cmid;
688
689         INIT_LIST_HEAD(&conn->ibc_early_rxs);
690         INIT_LIST_HEAD(&conn->ibc_tx_noops);
691         INIT_LIST_HEAD(&conn->ibc_tx_queue);
692         INIT_LIST_HEAD(&conn->ibc_tx_queue_rsrvd);
693         INIT_LIST_HEAD(&conn->ibc_tx_queue_nocred);
694         INIT_LIST_HEAD(&conn->ibc_active_txs);
695         spin_lock_init(&conn->ibc_lock);
696
697         LIBCFS_ALLOC(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
698         if (conn->ibc_connvars == NULL) {
699                 CERROR("Can't allocate in-progress connection state\n");
700                 goto failed_2;
701         }
702         memset(conn->ibc_connvars, 0, sizeof(*conn->ibc_connvars));
703
704         LIBCFS_ALLOC(conn->ibc_rxs, IBLND_RX_MSGS * sizeof(kib_rx_t));
705         if (conn->ibc_rxs == NULL) {
706                 CERROR("Cannot allocate RX buffers\n");
707                 goto failed_2;
708         }
709         memset(conn->ibc_rxs, 0, IBLND_RX_MSGS * sizeof(kib_rx_t));
710
711         rc = kiblnd_alloc_pages(&conn->ibc_rx_pages, IBLND_RX_MSG_PAGES);
712         if (rc != 0)
713                 goto failed_2;
714
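        /* Lay the receive buffers out back-to-back in the freshly
         * allocated pages, IBLND_MSG_SIZE apart, and DMA-map each one
         * for the device; rx_msgaddr is the bus address used when the
         * buffer is posted. */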
715         for (i = ipage = page_offset = 0; i < IBLND_RX_MSGS; i++) {
716                 struct page *page = conn->ibc_rx_pages->ibp_pages[ipage];
717                 kib_rx_t    *rx = &conn->ibc_rxs[i];
718
719                 rx->rx_conn = conn;
720                 rx->rx_msg = (kib_msg_t *)(((char *)page_address(page)) +
721                                            page_offset);
722                 rx->rx_msgaddr = kiblnd_dma_map_single(cmid->device,
723                                                        rx->rx_msg, IBLND_MSG_SIZE,
724                                                        DMA_FROM_DEVICE);
725                 KIBLND_UNMAP_ADDR_SET(rx, rx_msgunmap, rx->rx_msgaddr);
726
727                 CDEBUG(D_NET,"rx %d: %p "LPX64"("LPX64")\n",
728                        i, rx->rx_msg, rx->rx_msgaddr,
729                        lnet_page2phys(page) + page_offset);
730
731                 page_offset += IBLND_MSG_SIZE;
732                 LASSERT (page_offset <= PAGE_SIZE);
733
734                 if (page_offset == PAGE_SIZE) {
735                         page_offset = 0;
736                         ipage++;
737                         LASSERT (ipage <= IBLND_RX_MSG_PAGES);
738                 }
739         }
740
741 #ifdef HAVE_OFED_IB_COMP_VECTOR
742         cq = ib_create_cq(cmid->device,
743                           kiblnd_cq_completion, kiblnd_cq_event, conn,
744                           IBLND_CQ_ENTRIES(), 0);
745 #else
746         cq = ib_create_cq(cmid->device,
747                           kiblnd_cq_completion, kiblnd_cq_event, conn,
748                           IBLND_CQ_ENTRIES());
749 #endif
750         if (!IS_ERR(cq)) {
751                 conn->ibc_cq = cq;
752         } else {
753                 CERROR("Can't create CQ: %ld\n", PTR_ERR(cq));
754                 goto failed_2;
755         }
756
757         rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
758         if (rc != 0) {
759                 CERROR("Can't request completion notificiation: %d\n", rc);
760                 goto failed_2;
761         }
762
763         memset(init_qp_attr, 0, sizeof(*init_qp_attr));
764         init_qp_attr->event_handler = kiblnd_qp_event;
765         init_qp_attr->qp_context = conn;
766         init_qp_attr->cap.max_send_wr = IBLND_SEND_WRS;
767         init_qp_attr->cap.max_recv_wr = IBLND_RECV_WRS;
768         init_qp_attr->cap.max_send_sge = 1;
769         init_qp_attr->cap.max_recv_sge = 1;
770         init_qp_attr->sq_sig_type = IB_SIGNAL_REQ_WR;
771         init_qp_attr->qp_type = IB_QPT_RC;
772         init_qp_attr->send_cq = cq;
773         init_qp_attr->recv_cq = cq;
774
775         rc = 0;
776         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
777         switch (*kiblnd_tunables.kib_ib_mtu) {
778         default:
779                 rc = *kiblnd_tunables.kib_ib_mtu;
780                 /* fall through to... */
781         case 0: /* set tunable to the default
782                  * CAVEAT EMPTOR! this assumes the default is one of the MTUs
783                  * below, otherwise we'll WARN on the next QP create */
784                 *kiblnd_tunables.kib_ib_mtu =
785                         ib_mtu_enum_to_int(cmid->route.path_rec->mtu);
786                 break;
787         case 256:
788                 cmid->route.path_rec->mtu = IB_MTU_256;
789                 break;
790         case 512:
791                 cmid->route.path_rec->mtu = IB_MTU_512;
792                 break;
793         case 1024:
794                 cmid->route.path_rec->mtu = IB_MTU_1024;
795                 break;
796         case 2048:
797                 cmid->route.path_rec->mtu = IB_MTU_2048;
798                 break;
799         case 4096:
800                 cmid->route.path_rec->mtu = IB_MTU_4096;
801                 break;
802         }
803         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
804
805         if (rc != 0)
806                 CWARN("Invalid IB MTU value %d, using default value %d\n",
807                       rc, *kiblnd_tunables.kib_ib_mtu);
808                                 
809         rc = rdma_create_qp(cmid, net->ibn_dev->ibd_pd, init_qp_attr);
810         if (rc != 0) {
811                 CERROR("Can't create QP: %d\n", rc);
812                 goto failed_2;
813         }
814
815         LIBCFS_FREE(init_qp_attr, sizeof(*init_qp_attr));
816
817         /* 1 ref for caller and each rxmsg */
818         atomic_set(&conn->ibc_refcount, 1 + IBLND_RX_MSGS);
819         conn->ibc_nrx = IBLND_RX_MSGS;
820
821         /* post receives */
822         for (i = 0; i < IBLND_RX_MSGS; i++) {
823                 rc = kiblnd_post_rx(&conn->ibc_rxs[i],
824                                     IBLND_POSTRX_NO_CREDIT);
825                 if (rc != 0) {
826                         CERROR("Can't post rxmsg: %d\n", rc);
827
828                         /* Make posted receives complete */
829                         kiblnd_abort_receives(conn);
830
831                         /* correct # of posted buffers 
832                          * NB locking needed now I'm racing with completion */
833                         spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);
834                         conn->ibc_nrx -= IBLND_RX_MSGS - i;
835                         spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock,
836                                                flags);
837
838                         /* Drop my own and unused rxbuffer refcounts */
839                         while (i++ <= IBLND_RX_MSGS)
840                                 kiblnd_conn_decref(conn);
841
842                         return NULL;
843                 }
844         }
845         
846         /* Init successful! */
847         LASSERT (state == IBLND_CONN_ACTIVE_CONNECT ||
848                  state == IBLND_CONN_PASSIVE_WAIT);
849         conn->ibc_state = state;
850
851         /* 1 more conn */
852         atomic_inc(&net->ibn_nconns);
853         return conn;
854
855  failed_2:
856         kiblnd_destroy_conn(conn);
857  failed_1:
858         LIBCFS_FREE(init_qp_attr, sizeof(*init_qp_attr));
859  failed_0:
860         return NULL;
861 }
862
863 void
864 kiblnd_destroy_conn (kib_conn_t *conn)
865 {
866         struct rdma_cm_id *cmid = conn->ibc_cmid;
867         kib_peer_t        *peer = conn->ibc_peer;
868         int                rc;
869         int                i;
870
871         LASSERT (!in_interrupt());
872         LASSERT (atomic_read(&conn->ibc_refcount) == 0);
873         LASSERT (list_empty(&conn->ibc_early_rxs));
874         LASSERT (list_empty(&conn->ibc_tx_noops));
875         LASSERT (list_empty(&conn->ibc_tx_queue));
876         LASSERT (list_empty(&conn->ibc_tx_queue_rsrvd));
877         LASSERT (list_empty(&conn->ibc_tx_queue_nocred));
878         LASSERT (list_empty(&conn->ibc_active_txs));
879         LASSERT (conn->ibc_nsends_posted == 0);
880
881         switch (conn->ibc_state) {
882         default:
883                 /* conn must be completely disengaged from the network */
884                 LBUG();
885
886         case IBLND_CONN_DISCONNECTED:
887                 /* connvars should have been freed already */
888                 LASSERT (conn->ibc_connvars == NULL);
889                 break;
890
891         case IBLND_CONN_INIT:
892                 break;
893         }
894
895         if (cmid->qp != NULL)
896                 rdma_destroy_qp(cmid);
897
898         if (conn->ibc_cq != NULL) {
899                 rc = ib_destroy_cq(conn->ibc_cq);
900                 if (rc != 0)
901                         CWARN("Error destroying CQ: %d\n", rc);
902         }
903
904         if (conn->ibc_rx_pages != NULL) {
905                 LASSERT (conn->ibc_rxs != NULL);
906
907                 for (i = 0; i < IBLND_RX_MSGS; i++) {
908                         kib_rx_t *rx = &conn->ibc_rxs[i];
909
910                         LASSERT (rx->rx_nob >= 0); /* not posted */
911
912                         kiblnd_dma_unmap_single(cmid->device,
913                                                 KIBLND_UNMAP_ADDR(rx, rx_msgunmap,
914                                                                   rx->rx_msgaddr),
915                                                 IBLND_MSG_SIZE, DMA_FROM_DEVICE);
916                 }
917
918                 kiblnd_free_pages(conn->ibc_rx_pages);
919         }
920
921         if (conn->ibc_rxs != NULL) {
922                 LIBCFS_FREE(conn->ibc_rxs,
923                             IBLND_RX_MSGS * sizeof(kib_rx_t));
924         }
925
926         if (conn->ibc_connvars != NULL)
927                 LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
928
929         /* See CAVEAT EMPTOR above in kiblnd_create_conn */
930         if (conn->ibc_state != IBLND_CONN_INIT) {
931                 kib_net_t *net = peer->ibp_ni->ni_data;
932
933                 kiblnd_peer_decref(peer);
934                 rdma_destroy_id(cmid);
935                 atomic_dec(&net->ibn_nconns);
936         }
937
938         LIBCFS_FREE(conn, sizeof(*conn));
939 }
940
941 int
942 kiblnd_close_peer_conns_locked (kib_peer_t *peer, int why)
943 {
944         kib_conn_t         *conn;
945         struct list_head   *ctmp;
946         struct list_head   *cnxt;
947         int                 count = 0;
948
949         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
950                 conn = list_entry(ctmp, kib_conn_t, ibc_list);
951
952                 count++;
953                 kiblnd_close_conn_locked(conn, why);
954         }
955
956         return count;
957 }
958
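/* Close any of this peer's connections whose incarnation does not match
 * 'incarnation'.  The incarnation is the peer's startup timestamp, so a
 * mismatch normally means the peer has restarted and the old conns are
 * stale. */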
959 int
960 kiblnd_close_stale_conns_locked (kib_peer_t *peer, __u64 incarnation)
961 {
962         kib_conn_t         *conn;
963         struct list_head   *ctmp;
964         struct list_head   *cnxt;
965         int                 count = 0;
966
967         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
968                 conn = list_entry(ctmp, kib_conn_t, ibc_list);
969
970                 if (conn->ibc_incarnation == incarnation)
971                         continue;
972
973                 CDEBUG(D_NET, "Closing stale conn -> %s incarnation:"LPX64"("LPX64")\n",
974                        libcfs_nid2str(peer->ibp_nid),
975                        conn->ibc_incarnation, incarnation);
976
977                 count++;
978                 kiblnd_close_conn_locked(conn, -ESTALE);
979         }
980
981         return count;
982 }
983
984 int
985 kiblnd_close_matching_conns (lnet_ni_t *ni, lnet_nid_t nid)
986 {
987         kib_peer_t         *peer;
988         struct list_head   *ptmp;
989         struct list_head   *pnxt;
990         int                 lo;
991         int                 hi;
992         int                 i;
993         unsigned long       flags;
994         int                 count = 0;
995
996         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
997
998         if (nid != LNET_NID_ANY)
999                 lo = hi = kiblnd_nid2peerlist(nid) - kiblnd_data.kib_peers;
1000         else {
1001                 lo = 0;
1002                 hi = kiblnd_data.kib_peer_hash_size - 1;
1003         }
1004
1005         for (i = lo; i <= hi; i++) {
1006                 list_for_each_safe (ptmp, pnxt, &kiblnd_data.kib_peers[i]) {
1007
1008                         peer = list_entry(ptmp, kib_peer_t, ibp_list);
1009                         LASSERT (peer->ibp_connecting > 0 ||
1010                                  peer->ibp_accepting > 0 ||
1011                                  !list_empty(&peer->ibp_conns));
1012
1013                         if (peer->ibp_ni != ni)
1014                                 continue;
1015
1016                         if (!(nid == LNET_NID_ANY || nid == peer->ibp_nid))
1017                                 continue;
1018
1019                         count += kiblnd_close_peer_conns_locked(peer, 0);
1020                 }
1021         }
1022
1023         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1024
1025         /* wildcards always succeed */
1026         if (nid == LNET_NID_ANY)
1027                 return 0;
1028
1029         return (count == 0) ? -ENOENT : 0;
1030 }
1031
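/* ioctl dispatcher for the userland utilities: GET_PEER iterates the
 * peer table by index, DEL_PEER removes peers by NID, GET_CONN looks a
 * connection up by index (taking and dropping a ref around the copy),
 * and CLOSE_CONNECTION closes all conns matching a NID. */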
1032 int
1033 kiblnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1034 {
1035         struct libcfs_ioctl_data *data = arg;
1036         int                       rc = -EINVAL;
1037
1038         switch(cmd) {
1039         case IOC_LIBCFS_GET_PEER: {
1040                 lnet_nid_t   nid = 0;
1041                 int          count = 0;
1042
1043                 rc = kiblnd_get_peer_info(ni, data->ioc_count,
1044                                           &nid, &count);
1045                 data->ioc_nid    = nid;
1046                 data->ioc_count  = count;
1047                 break;
1048         }
1049
1050         case IOC_LIBCFS_DEL_PEER: {
1051                 rc = kiblnd_del_peer(ni, data->ioc_nid);
1052                 break;
1053         }
1054         case IOC_LIBCFS_GET_CONN: {
1055                 kib_conn_t *conn = kiblnd_get_conn_by_idx(ni, data->ioc_count);
1056
1057                 if (conn == NULL) {
1058                         rc = -ENOENT;
1059                 } else {
1060                         /* kiblnd_debug_conn(conn); */
1061                         rc = 0;
1062                         data->ioc_nid = conn->ibc_peer->ibp_nid;
1063                         kiblnd_conn_decref(conn);
1064                 }
1065                 break;
1066         }
1067         case IOC_LIBCFS_CLOSE_CONNECTION: {
1068                 rc = kiblnd_close_matching_conns(ni, data->ioc_nid);
1069                 break;
1070         }
1071
1072         default:
1073                 break;
1074         }
1075
1076         return rc;
1077 }
1078
1079 void
1080 kiblnd_free_pages (kib_pages_t *p)
1081 {
1082         int         npages = p->ibp_npages;
1083         int         i;
1084
1085         for (i = 0; i < npages; i++)
1086                 if (p->ibp_pages[i] != NULL)
1087                         __free_page(p->ibp_pages[i]);
1088
1089         LIBCFS_FREE (p, offsetof(kib_pages_t, ibp_pages[npages]));
1090 }
1091
1092 int
1093 kiblnd_alloc_pages (kib_pages_t **pp, int npages)
1094 {
1095         kib_pages_t   *p;
1096         int            i;
1097
1098         LIBCFS_ALLOC(p, offsetof(kib_pages_t, ibp_pages[npages]));
1099         if (p == NULL) {
1100                 CERROR("Can't allocate descriptor for %d pages\n", npages);
1101                 return -ENOMEM;
1102         }
1103
1104         memset(p, 0, offsetof(kib_pages_t, ibp_pages[npages]));
1105         p->ibp_npages = npages;
1106
1107         for (i = 0; i < npages; i++) {
1108                 p->ibp_pages[i] = alloc_page(GFP_KERNEL);
1109                 if (p->ibp_pages[i] == NULL) {
1110                         CERROR("Can't allocate page %d of %d\n", i, npages);
1111                         kiblnd_free_pages(p);
1112                         return -ENOMEM;
1113                 }
1114         }
1115
1116         *pp = p;
1117         return 0;
1118 }
1119
1120 void
1121 kiblnd_free_tx_descs (lnet_ni_t *ni)
1122 {
1123         int        i;
1124         kib_net_t *net = ni->ni_data;
1125
1126         LASSERT (net != NULL);
1127
1128         if (net->ibn_tx_descs != NULL) {
1129                 for (i = 0; i < IBLND_TX_MSGS(); i++) {
1130                         kib_tx_t *tx = &net->ibn_tx_descs[i];
1131
1132 #if IBLND_MAP_ON_DEMAND
1133                         if (tx->tx_pages != NULL)
1134                                 LIBCFS_FREE(tx->tx_pages, LNET_MAX_IOV *
1135                                             sizeof(*tx->tx_pages));
1136 #else
1137                         if (tx->tx_wrq != NULL)
1138                                 LIBCFS_FREE(tx->tx_wrq, 
1139                                             (1 + IBLND_MAX_RDMA_FRAGS) * 
1140                                             sizeof(*tx->tx_wrq));
1141
1142                         if (tx->tx_sge != NULL)
1143                                 LIBCFS_FREE(tx->tx_sge, 
1144                                             (1 + IBLND_MAX_RDMA_FRAGS) * 
1145                                             sizeof(*tx->tx_sge));
1146
1147                         if (tx->tx_rd != NULL)
1148                                 LIBCFS_FREE(tx->tx_rd, 
1149                                             offsetof(kib_rdma_desc_t, 
1150                                                rd_frags[IBLND_MAX_RDMA_FRAGS]));
1151
1152                         if (tx->tx_frags != NULL)
1153                                 LIBCFS_FREE(tx->tx_frags, 
1154                                             IBLND_MAX_RDMA_FRAGS *
1155                                             sizeof(*tx->tx_frags));
1156 #endif
1157                 }
1158
1159                 LIBCFS_FREE(net->ibn_tx_descs,
1160                             IBLND_TX_MSGS() * sizeof(kib_tx_t));
1161         }
1162
1163         if (net->ibn_tx_pages != NULL)
1164                 kiblnd_free_pages(net->ibn_tx_pages);
1165 }
1166
1167 int
1168 kiblnd_alloc_tx_descs (lnet_ni_t *ni)
1169 {
1170         int        i;
1171         int        rc;
1172         kib_net_t *net = ni->ni_data;
1173
1174         LASSERT (net != NULL);
1175
1176         rc = kiblnd_alloc_pages(&net->ibn_tx_pages, IBLND_TX_MSG_PAGES());
1177
1178         if (rc != 0) {
1179                 CERROR("Can't allocate tx pages\n");
1180                 return rc;
1181         }
1182
1183         LIBCFS_ALLOC (net->ibn_tx_descs,
1184                       IBLND_TX_MSGS() * sizeof(kib_tx_t));
1185         if (net->ibn_tx_descs == NULL) {
1186                 CERROR("Can't allocate %d tx descriptors\n", IBLND_TX_MSGS());
1187                 return -ENOMEM;
1188         }
1189
1190         memset(net->ibn_tx_descs, 0,
1191                IBLND_TX_MSGS() * sizeof(kib_tx_t));
1192
1193         for (i = 0; i < IBLND_TX_MSGS(); i++) {
1194                 kib_tx_t *tx = &net->ibn_tx_descs[i];
1195
1196 #if IBLND_MAP_ON_DEMAND
1197                 LIBCFS_ALLOC(tx->tx_pages, LNET_MAX_IOV *
1198                              sizeof(*tx->tx_pages));
1199                 if (tx->tx_pages == NULL) {
1200                         CERROR("Can't allocate phys page vector[%d]\n",
1201                                LNET_MAX_IOV);
1202                         return -ENOMEM;
1203                 }
1204 #else
1205                 LIBCFS_ALLOC(tx->tx_wrq,
1206                              (1 + IBLND_MAX_RDMA_FRAGS) *
1207                              sizeof(*tx->tx_wrq));
1208                 if (tx->tx_wrq == NULL)
1209                         return -ENOMEM;
1210
1211                 LIBCFS_ALLOC(tx->tx_sge,
1212                              (1 + IBLND_MAX_RDMA_FRAGS) *
1213                              sizeof(*tx->tx_sge));
1214                 if (tx->tx_sge == NULL)
1215                         return -ENOMEM;
1216
1217                 LIBCFS_ALLOC(tx->tx_rd,
1218                              offsetof(kib_rdma_desc_t,
1219                                       rd_frags[IBLND_MAX_RDMA_FRAGS]));
1220                 if (tx->tx_rd == NULL)
1221                         return -ENOMEM;
1222
1223                 LIBCFS_ALLOC(tx->tx_frags,
1224                              IBLND_MAX_RDMA_FRAGS * 
1225                              sizeof(*tx->tx_frags));
1226                 if (tx->tx_frags == NULL)
1227                         return -ENOMEM;
1228 #endif
1229         }
1230
1231         return 0;
1232 }
1233
1234 void
1235 kiblnd_unmap_tx_descs (lnet_ni_t *ni)
1236 {
1237         int             i;
1238         kib_tx_t       *tx;
1239         kib_net_t      *net = ni->ni_data;
1240
1241         LASSERT (net != NULL);
1242
1243         for (i = 0; i < IBLND_TX_MSGS(); i++) {
1244                 tx = &net->ibn_tx_descs[i];
1245
1246                 kiblnd_dma_unmap_single(net->ibn_dev->ibd_cmid->device,
1247                                         KIBLND_UNMAP_ADDR(tx, tx_msgunmap,
1248                                                           tx->tx_msgaddr),
1249                                         IBLND_MSG_SIZE, DMA_TO_DEVICE);
1250         }
1251 }
1252
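/* Map the pre-allocated tx message buffers.  The two CLASSERTs below
 * guarantee that IBLND_MSG_SIZE divides PAGE_SIZE, so messages pack
 * exactly into pages with no buffer straddling a page boundary (e.g.
 * assuming 4K pages and 4K messages, one message per page). */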
1253 void
1254 kiblnd_map_tx_descs (lnet_ni_t *ni)
1255 {
1256         int             ipage = 0;
1257         int             page_offset = 0;
1258         int             i;
1259         struct page    *page;
1260         kib_tx_t       *tx;
1261         kib_net_t      *net = ni->ni_data;
1262
1263         LASSERT (net != NULL);
1264
1265         /* pre-mapped messages are not bigger than 1 page */
1266         CLASSERT (IBLND_MSG_SIZE <= PAGE_SIZE);
1267
1268         /* No fancy arithmetic when we do the buffer calculations */
1269         CLASSERT (PAGE_SIZE % IBLND_MSG_SIZE == 0);
1270
1271         for (i = 0; i < IBLND_TX_MSGS(); i++) {
1272                 page = net->ibn_tx_pages->ibp_pages[ipage];
1273                 tx = &net->ibn_tx_descs[i];
1274
1275                 tx->tx_msg = (kib_msg_t *)(((char *)page_address(page)) +
1276                                            page_offset);
1277
1278                 tx->tx_msgaddr = kiblnd_dma_map_single(
1279                         net->ibn_dev->ibd_cmid->device,
1280                         tx->tx_msg, IBLND_MSG_SIZE, DMA_TO_DEVICE);
1281                 KIBLND_UNMAP_ADDR_SET(tx, tx_msgunmap, tx->tx_msgaddr);
1282
1283                 list_add(&tx->tx_list, &net->ibn_idle_txs);
1284
1285                 page_offset += IBLND_MSG_SIZE;
1286                 LASSERT (page_offset <= PAGE_SIZE);
1287
1288                 if (page_offset == PAGE_SIZE) {
1289                         page_offset = 0;
1290                         ipage++;
1291                         LASSERT (ipage <= IBLND_TX_MSG_PAGES());
1292                 }
1293         }
1294 }
1295
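/* Module-wide teardown.  kib_init records how far kiblnd_base_startup
 * got (NOTHING -> DATA -> ALL) and the switch below falls through so
 * that each stage undoes only what was actually initialised. */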
1296 void
1297 kiblnd_base_shutdown (void)
1298 {
1299         int i;
1300
1301         LASSERT (list_empty(&kiblnd_data.kib_devs));
1302
1303         CDEBUG(D_MALLOC, "before LND base cleanup: kmem %d\n",
1304                atomic_read(&libcfs_kmemory));
1305
1306         switch (kiblnd_data.kib_init) {
1307         default:
1308                 LBUG();
1309
1310         case IBLND_INIT_ALL:
1311         case IBLND_INIT_DATA:
1312                 LASSERT (kiblnd_data.kib_peers != NULL);
1313                 for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++) {
1314                         LASSERT (list_empty(&kiblnd_data.kib_peers[i]));
1315                 }
1316                 LASSERT (list_empty(&kiblnd_data.kib_connd_zombies));
1317                 LASSERT (list_empty(&kiblnd_data.kib_connd_conns));
1318
1319                 /* flag threads to terminate; wake and wait for them to die */
1320                 kiblnd_data.kib_shutdown = 1;
1321                 wake_up_all(&kiblnd_data.kib_sched_waitq);
1322                 wake_up_all(&kiblnd_data.kib_connd_waitq);
1323
1324                 i = 2;
1325                 while (atomic_read(&kiblnd_data.kib_nthreads) != 0) {
1326                         i++;
1327                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1328                                "Waiting for %d threads to terminate\n",
1329                                atomic_read(&kiblnd_data.kib_nthreads));
1330                         cfs_pause(cfs_time_seconds(1));
1331                 }
1332
1333                 /* fall through */
1334
1335         case IBLND_INIT_NOTHING:
1336                 break;
1337         }
1338
1339         if (kiblnd_data.kib_peers != NULL)
1340                 LIBCFS_FREE(kiblnd_data.kib_peers,
1341                             sizeof(struct list_head) *
1342                             kiblnd_data.kib_peer_hash_size);
1343
1344         CDEBUG(D_MALLOC, "after LND base cleanup: kmem %d\n",
1345                atomic_read(&libcfs_kmemory));
1346
1347         kiblnd_data.kib_init = IBLND_INIT_NOTHING;
1348         PORTAL_MODULE_UNUSE;
1349 }
1350
1351 void
1352 kiblnd_shutdown (lnet_ni_t *ni)
1353 {
1354         kib_net_t        *net = ni->ni_data;
1355         rwlock_t         *g_lock = &kiblnd_data.kib_global_lock;
1356         int               i;
1357         unsigned long     flags;
1358
1359         LASSERT(kiblnd_data.kib_init == IBLND_INIT_ALL);
1360
1361         if (net == NULL)
1362                 goto out;
1363
1364         CDEBUG(D_MALLOC, "before LND net cleanup: kmem %d\n",
1365                atomic_read(&libcfs_kmemory));
1366
1367         write_lock_irqsave(g_lock, flags);
1368         net->ibn_shutdown = 1;
1369         write_unlock_irqrestore(g_lock, flags);
1370
1371         switch (net->ibn_init) {
1372         default:
1373                 LBUG();
1374
1375         case IBLND_INIT_ALL:
1376                 /* nuke all existing peers within this net */
1377                 kiblnd_del_peer(ni, LNET_NID_ANY);
1378
1379                 /* Wait for all peer state to clean up */
1380                 i = 2;
1381                 while (atomic_read(&net->ibn_npeers) != 0) {
1382                         i++;
1383                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n? */
1384                                "%s: waiting for %d peers to disconnect\n",
1385                                libcfs_nid2str(ni->ni_nid),
1386                                atomic_read(&net->ibn_npeers));
1387                         cfs_pause(cfs_time_seconds(1));
1388                 }
1389
1390                 kiblnd_unmap_tx_descs(ni);
1391
1392                 LASSERT (net->ibn_dev->ibd_nnets > 0);
1393                 net->ibn_dev->ibd_nnets--;
1394
1395                 /* fall through */
1396
1397         case IBLND_INIT_NOTHING:
1398                 LASSERT (atomic_read(&net->ibn_nconns) == 0);
1399
1400 #if IBLND_MAP_ON_DEMAND
1401                 if (net->ibn_fmrpool != NULL)
1402                         ib_destroy_fmr_pool(net->ibn_fmrpool);
1403 #endif
1404                 if (net->ibn_dev != NULL &&
1405                     net->ibn_dev->ibd_nnets == 0)
1406                         kiblnd_destroy_dev(net->ibn_dev);
1407
1408                 break;
1409         }
1410
1411         kiblnd_free_tx_descs(ni);
1412
1413         CDEBUG(D_MALLOC, "after LND net cleanup: kmem %d\n",
1414                atomic_read(&libcfs_kmemory));
1415
1416         net->ibn_init = IBLND_INIT_NOTHING;
1417         ni->ni_data = NULL;
1418         
1419         LIBCFS_FREE(net, sizeof(*net));
1420
1421 out:
1422         if (list_empty(&kiblnd_data.kib_devs))
1423                 kiblnd_base_shutdown();
1424         return;
1425 }
1426
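/* Global (module-level) startup, run once when the first NI comes up.
 * 'credits' (the NI's total tx credits) may not exceed 'ntx', the
 * number of pre-allocated tx descriptors. */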
1427 int
1428 kiblnd_base_startup (void)
1429 {
1430         int               rc;
1431         int               i;
1432
1433         LASSERT (kiblnd_data.kib_init == IBLND_INIT_NOTHING);
1434
1435         if (*kiblnd_tunables.kib_credits > *kiblnd_tunables.kib_ntx) {
1436                 CERROR("Can't set credits(%d) > ntx(%d)\n",
1437                        *kiblnd_tunables.kib_credits,
1438                        *kiblnd_tunables.kib_ntx);
1439                 return -EINVAL;
1440         }
1441
1442         PORTAL_MODULE_USE;
1443         memset(&kiblnd_data, 0, sizeof(kiblnd_data)); /* zero pointers, flags etc */
1444
1445         rwlock_init(&kiblnd_data.kib_global_lock);
1446
1447         INIT_LIST_HEAD(&kiblnd_data.kib_devs);
1448
1449         kiblnd_data.kib_peer_hash_size = IBLND_PEER_HASH_SIZE;
1450         LIBCFS_ALLOC(kiblnd_data.kib_peers,
1451                      sizeof(struct list_head) * kiblnd_data.kib_peer_hash_size);
1452         if (kiblnd_data.kib_peers == NULL) {
1453                 goto failed;
1454         }
1455         for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++)
1456                 INIT_LIST_HEAD(&kiblnd_data.kib_peers[i]);
1457
1458         spin_lock_init(&kiblnd_data.kib_connd_lock);
1459         INIT_LIST_HEAD(&kiblnd_data.kib_connd_conns);
1460         INIT_LIST_HEAD(&kiblnd_data.kib_connd_zombies);
1461         init_waitqueue_head(&kiblnd_data.kib_connd_waitq);
1462
1463         spin_lock_init(&kiblnd_data.kib_sched_lock);
1464         INIT_LIST_HEAD(&kiblnd_data.kib_sched_conns);
1465         init_waitqueue_head(&kiblnd_data.kib_sched_waitq);
1466
1467         kiblnd_data.kib_error_qpa.qp_state = IB_QPS_ERR;
1468
1469         /* lists/ptrs/locks initialised */
1470         kiblnd_data.kib_init = IBLND_INIT_DATA;
1471         /*****************************************************/
1472
        for (i = 0; i < IBLND_N_SCHED; i++) {
                rc = kiblnd_thread_start(kiblnd_scheduler, (void *)((long)i));
                if (rc != 0) {
                        CERROR("Can't spawn o2iblnd scheduler[%d]: %d\n",
                               i, rc);
                        goto failed;
                }
        }

        rc = kiblnd_thread_start(kiblnd_connd, NULL);
        if (rc != 0) {
                CERROR("Can't spawn o2iblnd connd: %d\n", rc);
                goto failed;
        }

        /* flag everything initialised */
        kiblnd_data.kib_init = IBLND_INIT_ALL;
        /*****************************************************/

        return 0;

 failed:
        kiblnd_base_shutdown();
        return -ENETDOWN;
}

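/* Per-network startup: allocate this NI's kib_net_t, resolve the requested
 * IPoIB interface to a kib_dev_t (creating it and its RDMA CM listener on
 * first use), and set up the tx descriptors the network will send from. */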
int
kiblnd_startup (lnet_ni_t *ni)
{
        char                     *ifname;
        kib_net_t                *net;
        kib_dev_t                *ibdev;
        struct list_head         *tmp;
        struct timeval            tv;
        int                       rc;

        LASSERT (ni->ni_lnd == &the_kiblnd);

        if (kiblnd_data.kib_init == IBLND_INIT_NOTHING) {
                rc = kiblnd_base_startup();
                if (rc != 0)
                        return rc;
        }

        LIBCFS_ALLOC(net, sizeof(*net));
        ni->ni_data = net;
        if (net == NULL)
                goto failed;

        memset(net, 0, sizeof(*net));

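        /* The incarnation stamps every message this net sends (see
         * kiblnd_pack_msg) so peers can detect that it has restarted. */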
        do_gettimeofday(&tv);
        net->ibn_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;

        ni->ni_maxtxcredits = *kiblnd_tunables.kib_credits;
        ni->ni_peertxcredits = *kiblnd_tunables.kib_peercredits;

        spin_lock_init(&net->ibn_tx_lock);
        INIT_LIST_HEAD(&net->ibn_idle_txs);

        rc = kiblnd_alloc_tx_descs(ni);
        if (rc != 0) {
                CERROR("Can't allocate tx descs: %d\n", rc);
                goto failed;
        }

        if (ni->ni_interfaces[0] != NULL) {
                /* Use the IPoIB interface specified in 'networks=' */

                CLASSERT (LNET_MAX_INTERFACES > 1);
                if (ni->ni_interfaces[1] != NULL) {
                        CERROR("Multiple interfaces not supported\n");
                        goto failed;
                }

                ifname = ni->ni_interfaces[0];
        } else {
                ifname = *kiblnd_tunables.kib_default_ipif;
        }

        if (strlen(ifname) >= sizeof(ibdev->ibd_ifname)) {
                CERROR("IPoIB interface name too long: %s\n", ifname);
                goto failed;
        }

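        /* Devices are shared: reuse the existing kib_dev_t if another network
         * has already brought this interface up. */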
        ibdev = NULL;
        list_for_each (tmp, &kiblnd_data.kib_devs) {
                ibdev = list_entry(tmp, kib_dev_t, ibd_list);

                if (!strcmp(&ibdev->ibd_ifname[0], ifname))
                        break;

                ibdev = NULL;
        }

        if (ibdev == NULL) {
                __u32                     ip;
                __u32                     netmask;
                int                       up;
                struct rdma_cm_id        *id;
                struct ib_pd             *pd;
                struct ib_mr             *mr;
                struct sockaddr_in        addr;

                rc = libcfs_ipif_query(ifname, &up, &ip, &netmask);
                if (rc != 0) {
                        CERROR("Can't query IPoIB interface %s: %d\n",
                               ifname, rc);
                        goto failed;
                }

                if (!up) {
                        CERROR("Can't use IPoIB interface %s: it's down\n",
                               ifname);
                        goto failed;
                }

                LIBCFS_ALLOC(ibdev, sizeof(*ibdev));
                if (ibdev == NULL)
                        goto failed;

                memset(ibdev, 0, sizeof(*ibdev));

                INIT_LIST_HEAD(&ibdev->ibd_list); /* not yet in kib_devs */
                ibdev->ibd_ifip = ip;
                strcpy(&ibdev->ibd_ifname[0], ifname);

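                /* Create the CM ID this device will listen on; connection
                 * events arrive via kiblnd_cm_callback. */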
                id = rdma_create_id(kiblnd_cm_callback, ibdev, RDMA_PS_TCP);
                if (!IS_ERR(id)) {
                        ibdev->ibd_cmid = id;
                } else {
                        CERROR("Can't create listen ID: %ld\n", PTR_ERR(id));
                        goto failed;
                }

                memset(&addr, 0, sizeof(addr));
                addr.sin_family      = AF_INET;
                addr.sin_port        = htons(*kiblnd_tunables.kib_service);
                addr.sin_addr.s_addr = htonl(ip);

                rc = rdma_bind_addr(id, (struct sockaddr *)&addr);
                if (rc != 0) {
                        CERROR("Can't bind to %s: %d\n", ifname, rc);
                        goto failed;
                }

                /* Binding should have assigned me an IB device */
                LASSERT (id->device != NULL);

                pd = ib_alloc_pd(id->device);
                if (!IS_ERR(pd)) {
                        ibdev->ibd_pd = pd;
                } else {
                        CERROR("Can't allocate PD: %ld\n", PTR_ERR(pd));
                        goto failed;
                }

#if IBLND_MAP_ON_DEMAND
                /* MR for sends and receives */
                mr = ib_get_dma_mr(pd, IB_ACCESS_LOCAL_WRITE);
#else
                /* MR for sends, receives _and_ RDMA */
                mr = ib_get_dma_mr(pd, IB_ACCESS_LOCAL_WRITE |
                                       IB_ACCESS_REMOTE_WRITE);
#endif
                if (!IS_ERR(mr)) {
                        ibdev->ibd_mr = mr;
                } else {
                        CERROR("Can't get MR: %ld\n", PTR_ERR(mr));
                        goto failed;
                }

                rc = rdma_listen(id, 0);
                if (rc != 0) {
                        CERROR("Can't start listener: %d\n", rc);
                        goto failed;
                }

                list_add_tail(&ibdev->ibd_list,
                              &kiblnd_data.kib_devs);
        }

        ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ibdev->ibd_ifip);
        net->ibn_dev = ibdev;

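        /* With map-on-demand mapping, RDMA payloads go through an FMR pool;
         * the pool must hold at least one FMR per tx descriptor since every
         * tx may have an RDMA in flight. */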
#if IBLND_MAP_ON_DEMAND
        /* FMR pool for RDMA */
        {
                struct ib_fmr_pool      *fmrpool;
                struct ib_fmr_pool_param param = {
                        .max_pages_per_fmr = LNET_MAX_PAYLOAD/PAGE_SIZE,
                        .page_shift        = PAGE_SHIFT,
                        .access            = (IB_ACCESS_LOCAL_WRITE |
                                              IB_ACCESS_REMOTE_WRITE),
                        .pool_size         = *kiblnd_tunables.kib_fmr_pool_size,
                        .dirty_watermark   = *kiblnd_tunables.kib_fmr_flush_trigger,
                        .flush_function    = NULL,
                        .flush_arg         = NULL,
                        .cache             = *kiblnd_tunables.kib_fmr_cache};

                if (*kiblnd_tunables.kib_fmr_pool_size <
                    *kiblnd_tunables.kib_ntx) {
                        CERROR("Can't set fmr pool size (%d) < ntx(%d)\n",
                               *kiblnd_tunables.kib_fmr_pool_size,
                               *kiblnd_tunables.kib_ntx);
                        goto failed;
                }

                fmrpool = ib_create_fmr_pool(ibdev->ibd_pd, &param);
                if (!IS_ERR(fmrpool)) {
                        net->ibn_fmrpool = fmrpool;
                } else {
                        CERROR("Can't create FMR pool: %ld\n",
                               PTR_ERR(fmrpool));
                        goto failed;
                }
        }
#endif

        kiblnd_map_tx_descs(ni);

        ibdev->ibd_nnets++;
        net->ibn_init = IBLND_INIT_ALL;

        return 0;

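/* NB kiblnd_shutdown() copes with whatever partially-initialised state the
 * failure paths above leave behind. */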
failed:
        kiblnd_shutdown(ni);

        CDEBUG(D_NET, "kiblnd_startup failed\n");
        return -ENETDOWN;
}

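/* Module unload: unregister from LNet so no new NIs are handed to us, then
 * release the tunables state. */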
void __exit
kiblnd_module_fini (void)
{
        lnet_unregister_lnd(&the_kiblnd);
        kiblnd_tunables_fini();
}

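/* Module load: compile-time checks that every wire message, including the
 * largest RDMA descriptors, fits in IBLND_MSG_SIZE, then register with LNet. */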
int __init
kiblnd_module_init (void)
{
        int    rc;

        CLASSERT (sizeof(kib_msg_t) <= IBLND_MSG_SIZE);
#if !IBLND_MAP_ON_DEMAND
        CLASSERT (offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[IBLND_MAX_RDMA_FRAGS])
                  <= IBLND_MSG_SIZE);
        CLASSERT (offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[IBLND_MAX_RDMA_FRAGS])
                  <= IBLND_MSG_SIZE);
#endif
        rc = kiblnd_tunables_init();
        if (rc != 0)
                return rc;

        lnet_register_lnd(&the_kiblnd);

        return 0;
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Kernel OpenIB gen2 LND v1.00");
MODULE_LICENSE("GPL");

module_init(kiblnd_module_init);
module_exit(kiblnd_module_fini);