/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/o2iblnd/o2iblnd.c
 *
 * Author: Eric Barton <eric@bartonsoftware.com>
 */

#include "o2iblnd.h"

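/* LND method table handed to LNet.  Assumed to be registered with LNet
 * via lnet_register_lnd() in the module init code (not shown here). */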
lnd_t the_kiblnd = {
        .lnd_type       = O2IBLND,
        .lnd_startup    = kiblnd_startup,
        .lnd_shutdown   = kiblnd_shutdown,
        .lnd_ctl        = kiblnd_ctl,
        .lnd_send       = kiblnd_send,
        .lnd_recv       = kiblnd_recv,
};

kib_data_t              kiblnd_data;

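/* Simple software checksum: rotate the 32-bit accumulator left one bit,
 * then add the next byte.  A sum of 0 is reserved to mean "no checksum",
 * so 0 is mapped to 1 before returning. */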
__u32
kiblnd_cksum (void *ptr, int nob)
{
        char  *c  = ptr;
        __u32  sum = 0;

        while (nob-- > 0)
                sum = ((sum << 1) | (sum >> 31)) + *c++;

        /* ensure I don't return 0 (== no checksum) */
        return (sum == 0) ? 1 : sum;
}

void
kiblnd_init_msg (kib_msg_t *msg, int type, int body_nob)
{
        msg->ibm_type = type;
        msg->ibm_nob  = offsetof(kib_msg_t, ibm_u) + body_nob;
}

void
kiblnd_pack_msg (lnet_ni_t *ni, kib_msg_t *msg,
                 int credits, lnet_nid_t dstnid, __u64 dststamp)
{
        kib_net_t *net = ni->ni_data;

        /* CAVEAT EMPTOR! all message fields not set here should have been
         * initialised previously. */
        msg->ibm_magic    = IBLND_MSG_MAGIC;
        msg->ibm_version  = IBLND_MSG_VERSION;
        /*   ibm_type */
        msg->ibm_credits  = credits;
        /*   ibm_nob */
        msg->ibm_cksum    = 0;
        msg->ibm_srcnid   = ni->ni_nid;
        msg->ibm_srcstamp = net->ibn_incarnation;
        msg->ibm_dstnid   = dstnid;
        msg->ibm_dststamp = dststamp;

        if (*kiblnd_tunables.kib_cksum) {
                /* NB ibm_cksum zero while computing cksum */
                msg->ibm_cksum = kiblnd_cksum(msg, msg->ibm_nob);
        }
}

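/* Validate and byte-swap a received message in place.  Order matters:
 * magic and version first (the magic reads the same either way round and
 * is deliberately left unflipped as a clue to peer endianness), then the
 * header size and ibm_nob, then the checksum -- which must be verified
 * with ibm_cksum zeroed and BEFORE any field is flipped -- and finally
 * the per-type payload bounds and payload byte-swaps. */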
int
kiblnd_unpack_msg(kib_msg_t *msg, int nob)
{
        const int hdr_size = offsetof(kib_msg_t, ibm_u);
        __u32     msg_cksum;
        int       flip;
        int       msg_nob;
#if !IBLND_MAP_ON_DEMAND
        int       i;
        int       n;
#endif
        /* 6 bytes are enough to have received magic + version */
        if (nob < 6) {
                CERROR("Short message: %d\n", nob);
                return -EPROTO;
        }

        if (msg->ibm_magic == IBLND_MSG_MAGIC) {
                flip = 0;
        } else if (msg->ibm_magic == __swab32(IBLND_MSG_MAGIC)) {
                flip = 1;
        } else {
                CERROR("Bad magic: %08x\n", msg->ibm_magic);
                return -EPROTO;
        }

        if (msg->ibm_version !=
            (flip ? __swab16(IBLND_MSG_VERSION) : IBLND_MSG_VERSION)) {
                CERROR("Bad version: %d\n", msg->ibm_version);
                return -EPROTO;
        }

        if (nob < hdr_size) {
                CERROR("Short message: %d\n", nob);
                return -EPROTO;
        }

        msg_nob = flip ? __swab32(msg->ibm_nob) : msg->ibm_nob;
        if (msg_nob > nob) {
                CERROR("Short message: got %d, wanted %d\n", nob, msg_nob);
                return -EPROTO;
        }

        /* checksum must be computed with ibm_cksum zero and BEFORE anything
         * gets flipped */
        msg_cksum = flip ? __swab32(msg->ibm_cksum) : msg->ibm_cksum;
        msg->ibm_cksum = 0;
        if (msg_cksum != 0 &&
            msg_cksum != kiblnd_cksum(msg, msg_nob)) {
                CERROR("Bad checksum\n");
                return -EPROTO;
        }
        msg->ibm_cksum = msg_cksum;

        if (flip) {
                /* leave magic unflipped as a clue to peer endianness */
                __swab16s(&msg->ibm_version);
                CLASSERT (sizeof(msg->ibm_type) == 1);
                CLASSERT (sizeof(msg->ibm_credits) == 1);
                msg->ibm_nob = msg_nob;
                __swab64s(&msg->ibm_srcnid);
                __swab64s(&msg->ibm_srcstamp);
                __swab64s(&msg->ibm_dstnid);
                __swab64s(&msg->ibm_dststamp);
        }

        if (msg->ibm_srcnid == LNET_NID_ANY) {
                CERROR("Bad src nid: %s\n", libcfs_nid2str(msg->ibm_srcnid));
                return -EPROTO;
        }

        switch (msg->ibm_type) {
        default:
                CERROR("Unknown message type %x\n", msg->ibm_type);
                return -EPROTO;

        case IBLND_MSG_NOOP:
                break;

        case IBLND_MSG_IMMEDIATE:
                if (msg_nob < offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0])) {
                        CERROR("Short IMMEDIATE: %d(%d)\n", msg_nob,
                               (int)offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0]));
                        return -EPROTO;
                }
                break;

        case IBLND_MSG_PUT_REQ:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.putreq)) {
                        CERROR("Short PUT_REQ: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.putreq)));
                        return -EPROTO;
                }
                break;

        case IBLND_MSG_PUT_ACK:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.putack)) {
                        CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.putack)));
                        return -EPROTO;
                }
#if IBLND_MAP_ON_DEMAND
                if (flip) {
                        __swab64s(&msg->ibm_u.putack.ibpam_rd.rd_addr);
                        __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_nob);
                        __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_key);
                }
#else
                if (flip) {
                        __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_key);
                        __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_nfrags);
                }

                n = msg->ibm_u.putack.ibpam_rd.rd_nfrags;
                if (n <= 0 || n > IBLND_MAX_RDMA_FRAGS) {
                        CERROR("Bad PUT_ACK nfrags: %d, should be 0 < n <= %d\n",
                               n, IBLND_MAX_RDMA_FRAGS);
                        return -EPROTO;
                }

                if (msg_nob < offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n])) {
                        CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
                               (int)offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n]));
                        return -EPROTO;
                }

                if (flip) {
                        for (i = 0; i < n; i++) {
                                __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_nob);
                                __swab64s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_addr);
                        }
                }
#endif
                break;

        case IBLND_MSG_GET_REQ:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.get)) {
                        CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.get)));
                        return -EPROTO;
                }
#if IBLND_MAP_ON_DEMAND
                if (flip) {
                        __swab64s(&msg->ibm_u.get.ibgm_rd.rd_addr);
                        __swab32s(&msg->ibm_u.get.ibgm_rd.rd_nob);
                        __swab32s(&msg->ibm_u.get.ibgm_rd.rd_key);
                }
#else
                if (flip) {
                        __swab32s(&msg->ibm_u.get.ibgm_rd.rd_key);
                        __swab32s(&msg->ibm_u.get.ibgm_rd.rd_nfrags);
                }

                n = msg->ibm_u.get.ibgm_rd.rd_nfrags;
                if (n <= 0 || n > IBLND_MAX_RDMA_FRAGS) {
                        CERROR("Bad GET_REQ nfrags: %d, should be 0 < n <= %d\n",
                               n, IBLND_MAX_RDMA_FRAGS);
                        return -EPROTO;
                }

                if (msg_nob < offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n])) {
                        CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
                               (int)offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n]));
                        return -EPROTO;
                }

                if (flip)
                        for (i = 0; i < msg->ibm_u.get.ibgm_rd.rd_nfrags; i++) {
                                __swab32s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_nob);
                                __swab64s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_addr);
                        }
#endif
                break;

        case IBLND_MSG_PUT_NAK:
        case IBLND_MSG_PUT_DONE:
        case IBLND_MSG_GET_DONE:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.completion)) {
                        CERROR("Short RDMA completion: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.completion)));
                        return -EPROTO;
                }
                if (flip)
                        __swab32s(&msg->ibm_u.completion.ibcm_status);
                break;

        case IBLND_MSG_CONNREQ:
        case IBLND_MSG_CONNACK:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.connparams)) {
                        CERROR("Short connreq/ack: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.connparams)));
                        return -EPROTO;
                }
                if (flip) {
                        __swab16s(&msg->ibm_u.connparams.ibcp_queue_depth);
                        __swab16s(&msg->ibm_u.connparams.ibcp_max_frags);
                        __swab32s(&msg->ibm_u.connparams.ibcp_max_msg_size);
                }
                break;
        }
        return 0;
}

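/* Peers are refcounted: this returns with one ref for the caller, the
 * peer table takes its own ref on insertion, and every connection holds
 * a ref on its peer until the connection is destroyed. */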
int
kiblnd_create_peer (lnet_ni_t *ni, kib_peer_t **peerp, lnet_nid_t nid)
{
        kib_peer_t     *peer;
        kib_net_t      *net = ni->ni_data;
        unsigned long   flags;

        LASSERT (net != NULL);
        LASSERT (nid != LNET_NID_ANY);

        LIBCFS_ALLOC(peer, sizeof(*peer));
        if (peer == NULL) {
                CERROR("Cannot allocate peer\n");
                return -ENOMEM;
        }

        memset(peer, 0, sizeof(*peer));         /* zero flags etc */

        peer->ibp_ni = ni;
        peer->ibp_nid = nid;
        peer->ibp_error = 0;
        peer->ibp_last_alive = cfs_time_current();
        atomic_set(&peer->ibp_refcount, 1);     /* 1 ref for caller */

        INIT_LIST_HEAD(&peer->ibp_list);       /* not in the peer table yet */
        INIT_LIST_HEAD(&peer->ibp_conns);
        INIT_LIST_HEAD(&peer->ibp_tx_queue);

        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        /* always called with a ref on ni, which prevents ni being shutdown */
        LASSERT (net->ibn_shutdown == 0);

        /* npeers only grows with the global lock held */
        atomic_inc(&net->ibn_npeers);

        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);

        *peerp = peer;
        return 0;
}

void
kiblnd_destroy_peer (kib_peer_t *peer)
{
        kib_net_t *net = peer->ibp_ni->ni_data;

        LASSERT (net != NULL);
        LASSERT (atomic_read(&peer->ibp_refcount) == 0);
        LASSERT (!kiblnd_peer_active(peer));
        LASSERT (peer->ibp_connecting == 0);
        LASSERT (peer->ibp_accepting == 0);
        LASSERT (list_empty(&peer->ibp_conns));
        LASSERT (list_empty(&peer->ibp_tx_queue));

        LIBCFS_FREE(peer, sizeof(*peer));

        /* NB a peer's connections keep a reference on their peer until
         * they are destroyed, so we can be assured that _all_ state to do
         * with this peer has been cleaned up when its refcount drops to
         * zero. */
        atomic_dec(&net->ibn_npeers);
}

void
kiblnd_destroy_dev (kib_dev_t *dev)
{
        LASSERT (dev->ibd_nnets == 0);

        if (!list_empty(&dev->ibd_list)) /* on kib_devs? */
                list_del_init(&dev->ibd_list);

        if (dev->ibd_mr != NULL)
                ib_dereg_mr(dev->ibd_mr);

        if (dev->ibd_pd != NULL)
                ib_dealloc_pd(dev->ibd_pd);

        if (dev->ibd_cmid != NULL)
                rdma_destroy_id(dev->ibd_cmid);

        LIBCFS_FREE(dev, sizeof(*dev));
}

kib_peer_t *
kiblnd_find_peer_locked (lnet_nid_t nid)
{
        /* the caller is responsible for accounting the additional reference
         * that this creates */
        struct list_head *peer_list = kiblnd_nid2peerlist(nid);
        struct list_head *tmp;
        kib_peer_t       *peer;

        list_for_each (tmp, peer_list) {

                peer = list_entry(tmp, kib_peer_t, ibp_list);

                LASSERT (peer->ibp_connecting > 0 || /* creating conns */
                         peer->ibp_accepting > 0 ||
                         !list_empty(&peer->ibp_conns));  /* active conn */

                if (peer->ibp_nid != nid)
                        continue;

                CDEBUG(D_NET, "got peer [%p] -> %s (%d)\n",
                       peer, libcfs_nid2str(nid),
                       atomic_read(&peer->ibp_refcount));
                return peer;
        }
        return NULL;
}

void
kiblnd_unlink_peer_locked (kib_peer_t *peer)
{
        LASSERT (list_empty(&peer->ibp_conns));

        LASSERT (kiblnd_peer_active(peer));
        list_del_init(&peer->ibp_list);
        /* lose peerlist's ref */
        kiblnd_peer_decref(peer);
}

int
kiblnd_get_peer_info (lnet_ni_t *ni, int index,
                      lnet_nid_t *nidp, int *count)
{
        kib_peer_t        *peer;
        struct list_head  *ptmp;
        int                i;
        unsigned long      flags;

        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++) {

                list_for_each (ptmp, &kiblnd_data.kib_peers[i]) {

                        peer = list_entry(ptmp, kib_peer_t, ibp_list);
                        LASSERT (peer->ibp_connecting > 0 ||
                                 peer->ibp_accepting > 0 ||
                                 !list_empty(&peer->ibp_conns));

                        if (peer->ibp_ni != ni)
                                continue;

                        if (index-- > 0)
                                continue;

                        *nidp = peer->ibp_nid;
                        *count = atomic_read(&peer->ibp_refcount);

                        read_unlock_irqrestore(&kiblnd_data.kib_global_lock,
                                               flags);
                        return 0;
                }
        }

        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
        return -ENOENT;
}

void
kiblnd_del_peer_locked (kib_peer_t *peer)
{
        struct list_head *ctmp;
        struct list_head *cnxt;
        kib_conn_t       *conn;

        if (list_empty(&peer->ibp_conns)) {
                kiblnd_unlink_peer_locked(peer);
        } else {
                list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
                        conn = list_entry(ctmp, kib_conn_t, ibc_list);

                        kiblnd_close_conn_locked(conn, 0);
                }
                /* NB closing peer's last conn unlinked it. */
        }
        /* NB peer now unlinked; might even be freed if the peer table had the
         * last ref on it. */
}

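/* Delete peer(s) matching 'nid' on this NI (LNET_NID_ANY matches them
 * all).  Transmits still queued on a matched peer are collected on a
 * local 'zombies' list under the global lock and completed with -EIO
 * after the lock is dropped. */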
int
kiblnd_del_peer (lnet_ni_t *ni, lnet_nid_t nid)
{
        CFS_LIST_HEAD     (zombies);
        struct list_head  *ptmp;
        struct list_head  *pnxt;
        kib_peer_t        *peer;
        int                lo;
        int                hi;
        int                i;
        unsigned long      flags;
        int                rc = -ENOENT;

        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        if (nid != LNET_NID_ANY) {
                lo = hi = kiblnd_nid2peerlist(nid) - kiblnd_data.kib_peers;
        } else {
                lo = 0;
                hi = kiblnd_data.kib_peer_hash_size - 1;
        }

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kiblnd_data.kib_peers[i]) {
                        peer = list_entry(ptmp, kib_peer_t, ibp_list);
                        LASSERT (peer->ibp_connecting > 0 ||
                                 peer->ibp_accepting > 0 ||
                                 !list_empty(&peer->ibp_conns));

                        if (peer->ibp_ni != ni)
                                continue;

                        if (!(nid == LNET_NID_ANY || peer->ibp_nid == nid))
                                continue;

                        if (!list_empty(&peer->ibp_tx_queue)) {
                                LASSERT (list_empty(&peer->ibp_conns));

                                list_splice_init(&peer->ibp_tx_queue, &zombies);
                        }

                        kiblnd_del_peer_locked(peer);
                        rc = 0;         /* matched something */
                }
        }

        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);

        kiblnd_txlist_done(ni, &zombies, -EIO);

        return rc;
}

kib_conn_t *
kiblnd_get_conn_by_idx (lnet_ni_t *ni, int index)
{
        kib_peer_t        *peer;
        struct list_head  *ptmp;
        kib_conn_t        *conn;
        struct list_head  *ctmp;
        int                i;
        unsigned long      flags;

        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++) {
                list_for_each (ptmp, &kiblnd_data.kib_peers[i]) {

                        peer = list_entry(ptmp, kib_peer_t, ibp_list);
                        LASSERT (peer->ibp_connecting > 0 ||
                                 peer->ibp_accepting > 0 ||
                                 !list_empty(&peer->ibp_conns));

                        if (peer->ibp_ni != ni)
                                continue;

                        list_for_each (ctmp, &peer->ibp_conns) {
                                if (index-- > 0)
                                        continue;

                                conn = list_entry(ctmp, kib_conn_t, ibc_list);
                                kiblnd_conn_addref(conn);
                                read_unlock_irqrestore(&kiblnd_data.kib_global_lock,
                                                       flags);
                                return conn;
                        }
                }
        }

        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
        return NULL;
}

void
kiblnd_debug_rx (kib_rx_t *rx)
{
        CDEBUG(D_CONSOLE, "      %p status %d msg_type %x cred %d\n",
               rx, rx->rx_status, rx->rx_msg->ibm_type,
               rx->rx_msg->ibm_credits);
}

void
kiblnd_debug_tx (kib_tx_t *tx)
{
        CDEBUG(D_CONSOLE, "      %p snd %d q %d w %d rc %d dl %lx "
               "cookie "LPX64" msg %s%s type %x cred %d\n",
               tx, tx->tx_sending, tx->tx_queued, tx->tx_waiting,
               tx->tx_status, tx->tx_deadline, tx->tx_cookie,
               tx->tx_lntmsg[0] == NULL ? "-" : "!",
               tx->tx_lntmsg[1] == NULL ? "-" : "!",
               tx->tx_msg->ibm_type, tx->tx_msg->ibm_credits);
}

void
kiblnd_debug_conn (kib_conn_t *conn)
{
        struct list_head *tmp;
        int               i;

        spin_lock(&conn->ibc_lock);

        CDEBUG(D_CONSOLE, "conn[%d] %p -> %s: \n",
               atomic_read(&conn->ibc_refcount), conn,
               libcfs_nid2str(conn->ibc_peer->ibp_nid));
        CDEBUG(D_CONSOLE, "   state %d nposted %d cred %d o_cred %d r_cred %d\n",
               conn->ibc_state, conn->ibc_nsends_posted, conn->ibc_credits,
               conn->ibc_outstanding_credits, conn->ibc_reserved_credits);
        CDEBUG(D_CONSOLE, "   comms_err %d\n", conn->ibc_comms_error);

        CDEBUG(D_CONSOLE, "   early_rxs:\n");
        list_for_each(tmp, &conn->ibc_early_rxs)
                kiblnd_debug_rx(list_entry(tmp, kib_rx_t, rx_list));

        CDEBUG(D_CONSOLE, "   tx_noops:\n");
        list_for_each(tmp, &conn->ibc_tx_noops)
                kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   tx_queue_nocred:\n");
        list_for_each(tmp, &conn->ibc_tx_queue_nocred)
                kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   tx_queue_rsrvd:\n");
        list_for_each(tmp, &conn->ibc_tx_queue_rsrvd)
                kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   tx_queue:\n");
        list_for_each(tmp, &conn->ibc_tx_queue)
                kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   active_txs:\n");
        list_for_each(tmp, &conn->ibc_active_txs)
                kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   rxs:\n");
        for (i = 0; i < IBLND_RX_MSGS; i++)
                kiblnd_debug_rx(&conn->ibc_rxs[i]);

        spin_unlock(&conn->ibc_lock);
}

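/* Connection setup in outline: allocate the conn and its receive
 * buffers (IBLND_RX_MSGS messages laid out IBLND_MSG_SIZE apart in
 * IBLND_RX_MSG_PAGES pages, each DMA-mapped), create the completion
 * queue and RC queue pair, then post all the receives.  The conn starts
 * with 1 + IBLND_RX_MSGS refs: one for the caller and one per posted
 * receive buffer. */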
kib_conn_t *
kiblnd_create_conn (kib_peer_t *peer, struct rdma_cm_id *cmid, int state)
{
        /* CAVEAT EMPTOR:
         * If the new conn is created successfully it takes over the caller's
         * ref on 'peer'.  It also "owns" 'cmid' and destroys it when it itself
         * is destroyed.  On failure, the caller's ref on 'peer' remains and
         * she must dispose of 'cmid'.  (Actually I'd block forever if I tried
         * to destroy 'cmid' here since I'm called from the CM which still has
         * its ref on 'cmid'). */
        kib_conn_t             *conn;
        kib_net_t              *net = peer->ibp_ni->ni_data;
        int                     i;
        int                     page_offset;
        int                     ipage;
        int                     rc;
        struct ib_cq           *cq;
        struct ib_qp_init_attr *init_qp_attr;
        unsigned long           flags;

        LASSERT (net != NULL);
        LASSERT (!in_interrupt());

        LIBCFS_ALLOC(init_qp_attr, sizeof(*init_qp_attr));
        if (init_qp_attr == NULL) {
                CERROR("Can't allocate qp_attr for %s\n",
                       libcfs_nid2str(peer->ibp_nid));
                goto failed_0;
        }

        LIBCFS_ALLOC(conn, sizeof(*conn));
        if (conn == NULL) {
                CERROR("Can't allocate connection for %s\n",
                       libcfs_nid2str(peer->ibp_nid));
                goto failed_1;
        }

        memset(conn, 0, sizeof(*conn)); /* zero flags, NULL pointers etc... */

        conn->ibc_state = IBLND_CONN_INIT;
        conn->ibc_peer = peer;                  /* I take the caller's ref */
        cmid->context = conn;                   /* for future CM callbacks */
        conn->ibc_cmid = cmid;

        INIT_LIST_HEAD(&conn->ibc_early_rxs);
        INIT_LIST_HEAD(&conn->ibc_tx_noops);
        INIT_LIST_HEAD(&conn->ibc_tx_queue);
        INIT_LIST_HEAD(&conn->ibc_tx_queue_rsrvd);
        INIT_LIST_HEAD(&conn->ibc_tx_queue_nocred);
        INIT_LIST_HEAD(&conn->ibc_active_txs);
        spin_lock_init(&conn->ibc_lock);

        LIBCFS_ALLOC(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
        if (conn->ibc_connvars == NULL) {
                CERROR("Can't allocate in-progress connection state\n");
                goto failed_2;
        }
        memset(conn->ibc_connvars, 0, sizeof(*conn->ibc_connvars));

        LIBCFS_ALLOC(conn->ibc_rxs, IBLND_RX_MSGS * sizeof(kib_rx_t));
        if (conn->ibc_rxs == NULL) {
                CERROR("Cannot allocate RX buffers\n");
                goto failed_2;
        }
        memset(conn->ibc_rxs, 0, IBLND_RX_MSGS * sizeof(kib_rx_t));

        rc = kiblnd_alloc_pages(&conn->ibc_rx_pages, IBLND_RX_MSG_PAGES);
        if (rc != 0)
                goto failed_2;

        for (i = ipage = page_offset = 0; i < IBLND_RX_MSGS; i++) {
                struct page *page = conn->ibc_rx_pages->ibp_pages[ipage];
                kib_rx_t    *rx = &conn->ibc_rxs[i];

                rx->rx_conn = conn;
                rx->rx_msg = (kib_msg_t *)(((char *)page_address(page)) +
                                           page_offset);
                rx->rx_msgaddr = kiblnd_dma_map_single(cmid->device,
                                                       rx->rx_msg, IBLND_MSG_SIZE,
                                                       DMA_FROM_DEVICE);
                KIBLND_UNMAP_ADDR_SET(rx, rx_msgunmap, rx->rx_msgaddr);

                CDEBUG(D_NET,"rx %d: %p "LPX64"("LPX64")\n",
                       i, rx->rx_msg, rx->rx_msgaddr,
                       lnet_page2phys(page) + page_offset);

                page_offset += IBLND_MSG_SIZE;
                LASSERT (page_offset <= PAGE_SIZE);

                if (page_offset == PAGE_SIZE) {
                        page_offset = 0;
                        ipage++;
                        LASSERT (ipage <= IBLND_RX_MSG_PAGES);
                }
        }

#ifdef HAVE_OFED_IB_COMP_VECTOR
        cq = ib_create_cq(cmid->device,
                          kiblnd_cq_completion, kiblnd_cq_event, conn,
                          IBLND_CQ_ENTRIES(), 0);
#else
        cq = ib_create_cq(cmid->device,
                          kiblnd_cq_completion, kiblnd_cq_event, conn,
                          IBLND_CQ_ENTRIES());
#endif
        if (!IS_ERR(cq)) {
                conn->ibc_cq = cq;
        } else {
                CERROR("Can't create CQ: %ld\n", PTR_ERR(cq));
                goto failed_2;
        }

        rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
        if (rc != 0) {
                CERROR("Can't request completion notification: %d\n", rc);
                goto failed_2;
        }

        memset(init_qp_attr, 0, sizeof(*init_qp_attr));
        init_qp_attr->event_handler = kiblnd_qp_event;
        init_qp_attr->qp_context = conn;
        init_qp_attr->cap.max_send_wr = IBLND_SEND_WRS;
        init_qp_attr->cap.max_recv_wr = IBLND_RECV_WRS;
        init_qp_attr->cap.max_send_sge = 1;
        init_qp_attr->cap.max_recv_sge = 1;
        init_qp_attr->sq_sig_type = IB_SIGNAL_REQ_WR;
        init_qp_attr->qp_type = IB_QPT_RC;
        init_qp_attr->send_cq = cq;
        init_qp_attr->recv_cq = cq;

        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
        switch (*kiblnd_tunables.kib_ib_mtu) {
        case 0: /* don't force path MTU */
                break;
        case 256:
                cmid->route.path_rec->mtu = IB_MTU_256;
                break;
        case 512:
                cmid->route.path_rec->mtu = IB_MTU_512;
                break;
        case 1024:
                cmid->route.path_rec->mtu = IB_MTU_1024;
                break;
        case 2048:
                cmid->route.path_rec->mtu = IB_MTU_2048;
                break;
        case 4096:
                cmid->route.path_rec->mtu = IB_MTU_4096;
                break;
        default:
                LBUG();
                break;
        }
        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);

        rc = rdma_create_qp(cmid, net->ibn_dev->ibd_pd, init_qp_attr);
        if (rc != 0) {
                CERROR("Can't create QP: %d\n", rc);
                goto failed_2;
        }

        LIBCFS_FREE(init_qp_attr, sizeof(*init_qp_attr));

        /* 1 ref for caller and each rxmsg */
        atomic_set(&conn->ibc_refcount, 1 + IBLND_RX_MSGS);
        conn->ibc_nrx = IBLND_RX_MSGS;

        /* post receives */
        for (i = 0; i < IBLND_RX_MSGS; i++) {
                rc = kiblnd_post_rx(&conn->ibc_rxs[i],
                                    IBLND_POSTRX_NO_CREDIT);
                if (rc != 0) {
                        CERROR("Can't post rxmsg: %d\n", rc);

                        /* Make posted receives complete */
                        kiblnd_abort_receives(conn);

                        /* correct # of posted buffers
                         * NB locking needed now I'm racing with completion */
                        spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);
                        conn->ibc_nrx -= IBLND_RX_MSGS - i;
                        spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock,
                                               flags);

                        /* Drop my own and unused rxbuffer refcounts */
                        while (i++ <= IBLND_RX_MSGS)
                                kiblnd_conn_decref(conn);

                        return NULL;
                }
        }

        /* Init successful! */
        LASSERT (state == IBLND_CONN_ACTIVE_CONNECT ||
                 state == IBLND_CONN_PASSIVE_WAIT);
        conn->ibc_state = state;

        /* 1 more conn */
        atomic_inc(&net->ibn_nconns);
        return conn;

 failed_2:
        kiblnd_destroy_conn(conn);
 failed_1:
        LIBCFS_FREE(init_qp_attr, sizeof(*init_qp_attr));
 failed_0:
        return NULL;
}

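/* Called when the last ref on the conn is dropped; by then the conn must
 * be completely disengaged from the network (enforced by the LASSERTs
 * below), so the QP, CQ, receive buffers and cmid can be torn down
 * safely. */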
void
kiblnd_destroy_conn (kib_conn_t *conn)
{
        struct rdma_cm_id *cmid = conn->ibc_cmid;
        kib_peer_t        *peer = conn->ibc_peer;
        int                rc;
        int                i;

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&conn->ibc_refcount) == 0);
        LASSERT (list_empty(&conn->ibc_early_rxs));
        LASSERT (list_empty(&conn->ibc_tx_noops));
        LASSERT (list_empty(&conn->ibc_tx_queue));
        LASSERT (list_empty(&conn->ibc_tx_queue_rsrvd));
        LASSERT (list_empty(&conn->ibc_tx_queue_nocred));
        LASSERT (list_empty(&conn->ibc_active_txs));
        LASSERT (conn->ibc_nsends_posted == 0);

        switch (conn->ibc_state) {
        default:
                /* conn must be completely disengaged from the network */
                LBUG();

        case IBLND_CONN_DISCONNECTED:
                /* connvars should have been freed already */
                LASSERT (conn->ibc_connvars == NULL);
                break;

        case IBLND_CONN_INIT:
                break;
        }

        if (cmid->qp != NULL)
                rdma_destroy_qp(cmid);

        if (conn->ibc_cq != NULL) {
                rc = ib_destroy_cq(conn->ibc_cq);
                if (rc != 0)
                        CWARN("Error destroying CQ: %d\n", rc);
        }

        if (conn->ibc_rx_pages != NULL) {
                LASSERT (conn->ibc_rxs != NULL);

                for (i = 0; i < IBLND_RX_MSGS; i++) {
                        kib_rx_t *rx = &conn->ibc_rxs[i];

                        LASSERT (rx->rx_nob >= 0); /* not posted */

                        kiblnd_dma_unmap_single(cmid->device,
                                                KIBLND_UNMAP_ADDR(rx, rx_msgunmap,
                                                                  rx->rx_msgaddr),
                                                IBLND_MSG_SIZE, DMA_FROM_DEVICE);
                }

                kiblnd_free_pages(conn->ibc_rx_pages);
        }

        if (conn->ibc_rxs != NULL) {
                LIBCFS_FREE(conn->ibc_rxs,
                            IBLND_RX_MSGS * sizeof(kib_rx_t));
        }

        if (conn->ibc_connvars != NULL)
                LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));

        /* See CAVEAT EMPTOR above in kiblnd_create_conn */
        if (conn->ibc_state != IBLND_CONN_INIT) {
                kib_net_t *net = peer->ibp_ni->ni_data;

                kiblnd_peer_decref(peer);
                rdma_destroy_id(cmid);
                atomic_dec(&net->ibn_nconns);
        }

        LIBCFS_FREE(conn, sizeof(*conn));
}

int
kiblnd_close_peer_conns_locked (kib_peer_t *peer, int why)
{
        kib_conn_t         *conn;
        struct list_head   *ctmp;
        struct list_head   *cnxt;
        int                 count = 0;

        list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
                conn = list_entry(ctmp, kib_conn_t, ibc_list);

                count++;
                kiblnd_close_conn_locked(conn, why);
        }

        return count;
}

int
kiblnd_close_stale_conns_locked (kib_peer_t *peer, __u64 incarnation)
{
        kib_conn_t         *conn;
        struct list_head   *ctmp;
        struct list_head   *cnxt;
        int                 count = 0;

        list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
                conn = list_entry(ctmp, kib_conn_t, ibc_list);

                if (conn->ibc_incarnation == incarnation)
                        continue;

                CDEBUG(D_NET, "Closing stale conn -> %s incarnation:"LPX64"("LPX64")\n",
                       libcfs_nid2str(peer->ibp_nid),
                       conn->ibc_incarnation, incarnation);

                count++;
                kiblnd_close_conn_locked(conn, -ESTALE);
        }

        return count;
}

int
kiblnd_close_matching_conns (lnet_ni_t *ni, lnet_nid_t nid)
{
        kib_peer_t         *peer;
        struct list_head   *ptmp;
        struct list_head   *pnxt;
        int                 lo;
        int                 hi;
        int                 i;
        unsigned long       flags;
        int                 count = 0;

        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        if (nid != LNET_NID_ANY)
                lo = hi = kiblnd_nid2peerlist(nid) - kiblnd_data.kib_peers;
        else {
                lo = 0;
                hi = kiblnd_data.kib_peer_hash_size - 1;
        }

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kiblnd_data.kib_peers[i]) {

                        peer = list_entry(ptmp, kib_peer_t, ibp_list);
                        LASSERT (peer->ibp_connecting > 0 ||
                                 peer->ibp_accepting > 0 ||
                                 !list_empty(&peer->ibp_conns));

                        if (peer->ibp_ni != ni)
                                continue;

                        if (!(nid == LNET_NID_ANY || nid == peer->ibp_nid))
                                continue;

                        count += kiblnd_close_peer_conns_locked(peer, 0);
                }
        }

        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);

        /* wildcards always succeed */
        if (nid == LNET_NID_ANY)
                return 0;

        return (count == 0) ? -ENOENT : 0;
}

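/* LND ioctl handler: IOC_LIBCFS_GET_PEER (iterate peers by index),
 * IOC_LIBCFS_DEL_PEER, IOC_LIBCFS_GET_CONN (iterate conns by index,
 * reporting the path MTU) and IOC_LIBCFS_CLOSE_CONNECTION. */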
int
kiblnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
{
        struct libcfs_ioctl_data *data = arg;
        int                       rc = -EINVAL;

        switch(cmd) {
        case IOC_LIBCFS_GET_PEER: {
                lnet_nid_t   nid = 0;
                int          count = 0;

                rc = kiblnd_get_peer_info(ni, data->ioc_count,
                                          &nid, &count);
                data->ioc_nid    = nid;
                data->ioc_count  = count;
                break;
        }

        case IOC_LIBCFS_DEL_PEER: {
                rc = kiblnd_del_peer(ni, data->ioc_nid);
                break;
        }
        case IOC_LIBCFS_GET_CONN: {
                kib_conn_t *conn;

                rc = 0;
                conn = kiblnd_get_conn_by_idx(ni, data->ioc_count);
                if (conn == NULL) {
                        rc = -ENOENT;
                        break;
                }

                LASSERT (conn->ibc_cmid != NULL);
                data->ioc_nid = conn->ibc_peer->ibp_nid;
                if (conn->ibc_cmid->route.path_rec == NULL)
                        data->ioc_u32[0] = 0; /* iWarp has no path MTU */
                else
                        data->ioc_u32[0] =
                        ib_mtu_enum_to_int(conn->ibc_cmid->route.path_rec->mtu);
                kiblnd_conn_decref(conn);
                break;
        }
        case IOC_LIBCFS_CLOSE_CONNECTION: {
                rc = kiblnd_close_matching_conns(ni, data->ioc_nid);
                break;
        }

        default:
                break;
        }

        return rc;
}

void
kiblnd_free_pages (kib_pages_t *p)
{
        int         npages = p->ibp_npages;
        int         i;

        for (i = 0; i < npages; i++)
                if (p->ibp_pages[i] != NULL)
                        __free_page(p->ibp_pages[i]);

        LIBCFS_FREE (p, offsetof(kib_pages_t, ibp_pages[npages]));
}

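/* kib_pages_t ends in a variable-length page array, so descriptors are
 * sized with offsetof(kib_pages_t, ibp_pages[npages]) rather than
 * sizeof(); kiblnd_free_pages() above uses the same expression and
 * copes with a partially-populated array on the error path. */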
int
kiblnd_alloc_pages (kib_pages_t **pp, int npages)
{
        kib_pages_t   *p;
        int            i;

        LIBCFS_ALLOC(p, offsetof(kib_pages_t, ibp_pages[npages]));
        if (p == NULL) {
                CERROR("Can't allocate descriptor for %d pages\n", npages);
                return -ENOMEM;
        }

        memset(p, 0, offsetof(kib_pages_t, ibp_pages[npages]));
        p->ibp_npages = npages;

        for (i = 0; i < npages; i++) {
                p->ibp_pages[i] = alloc_page(GFP_KERNEL);
                if (p->ibp_pages[i] == NULL) {
                        CERROR("Can't allocate page %d of %d\n", i, npages);
                        kiblnd_free_pages(p);
                        return -ENOMEM;
                }
        }

        *pp = p;
        return 0;
}

void
kiblnd_free_tx_descs (lnet_ni_t *ni)
{
        int        i;
        kib_net_t *net = ni->ni_data;

        LASSERT (net != NULL);

        if (net->ibn_tx_descs != NULL) {
                for (i = 0; i < IBLND_TX_MSGS(); i++) {
                        kib_tx_t *tx = &net->ibn_tx_descs[i];

#if IBLND_MAP_ON_DEMAND
                        if (tx->tx_pages != NULL)
                                LIBCFS_FREE(tx->tx_pages, LNET_MAX_IOV *
                                            sizeof(*tx->tx_pages));
#else
                        if (tx->tx_wrq != NULL)
                                LIBCFS_FREE(tx->tx_wrq,
                                            (1 + IBLND_MAX_RDMA_FRAGS) *
                                            sizeof(*tx->tx_wrq));

                        if (tx->tx_sge != NULL)
                                LIBCFS_FREE(tx->tx_sge,
                                            (1 + IBLND_MAX_RDMA_FRAGS) *
                                            sizeof(*tx->tx_sge));

                        if (tx->tx_rd != NULL)
                                LIBCFS_FREE(tx->tx_rd,
                                            offsetof(kib_rdma_desc_t,
                                               rd_frags[IBLND_MAX_RDMA_FRAGS]));

                        if (tx->tx_frags != NULL)
                                LIBCFS_FREE(tx->tx_frags,
                                            IBLND_MAX_RDMA_FRAGS *
                                            sizeof(*tx->tx_frags));
#endif
                }

                LIBCFS_FREE(net->ibn_tx_descs,
                            IBLND_TX_MSGS() * sizeof(kib_tx_t));
        }

        if (net->ibn_tx_pages != NULL)
                kiblnd_free_pages(net->ibn_tx_pages);
}

int
kiblnd_alloc_tx_descs (lnet_ni_t *ni)
{
        int        i;
        int        rc;
        kib_net_t *net = ni->ni_data;

        LASSERT (net != NULL);

        rc = kiblnd_alloc_pages(&net->ibn_tx_pages, IBLND_TX_MSG_PAGES());

        if (rc != 0) {
                CERROR("Can't allocate tx pages\n");
                return rc;
        }

        LIBCFS_ALLOC (net->ibn_tx_descs,
                      IBLND_TX_MSGS() * sizeof(kib_tx_t));
        if (net->ibn_tx_descs == NULL) {
                CERROR("Can't allocate %d tx descriptors\n", IBLND_TX_MSGS());
                return -ENOMEM;
        }

        memset(net->ibn_tx_descs, 0,
               IBLND_TX_MSGS() * sizeof(kib_tx_t));

        for (i = 0; i < IBLND_TX_MSGS(); i++) {
                kib_tx_t *tx = &net->ibn_tx_descs[i];

#if IBLND_MAP_ON_DEMAND
                LIBCFS_ALLOC(tx->tx_pages, LNET_MAX_IOV *
                             sizeof(*tx->tx_pages));
                if (tx->tx_pages == NULL) {
                        CERROR("Can't allocate phys page vector[%d]\n",
                               LNET_MAX_IOV);
                        return -ENOMEM;
                }
#else
                LIBCFS_ALLOC(tx->tx_wrq,
                             (1 + IBLND_MAX_RDMA_FRAGS) *
                             sizeof(*tx->tx_wrq));
                if (tx->tx_wrq == NULL)
                        return -ENOMEM;

                LIBCFS_ALLOC(tx->tx_sge,
                             (1 + IBLND_MAX_RDMA_FRAGS) *
                             sizeof(*tx->tx_sge));
                if (tx->tx_sge == NULL)
                        return -ENOMEM;

                LIBCFS_ALLOC(tx->tx_rd,
                             offsetof(kib_rdma_desc_t,
                                      rd_frags[IBLND_MAX_RDMA_FRAGS]));
                if (tx->tx_rd == NULL)
                        return -ENOMEM;

                LIBCFS_ALLOC(tx->tx_frags,
                             IBLND_MAX_RDMA_FRAGS *
                             sizeof(*tx->tx_frags));
                if (tx->tx_frags == NULL)
                        return -ENOMEM;
#endif
        }

        return 0;
}

void
kiblnd_unmap_tx_descs (lnet_ni_t *ni)
{
        int             i;
        kib_tx_t       *tx;
        kib_net_t      *net = ni->ni_data;

        LASSERT (net != NULL);

        for (i = 0; i < IBLND_TX_MSGS(); i++) {
                tx = &net->ibn_tx_descs[i];

                kiblnd_dma_unmap_single(net->ibn_dev->ibd_cmid->device,
                                        KIBLND_UNMAP_ADDR(tx, tx_msgunmap,
                                                          tx->tx_msgaddr),
                                        IBLND_MSG_SIZE, DMA_TO_DEVICE);
        }
}

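/* Like the rx buffers in kiblnd_create_conn(), tx messages are packed
 * IBLND_MSG_SIZE apart into pre-allocated pages and DMA-mapped once up
 * front; every descriptor is then put on the net's idle-tx list. */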
void
kiblnd_map_tx_descs (lnet_ni_t *ni)
{
        int             ipage = 0;
        int             page_offset = 0;
        int             i;
        struct page    *page;
        kib_tx_t       *tx;
        kib_net_t      *net = ni->ni_data;

        LASSERT (net != NULL);

        /* pre-mapped messages are not bigger than 1 page */
        CLASSERT (IBLND_MSG_SIZE <= PAGE_SIZE);

        /* No fancy arithmetic when we do the buffer calculations */
        CLASSERT (PAGE_SIZE % IBLND_MSG_SIZE == 0);

        for (i = 0; i < IBLND_TX_MSGS(); i++) {
                page = net->ibn_tx_pages->ibp_pages[ipage];
                tx = &net->ibn_tx_descs[i];

                tx->tx_msg = (kib_msg_t *)(((char *)page_address(page)) +
                                           page_offset);

                tx->tx_msgaddr = kiblnd_dma_map_single(
                        net->ibn_dev->ibd_cmid->device,
                        tx->tx_msg, IBLND_MSG_SIZE, DMA_TO_DEVICE);
                KIBLND_UNMAP_ADDR_SET(tx, tx_msgunmap, tx->tx_msgaddr);

                list_add(&tx->tx_list, &net->ibn_idle_txs);

                page_offset += IBLND_MSG_SIZE;
                LASSERT (page_offset <= PAGE_SIZE);

                if (page_offset == PAGE_SIZE) {
                        page_offset = 0;
                        ipage++;
                        LASSERT (ipage <= IBLND_TX_MSG_PAGES());
                }
        }
}

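/* Module-wide teardown, keyed off how far startup got (kib_init): flag
 * shutdown, wake the scheduler and connd threads, wait for kib_nthreads
 * to reach zero, then free the peer hash table. */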
void
kiblnd_base_shutdown (void)
{
        int i;

        LASSERT (list_empty(&kiblnd_data.kib_devs));

        CDEBUG(D_MALLOC, "before LND base cleanup: kmem %d\n",
               atomic_read(&libcfs_kmemory));

        switch (kiblnd_data.kib_init) {
        default:
                LBUG();

        case IBLND_INIT_ALL:
        case IBLND_INIT_DATA:
                LASSERT (kiblnd_data.kib_peers != NULL);
                for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++) {
                        LASSERT (list_empty(&kiblnd_data.kib_peers[i]));
                }
                LASSERT (list_empty(&kiblnd_data.kib_connd_zombies));
                LASSERT (list_empty(&kiblnd_data.kib_connd_conns));

                /* flag threads to terminate; wake and wait for them to die */
                kiblnd_data.kib_shutdown = 1;
                wake_up_all(&kiblnd_data.kib_sched_waitq);
                wake_up_all(&kiblnd_data.kib_connd_waitq);

                i = 2;
                while (atomic_read(&kiblnd_data.kib_nthreads) != 0) {
                        i++;
                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                               "Waiting for %d threads to terminate\n",
                               atomic_read(&kiblnd_data.kib_nthreads));
                        cfs_pause(cfs_time_seconds(1));
                }

                /* fall through */

        case IBLND_INIT_NOTHING:
                break;
        }

        if (kiblnd_data.kib_peers != NULL)
                LIBCFS_FREE(kiblnd_data.kib_peers,
                            sizeof(struct list_head) *
                            kiblnd_data.kib_peer_hash_size);

        CDEBUG(D_MALLOC, "after LND base cleanup: kmem %d\n",
               atomic_read(&libcfs_kmemory));

        kiblnd_data.kib_init = IBLND_INIT_NOTHING;
        PORTAL_MODULE_UNUSE;
}

void
kiblnd_shutdown (lnet_ni_t *ni)
{
        kib_net_t        *net = ni->ni_data;
        rwlock_t         *g_lock = &kiblnd_data.kib_global_lock;
        int               i;
        unsigned long     flags;

        LASSERT(kiblnd_data.kib_init == IBLND_INIT_ALL);

        if (net == NULL)
                goto out;

        CDEBUG(D_MALLOC, "before LND net cleanup: kmem %d\n",
               atomic_read(&libcfs_kmemory));

        write_lock_irqsave(g_lock, flags);
        net->ibn_shutdown = 1;
        write_unlock_irqrestore(g_lock, flags);

        switch (net->ibn_init) {
        default:
                LBUG();

        case IBLND_INIT_ALL:
                /* nuke all existing peers within this net */
                kiblnd_del_peer(ni, LNET_NID_ANY);

                /* Wait for all peer state to clean up */
                i = 2;
                while (atomic_read(&net->ibn_npeers) != 0) {
                        i++;
                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n? */
                               "%s: waiting for %d peers to disconnect\n",
                               libcfs_nid2str(ni->ni_nid),
                               atomic_read(&net->ibn_npeers));
                        cfs_pause(cfs_time_seconds(1));
                }

                kiblnd_unmap_tx_descs(ni);

                LASSERT (net->ibn_dev->ibd_nnets > 0);
                net->ibn_dev->ibd_nnets--;

                /* fall through */

        case IBLND_INIT_NOTHING:
                LASSERT (atomic_read(&net->ibn_nconns) == 0);

#if IBLND_MAP_ON_DEMAND
                if (net->ibn_fmrpool != NULL)
                        ib_destroy_fmr_pool(net->ibn_fmrpool);
#endif
                if (net->ibn_dev != NULL &&
                    net->ibn_dev->ibd_nnets == 0)
                        kiblnd_destroy_dev(net->ibn_dev);

                break;
        }

        kiblnd_free_tx_descs(ni);

        CDEBUG(D_MALLOC, "after LND net cleanup: kmem %d\n",
               atomic_read(&libcfs_kmemory));

        net->ibn_init = IBLND_INIT_NOTHING;
        ni->ni_data = NULL;

        LIBCFS_FREE(net, sizeof(*net));

out:
        if (list_empty(&kiblnd_data.kib_devs))
                kiblnd_base_shutdown();
        return;
}

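/* Module-wide startup, run once for the first NI: initialise the global
 * tables and locks (IBLND_INIT_DATA), then spawn IBLND_N_SCHED scheduler
 * threads and the connd thread (IBLND_INIT_ALL).  Any failure unwinds
 * through kiblnd_base_shutdown(). */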
int
kiblnd_base_startup (void)
{
        int i;
        int rc;

        LASSERT (kiblnd_data.kib_init == IBLND_INIT_NOTHING);

        PORTAL_MODULE_USE;
        memset(&kiblnd_data, 0, sizeof(kiblnd_data)); /* zero pointers, flags etc */

        rwlock_init(&kiblnd_data.kib_global_lock);

        INIT_LIST_HEAD(&kiblnd_data.kib_devs);

        kiblnd_data.kib_peer_hash_size = IBLND_PEER_HASH_SIZE;
        LIBCFS_ALLOC(kiblnd_data.kib_peers,
                     sizeof(struct list_head) * kiblnd_data.kib_peer_hash_size);
        if (kiblnd_data.kib_peers == NULL) {
                goto failed;
        }
        for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++)
                INIT_LIST_HEAD(&kiblnd_data.kib_peers[i]);

        spin_lock_init(&kiblnd_data.kib_connd_lock);
        INIT_LIST_HEAD(&kiblnd_data.kib_connd_conns);
        INIT_LIST_HEAD(&kiblnd_data.kib_connd_zombies);
        init_waitqueue_head(&kiblnd_data.kib_connd_waitq);

        spin_lock_init(&kiblnd_data.kib_sched_lock);
        INIT_LIST_HEAD(&kiblnd_data.kib_sched_conns);
        init_waitqueue_head(&kiblnd_data.kib_sched_waitq);

        kiblnd_data.kib_error_qpa.qp_state = IB_QPS_ERR;

        /* lists/ptrs/locks initialised */
        kiblnd_data.kib_init = IBLND_INIT_DATA;
        /*****************************************************/

        for (i = 0; i < IBLND_N_SCHED; i++) {
                rc = kiblnd_thread_start(kiblnd_scheduler, (void *)((long)i));
                if (rc != 0) {
                        CERROR("Can't spawn o2iblnd scheduler[%d]: %d\n",
                               i, rc);
                        goto failed;
                }
        }

        rc = kiblnd_thread_start(kiblnd_connd, NULL);
        if (rc != 0) {
                CERROR("Can't spawn o2iblnd connd: %d\n", rc);
                goto failed;
        }

        /* flag everything initialised */
        kiblnd_data.kib_init = IBLND_INIT_ALL;
        /*****************************************************/

        return 0;

 failed:
        kiblnd_base_shutdown();
        return -ENETDOWN;
}

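/* Per-NI startup: run kiblnd_base_startup() for the first NI, stamp the
 * net with a gettimeofday-based incarnation, allocate tx descriptors,
 * resolve the IPoIB interface name ('networks=' setting or the default
 * tunable), then find or create the matching kib_dev_t. */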
1490 int
1491 kiblnd_startup (lnet_ni_t *ni)
1492 {
1493         char                     *ifname;
1494         kib_net_t                *net;
1495         kib_dev_t                *ibdev;
1496         struct list_head         *tmp;
1497         struct timeval            tv;
1498         int                       rc;
1499
1500         LASSERT (ni->ni_lnd == &the_kiblnd);
1501
1502         if (kiblnd_data.kib_init == IBLND_INIT_NOTHING) {
1503                 rc = kiblnd_base_startup();
1504                 if (rc != 0)
1505                         return rc;
1506         }
1507
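        /* NB ni_data is set before the NULL check so that the 'failed:'
         * path below can always hand the NI to kiblnd_shutdown() */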
1508         LIBCFS_ALLOC(net, sizeof(*net));
1509         ni->ni_data = net;
1510         if (net == NULL)
1511                 goto failed;
1512
1513         memset(net, 0, sizeof(*net));
1514
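        /* stamp this network instance with the current time in
         * microseconds; the stamp travels in every message header
         * (ibm_srcstamp/ibm_dststamp) so peers can detect a restart */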
1515         do_gettimeofday(&tv);
1516         net->ibn_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1517
1518         ni->ni_maxtxcredits = *kiblnd_tunables.kib_credits;
1519         ni->ni_peertxcredits = *kiblnd_tunables.kib_peercredits;
1520
1521         spin_lock_init(&net->ibn_tx_lock);
1522         INIT_LIST_HEAD(&net->ibn_idle_txs);
1523
1524         rc = kiblnd_alloc_tx_descs(ni);
1525         if (rc != 0) {
1526                 CERROR("Can't allocate tx descs\n");
1527                 goto failed;
1528         }
1529
1530         if (ni->ni_interfaces[0] != NULL) {
1531                 /* Use the IPoIB interface specified in 'networks=' */
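                /* e.g. a modprobe.conf line like
                 *   options lnet networks=o2ib(ib0)
                 * picks IPoIB interface ib0 for this network */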
1532
1533                 CLASSERT (LNET_MAX_INTERFACES > 1);
1534                 if (ni->ni_interfaces[1] != NULL) {
1535                         CERROR("Multiple interfaces not supported\n");
1536                         goto failed;
1537                 }
1538
1539                 ifname = ni->ni_interfaces[0];
1540         } else {
1541                 ifname = *kiblnd_tunables.kib_default_ipif;
1542         }
1543
1544         if (strlen(ifname) >= sizeof(ibdev->ibd_ifname)) {
1545                 CERROR("IPoIB interface name too long: %s\n", ifname);
1546                 goto failed;
1547         }
1548
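        /* a device (and its interface) may be shared by several networks;
         * reuse an existing kib_dev_t if one is already bound to ifname */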
1549         ibdev = NULL;
1550         list_for_each (tmp, &kiblnd_data.kib_devs) {
1551                 ibdev = list_entry(tmp, kib_dev_t, ibd_list);
1552
1553                 if (!strcmp(&ibdev->ibd_ifname[0], ifname))
1554                         break;
1555
1556                 ibdev = NULL;
1557         }
1558
1559         if (ibdev == NULL) {
1560                 __u32                     ip;
1561                 __u32                     netmask;
1562                 int                       up;
1563                 struct rdma_cm_id        *id;
1564                 struct ib_pd             *pd;
1565                 struct ib_mr             *mr;
1566                 struct sockaddr_in        addr;
1567
1568                 rc = libcfs_ipif_query(ifname, &up, &ip, &netmask);
1569                 if (rc != 0) {
1570                         CERROR("Can't query IPoIB interface %s: %d\n",
1571                                ifname, rc);
1572                         goto failed;
1573                 }
1574
1575                 if (!up) {
1576                         CERROR("Can't use IPoIB interface %s: it's down\n",
1577                                ifname);
1578                         goto failed;
1579                 }
1580
1581                 LIBCFS_ALLOC(ibdev, sizeof(*ibdev));
1582                 if (ibdev == NULL)
1583                         goto failed;
1584
1585                 memset(ibdev, 0, sizeof(*ibdev));
1586
1587                 INIT_LIST_HEAD(&ibdev->ibd_list); /* not yet in kib_devs */
1588                 ibdev->ibd_ifip = ip;
1589                 strcpy(&ibdev->ibd_ifname[0], ifname);
1590
1591                 id = rdma_create_id(kiblnd_cm_callback, ibdev, RDMA_PS_TCP);
1592                 if (!IS_ERR(id)) {
1593                         ibdev->ibd_cmid = id;
1594                 } else {
1595                         CERROR("Can't create listen ID: %ld\n", PTR_ERR(id));
1596                         goto failed;
1597                 }
1598
1599                 memset(&addr, 0, sizeof(addr));
1600                 addr.sin_family      = AF_INET;
1601                 addr.sin_port        = htons(*kiblnd_tunables.kib_service);
1602                 addr.sin_addr.s_addr = htonl(ip);
1603
1604                 rc = rdma_bind_addr(id, (struct sockaddr *)&addr);
1605                 if (rc != 0) {
1606                         CERROR("Can't bind to %s: %d\n", ifname, rc);
1607                         goto failed;
1608                 }
1609
1610                 /* Binding should have assigned me an IB device */
1611                 LASSERT (id->device != NULL);
1612
1613                 pd = ib_alloc_pd(id->device);
1614                 if (!IS_ERR(pd)) {
1615                         ibdev->ibd_pd = pd;
1616                 } else {
1617                         CERROR("Can't allocate PD: %ld\n", PTR_ERR(pd));
1618                         goto failed;
1619                 }
1620
1621 #if IBLND_MAP_ON_DEMAND
1622                 /* MR for sends and receives */
1623                 mr = ib_get_dma_mr(pd, IB_ACCESS_LOCAL_WRITE);
1624 #else
1625                 /* MR for sends, receives _and_ RDMA */
1626                 mr = ib_get_dma_mr(pd, IB_ACCESS_LOCAL_WRITE |
1627                                        IB_ACCESS_REMOTE_WRITE);
1628 #endif
1629                 if (!IS_ERR(mr)) {
1630                         ibdev->ibd_mr = mr;
1631                 } else {
1632                         CERROR("Can't get MR: %ld\n", PTR_ERR(mr));
1633                         goto failed;
1634                 }
1635
1636                 rc = rdma_listen(id, 0);
1637                 if (rc != 0) {
1638                         CERROR("Can't start listener: %d\n", rc);
1639                         goto failed;
1640                 }
1641
1642                 list_add_tail(&ibdev->ibd_list,
1643                               &kiblnd_data.kib_devs);
1644         }
1645
1646         ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ibdev->ibd_ifip);
1647         net->ibn_dev = ibdev;
1648
1649 #if IBLND_MAP_ON_DEMAND
1650         /* FMR pool for RDMA */
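        /* in map-on-demand mode RDMA payload pages are mapped on the fly
         * through FMRs drawn from this pool instead of being covered by
         * the device-wide DMA MR created above */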
1651         {
1652                 struct ib_fmr_pool      *fmrpool;
1653                 struct ib_fmr_pool_param param = {
1654                         .max_pages_per_fmr = LNET_MAX_PAYLOAD/PAGE_SIZE,
1655                         .page_shift        = PAGE_SHIFT,
1656                         .access            = (IB_ACCESS_LOCAL_WRITE |
1657                                               IB_ACCESS_REMOTE_WRITE),
1658                         .pool_size         = *kiblnd_tunables.kib_fmr_pool_size,
1659                         .dirty_watermark   = *kiblnd_tunables.kib_fmr_flush_trigger,
1660                         .flush_function    = NULL,
1661                         .flush_arg         = NULL,
1662                         .cache             = *kiblnd_tunables.kib_fmr_cache};
1663
1664                 if (*kiblnd_tunables.kib_fmr_pool_size <
1665                     *kiblnd_tunables.kib_ntx) {
1666                         CERROR("FMR pool size (%d) must be >= ntx (%d)\n",
1667                                *kiblnd_tunables.kib_fmr_pool_size,
1668                                *kiblnd_tunables.kib_ntx);
1669                         goto failed;
1670                 }
1671
1672                 fmrpool = ib_create_fmr_pool(ibdev->ibd_pd, &param);
1673                 if (!IS_ERR(fmrpool)) {
1674                         net->ibn_fmrpool = fmrpool;
1675                 } else {
1676                         CERROR("Can't create FMR pool: %ld\n",
1677                                PTR_ERR(fmrpool));
1678                         goto failed;
1679                 }
1680         }
1681 #endif
1682
1683         kiblnd_map_tx_descs(ni);
1684
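        /* this net now holds a reference on the device; kiblnd_shutdown()
         * destroys the device only when ibd_nnets drops back to zero */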
1685         ibdev->ibd_nnets++;
1686         net->ibn_init = IBLND_INIT_ALL;
1687
1688         return 0;
1689
1690 failed:
1691         kiblnd_shutdown(ni);
1692
1693         CDEBUG(D_NET, "kiblnd_startup failed\n");
1694         return -ENETDOWN;
1695 }
1696
1697 void __exit
1698 kiblnd_module_fini (void)
1699 {
1700         lnet_unregister_lnd(&the_kiblnd);
1701         kiblnd_tunables_fini();
1702 }
1703
1704 int __init
1705 kiblnd_module_init (void)
1706 {
1707         int    rc;
1708
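        /* compile-time checks: a message carrying the largest possible
         * RDMA fragment descriptor must still fit in the preallocated
         * message buffer */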
1709         CLASSERT (sizeof(kib_msg_t) <= IBLND_MSG_SIZE);
1710 #if !IBLND_MAP_ON_DEMAND
1711         CLASSERT (offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[IBLND_MAX_RDMA_FRAGS])
1712                   <= IBLND_MSG_SIZE);
1713         CLASSERT (offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[IBLND_MAX_RDMA_FRAGS])
1714                   <= IBLND_MSG_SIZE);
1715 #endif
1716         rc = kiblnd_tunables_init();
1717         if (rc != 0)
1718                 return rc;
1719
1720         lnet_register_lnd(&the_kiblnd);
1721
1722         return 0;
1723 }
1724
1725 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
1726 MODULE_DESCRIPTION("Kernel OpenIB gen2 LND v1.00");
1727 MODULE_LICENSE("GPL");
1728
1729 module_init(kiblnd_module_init);
1730 module_exit(kiblnd_module_fini);