Whamcloud - gitweb
Landing b_hd_newconfig on HEAD
[fs/lustre-release.git] / lnet / klnds / openiblnd / openiblnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include "openiblnd.h"
25
26 lnd_t the_kiblnd = {
27 #ifdef USING_TSAPI
28         .lnd_type       = CIBLND,
29 #else
30         .lnd_type       = OPENIBLND,
31 #endif
32         .lnd_startup    = kibnal_startup,
33         .lnd_shutdown   = kibnal_shutdown,
34         .lnd_ctl        = kibnal_ctl,
35         .lnd_send       = kibnal_send,
36         .lnd_recv       = kibnal_recv,
37         .lnd_eager_recv = kibnal_eager_recv,
38         .lnd_accept     = kibnal_accept,
39 };
40
41 kib_data_t              kibnal_data;
42
/*
 * Rotate-and-add checksum over the 'nob' bytes starting at 'ptr'.
 *
 * The result is never 0, because a checksum of 0 means "no checksum": a
 * computed sum of 0 is reported as 1.  A non-positive 'nob' sums nothing.
 */
__u32
kibnal_cksum (void *ptr, int nob)
{
        char  *bytes = ptr;
        __u32  acc = 0;
        int    i;

        for (i = 0; i < nob; i++) {
                /* rotate the running sum left by one bit... */
                acc = (acc << 1) | (acc >> 31);
                /* ...then fold in the next byte */
                acc += bytes[i];
        }

        if (acc == 0)
                return 1;

        return acc;
}
55
56 void
57 kibnal_init_msg(kib_msg_t *msg, int type, int body_nob)
58 {
59         msg->ibm_type = type;
60         msg->ibm_nob  = offsetof(kib_msg_t, ibm_u) + body_nob;
61 }
62
63 void
64 kibnal_pack_msg(kib_msg_t *msg, int version, int credits, 
65                 lnet_nid_t dstnid, __u64 dststamp)
66 {
67         /* CAVEAT EMPTOR! all message fields not set here should have been
68          * initialised previously. */
69         msg->ibm_magic    = IBNAL_MSG_MAGIC;
70         msg->ibm_version  = version;
71         /*   ibm_type */
72         msg->ibm_credits  = credits;
73         /*   ibm_nob */
74         msg->ibm_cksum    = 0;
75         msg->ibm_srcnid   = lnet_ptlcompat_srcnid(kibnal_data.kib_ni->ni_nid,
76                                                   dstnid);
77         msg->ibm_srcstamp = kibnal_data.kib_incarnation;
78         msg->ibm_dstnid   = dstnid;
79         msg->ibm_dststamp = dststamp;
80
81         if (*kibnal_tunables.kib_cksum) {
82                 /* NB ibm_cksum zero while computing cksum */
83                 msg->ibm_cksum    = kibnal_cksum(msg, msg->ibm_nob);
84         }
85 }
86
87 int
88 kibnal_unpack_msg(kib_msg_t *msg, int expected_version, int nob)
89 {
90         const int hdr_size = offsetof(kib_msg_t, ibm_u);
91         __u32     msg_cksum;
92         int       msg_version;
93         int       flip;
94         int       msg_nob;
95
96         if (nob < 6) {
97                 CERROR("Short message: %d\n", nob);
98                 return -EPROTO;
99         }
100
101         if (msg->ibm_magic == IBNAL_MSG_MAGIC) {
102                 flip = 0;
103         } else if (msg->ibm_magic == __swab32(IBNAL_MSG_MAGIC)) {
104                 flip = 1;
105         } else {
106                 CERROR("Bad magic: %08x\n", msg->ibm_magic);
107                 return -EPROTO;
108         }
109
110         msg_version = flip ? __swab16(msg->ibm_version) : msg->ibm_version;
111         if ((expected_version == 0) ?
112             (msg_version != IBNAL_MSG_VERSION &&
113              msg_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) :
114             (msg_version != expected_version)) {
115                 CERROR("Bad version: %x\n", msg_version);
116                 return -EPROTO;
117         }
118
119         if (nob < hdr_size) {
120                 CERROR("Short message: %d\n", nob);
121                 return -EPROTO;
122         }
123
124         msg_nob = flip ? __swab32(msg->ibm_nob) : msg->ibm_nob;
125         if (msg_nob > nob) {
126                 CERROR("Short message: got %d, wanted %d\n", nob, msg_nob);
127                 return -EPROTO;
128         }
129
130         /* checksum must be computed with ibm_cksum zero and BEFORE anything
131          * gets flipped */
132         msg_cksum = flip ? __swab32(msg->ibm_cksum) : msg->ibm_cksum;
133         msg->ibm_cksum = 0;
134         if (msg_cksum != 0 &&
135             msg_cksum != kibnal_cksum(msg, msg_nob)) {
136                 CERROR("Bad checksum\n");
137                 return -EPROTO;
138         }
139         msg->ibm_cksum = msg_cksum;
140         
141         if (flip) {
142                 /* leave magic unflipped as a clue to peer endianness */
143                 msg->ibm_version = msg_version;
144                 LASSERT (sizeof(msg->ibm_type) == 1);
145                 LASSERT (sizeof(msg->ibm_credits) == 1);
146                 msg->ibm_nob = msg_nob;
147                 __swab64s(&msg->ibm_srcnid);
148                 __swab64s(&msg->ibm_srcstamp);
149                 __swab64s(&msg->ibm_dstnid);
150                 __swab64s(&msg->ibm_dststamp);
151         }
152         
153         if (msg->ibm_srcnid == LNET_NID_ANY) {
154                 CERROR("Bad src nid: %s\n", libcfs_nid2str(msg->ibm_srcnid));
155                 return -EPROTO;
156         }
157
158         switch (msg->ibm_type) {
159         default:
160                 CERROR("Unknown message type %x\n", msg->ibm_type);
161                 return -EPROTO;
162                 
163         case IBNAL_MSG_SVCQRY:
164         case IBNAL_MSG_NOOP:
165                 break;
166
167         case IBNAL_MSG_SVCRSP:
168                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.svcrsp)) {
169                         CERROR("Short SVCRSP: %d(%d)\n", msg_nob,
170                                (int)(hdr_size + sizeof(msg->ibm_u.svcrsp)));
171                         return -EPROTO;
172                 }
173                 if (flip) {
174                         __swab64s(&msg->ibm_u.svcrsp.ibsr_svc_id);
175                         __swab16s(&msg->ibm_u.svcrsp.ibsr_svc_pkey);
176                 }
177                 break;
178
179         case IBNAL_MSG_CONNREQ:
180         case IBNAL_MSG_CONNACK:
181                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.connparams)) {
182                         CERROR("Short CONNREQ: %d(%d)\n", msg_nob,
183                                (int)(hdr_size + sizeof(msg->ibm_u.connparams)));
184                         return -EPROTO;
185                 }
186                 if (flip)
187                         __swab32s(&msg->ibm_u.connparams.ibcp_queue_depth);
188                 break;
189
190         case IBNAL_MSG_IMMEDIATE:
191                 if (msg_nob < offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0])) {
192                         CERROR("Short IMMEDIATE: %d(%d)\n", msg_nob,
193                                (int)offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0]));
194                         return -EPROTO;
195                 }
196                 break;
197
198         case IBNAL_MSG_PUT_RDMA:
199         case IBNAL_MSG_GET_RDMA:
200                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.rdma)) {
201                         CERROR("Short RDMA req: %d(%d)\n", msg_nob,
202                                (int)(hdr_size + sizeof(msg->ibm_u.rdma)));
203                         return -EPROTO;
204                 }
205                 if (flip) {
206                         __swab32s(&msg->ibm_u.rdma.ibrm_desc.rd_key);
207                         __swab32s(&msg->ibm_u.rdma.ibrm_desc.rd_nob);
208                         __swab64s(&msg->ibm_u.rdma.ibrm_desc.rd_addr);
209                 }
210                 break;
211
212         case IBNAL_MSG_PUT_DONE:
213         case IBNAL_MSG_GET_DONE:
214                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.completion)) {
215                         CERROR("Short RDMA completion: %d(%d)\n", msg_nob,
216                                (int)(hdr_size + sizeof(msg->ibm_u.completion)));
217                         return -EPROTO;
218                 }
219                 if (flip)
220                         __swab32s(&msg->ibm_u.completion.ibcm_status);
221                 break;
222         }
223         return 0;
224 }
225
226 int
227 kibnal_make_svcqry (kib_conn_t *conn) 
228 {
229         kib_peer_t    *peer = conn->ibc_peer;
230         int            version = IBNAL_MSG_VERSION;
231         int            msg_version;
232         kib_msg_t     *msg;
233         struct socket *sock;
234         int            rc;
235         int            nob;
236
237         LASSERT (conn->ibc_connreq != NULL);
238         msg = &conn->ibc_connreq->cr_msg;
239
240  again:
241         kibnal_init_msg(msg, IBNAL_MSG_SVCQRY, 0);
242         kibnal_pack_msg(msg, version, 0, peer->ibp_nid, 0);
243
244         rc = lnet_connect(&sock, peer->ibp_nid,
245                           0, peer->ibp_ip, peer->ibp_port);
246         if (rc != 0)
247                 return -ECONNABORTED;
248         
249         rc = libcfs_sock_write(sock, msg, msg->ibm_nob,
250                                lnet_acceptor_timeout());
251         if (rc != 0) {
252                 CERROR("Error %d sending svcqry to %s at %u.%u.%u.%u/%d\n", 
253                        rc, libcfs_nid2str(peer->ibp_nid), 
254                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
255                 goto out;
256         }
257
258         /* The first 6 bytes are invariably MAGIC + proto version */
259         rc = libcfs_sock_read(sock, msg, 6, *kibnal_tunables.kib_timeout);
260         if (rc != 0) {
261                 CERROR("Error %d receiving svcrsp from %s at %u.%u.%u.%u/%d\n", 
262                        rc, libcfs_nid2str(peer->ibp_nid), 
263                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
264                 goto out;
265         }
266
267         if (msg->ibm_magic != IBNAL_MSG_MAGIC &&
268             msg->ibm_magic != __swab32(IBNAL_MSG_MAGIC)) {
269                 CERROR("Bad magic: %08x from %s at %u.%u.%u.%u/%d\n",
270                        msg->ibm_magic, libcfs_nid2str(peer->ibp_nid),
271                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
272                 rc = -EPROTO;
273                 goto out;
274         }
275
276         msg_version = (msg->ibm_magic == IBNAL_MSG_MAGIC) ? 
277                       msg->ibm_version : __swab16(msg->ibm_version);
278         if (msg_version != version) {
279                 if (version == IBNAL_MSG_VERSION) {
280                         /* retry with previous version */
281                         libcfs_sock_release(sock);
282                         version = IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD;
283                         goto again;
284                 }
285                 
286                 CERROR("Bad version %x from %s at %u.%u.%u.%u/%d\n",
287                        msg_version, libcfs_nid2str(peer->ibp_nid),
288                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
289                 rc = -EPROTO;
290                 goto out;
291         }
292
293         /* Read in the rest of the message now we know the expected format */
294         nob = offsetof(kib_msg_t, ibm_u) + sizeof(kib_svcrsp_t);
295         rc = libcfs_sock_read(sock, ((char *)msg) + 6, nob - 6,
296                               *kibnal_tunables.kib_timeout);
297         if (rc != 0) {
298                 CERROR("Error %d receiving svcrsp from %s at %u.%u.%u.%u/%d\n", 
299                        rc, libcfs_nid2str(peer->ibp_nid), 
300                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
301                 goto out;
302         }
303
304         rc = kibnal_unpack_msg(msg, version, nob);
305         if (rc != 0) {
306                 CERROR("Error %d unpacking svcrsp from %s at %u.%u.%u.%u/%d\n", 
307                        rc, libcfs_nid2str(peer->ibp_nid), 
308                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
309                 goto out;
310         }
311                        
312         if (msg->ibm_type != IBNAL_MSG_SVCRSP) {
313                 CERROR("Unexpected response type %d from %s at %u.%u.%u.%u/%d\n", 
314                        msg->ibm_type, libcfs_nid2str(peer->ibp_nid), 
315                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
316                 rc = -EPROTO;
317                 goto out;
318         }
319         
320         if (!lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
321                                      msg->ibm_dstnid) ||
322             msg->ibm_dststamp != kibnal_data.kib_incarnation) {
323                 CERROR("Unexpected dst NID/stamp %s/"LPX64" from "
324                        "%s at %u.%u.%u.%u/%d\n", 
325                        libcfs_nid2str(msg->ibm_dstnid), msg->ibm_dststamp,
326                        libcfs_nid2str(peer->ibp_nid), HIPQUAD(peer->ibp_ip), 
327                        peer->ibp_port);
328                 rc = -EPROTO;
329                 goto out;
330         }
331
332         if (!lnet_ptlcompat_matchnid(peer->ibp_nid, msg->ibm_srcnid)) {
333                 CERROR("Unexpected src NID %s from %s at %u.%u.%u.%u/%d\n", 
334                        libcfs_nid2str(msg->ibm_srcnid),
335                        libcfs_nid2str(peer->ibp_nid), 
336                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
337                 rc = -EPROTO;
338                 goto out;
339         }
340
341         conn->ibc_incarnation = msg->ibm_srcstamp;
342         conn->ibc_connreq->cr_svcrsp = msg->ibm_u.svcrsp;
343         conn->ibc_version = version;
344         
345  out:
346         libcfs_sock_release(sock);
347         return rc;
348 }
349
350 void
351 kibnal_handle_svcqry (struct socket *sock)
352 {
353         __u32                peer_ip;
354         unsigned int         peer_port;
355         kib_msg_t           *msg;
356         __u64                srcnid;
357         __u64                srcstamp;
358         int                  version;
359         int                  reject = 0;
360         int                  rc;
361
362         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
363         if (rc != 0) {
364                 CERROR("Can't get peer's IP: %d\n", rc);
365                 return;
366         }
367
368         LIBCFS_ALLOC(msg, sizeof(*msg));
369         if (msg == NULL) {
370                 CERROR("Can't allocate msgs for %u.%u.%u.%u/%d\n",
371                        HIPQUAD(peer_ip), peer_port);
372                 return;
373         }
374         
375         rc = libcfs_sock_read(sock, &msg->ibm_magic, sizeof(msg->ibm_magic),
376                               lnet_acceptor_timeout());
377         if (rc != 0) {
378                 CERROR("Error %d receiving svcqry(1) from %u.%u.%u.%u/%d\n",
379                        rc, HIPQUAD(peer_ip), peer_port);
380                 goto out;
381         }
382
383         if (msg->ibm_magic != IBNAL_MSG_MAGIC &&
384             msg->ibm_magic != __swab32(IBNAL_MSG_MAGIC)) {
385                 /* Unexpected magic! */
386                 if (the_lnet.ln_ptlcompat == 0) {
387                         if (msg->ibm_magic == LNET_PROTO_MAGIC ||
388                             msg->ibm_magic == __swab32(LNET_PROTO_MAGIC)) {
389                                 /* future protocol version compatibility!
390                                  * When LNET unifies protocols over all LNDs,
391                                  * the first thing sent will be a version
392                                  * query.  I send back a reply in my current
393                                  * protocol to tell her I'm "old" */
394                                 kibnal_init_msg(msg, 0, 0);
395                                 kibnal_pack_msg(msg, IBNAL_MSG_VERSION, 0, 
396                                                 LNET_NID_ANY, 0);
397                                 reject = 1;
398                                 goto reply;
399                         }
400
401                         CERROR ("Bad magic(1) %#08x (%#08x expected) from "
402                                 "%u.%u.%u.%u/%d\n", msg->ibm_magic,
403                                 IBNAL_MSG_MAGIC, HIPQUAD(peer_ip), peer_port);
404                         goto out;
405                 }
406
407                 /* When portals compatibility is set, I may be passed a new
408                  * connection "blindly" by the acceptor, and I have to
409                  * determine if my peer has sent an acceptor connection request
410                  * or not. */
411                 rc = lnet_accept(kibnal_data.kib_ni, sock, msg->ibm_magic);
412                 if (rc != 0)
413                         goto out;
414
415                 /* It was an acceptor connection request!
416                  * Now I should see my magic... */
417                 rc = libcfs_sock_read(sock, &msg->ibm_magic,
418                                       sizeof(msg->ibm_magic),
419                                       lnet_acceptor_timeout());
420                 if (rc != 0) {
421                         CERROR("Error %d receiving svcqry(2) from %u.%u.%u.%u/%d\n",
422                                rc, HIPQUAD(peer_ip), peer_port);
423                         goto out;
424                 }
425
426                 if (msg->ibm_magic != IBNAL_MSG_MAGIC &&
427                     msg->ibm_magic != __swab32(IBNAL_MSG_MAGIC)) {
428                         CERROR ("Bad magic(2) %#08x (%#08x expected) from "
429                                 "%u.%u.%u.%u/%d\n", msg->ibm_magic,
430                                 IBNAL_MSG_MAGIC, HIPQUAD(peer_ip), peer_port);
431                         goto out;
432                 }
433         }
434
435         /* Now check version */
436
437         rc = libcfs_sock_read(sock, &msg->ibm_version, sizeof(msg->ibm_version),
438                               lnet_acceptor_timeout());
439         if (rc != 0) {
440                 CERROR("Error %d receiving svcqry(3) from %u.%u.%u.%u/%d\n",
441                        rc, HIPQUAD(peer_ip), peer_port);
442                 goto out;
443         }
444
445         version = (msg->ibm_magic == IBNAL_MSG_MAGIC) ?
446                   msg->ibm_version : __swab32(msg->ibm_version);
447         /* Peer is a different protocol version: reply in my current protocol
448          * to tell her I'm "old" */
449         if (version != IBNAL_MSG_VERSION &&
450             version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
451                 kibnal_init_msg(msg, 0, 0);
452                 kibnal_pack_msg(msg, IBNAL_MSG_VERSION, 0, LNET_NID_ANY, 0);
453                 reject = 1;
454                 goto reply;
455         }
456         
457         /* Now read in all the rest */
458         rc = libcfs_sock_read(sock, &msg->ibm_type,
459                               offsetof(kib_msg_t, ibm_u) -
460                               offsetof(kib_msg_t, ibm_type),
461                               lnet_acceptor_timeout());
462         if (rc != 0) {
463                 CERROR("Error %d receiving svcqry(4) from %u.%u.%u.%u/%d\n",
464                        rc, HIPQUAD(peer_ip), peer_port);
465                 goto out;
466         }
467         
468         rc = kibnal_unpack_msg(msg, version, offsetof(kib_msg_t, ibm_u));
469         if (rc != 0) {
470                 CERROR("Error %d unpacking svcqry from %u.%u.%u.%u/%d\n",
471                        rc, HIPQUAD(peer_ip), peer_port);
472                 goto out;
473         }
474         
475         if (msg->ibm_type != IBNAL_MSG_SVCQRY) {
476                 CERROR("Unexpected message %d from %u.%u.%u.%u/%d\n",
477                        msg->ibm_type, HIPQUAD(peer_ip), peer_port);
478                 goto out;
479         }
480         
481         if (!lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
482                                      msg->ibm_dstnid)) {
483                 CERROR("Unexpected dstnid %s: expected %s from %u.%u.%u.%u/%d\n",
484                        libcfs_nid2str(msg->ibm_dstnid),
485                        libcfs_nid2str(kibnal_data.kib_ni->ni_nid),
486                        HIPQUAD(peer_ip), peer_port);
487                 goto out;
488         }
489
490         srcnid = msg->ibm_srcnid;
491         srcstamp = msg->ibm_srcstamp;
492         
493         kibnal_init_msg(msg, IBNAL_MSG_SVCRSP, sizeof(msg->ibm_u.svcrsp));
494
495         msg->ibm_u.svcrsp.ibsr_svc_id = kibnal_data.kib_svc_id;
496         memcpy(msg->ibm_u.svcrsp.ibsr_svc_gid, kibnal_data.kib_svc_gid,
497                sizeof(kibnal_data.kib_svc_gid));
498         msg->ibm_u.svcrsp.ibsr_svc_pkey = kibnal_data.kib_svc_pkey;
499
500         kibnal_pack_msg(msg, version, 0, srcnid, srcstamp);
501
502  reply:
503         rc = libcfs_sock_write (sock, msg, msg->ibm_nob,
504                                 lnet_acceptor_timeout());
505         if (!reject && rc != 0) {
506                 /* Only complain if we're not rejecting */
507                 CERROR("Error %d replying to svcqry from %u.%u.%u.%u/%d\n",
508                        rc, HIPQUAD(peer_ip), peer_port);
509                 goto out;
510         }
511         
512  out:
513         LIBCFS_FREE(msg, sizeof(*msg));
514 }
515
516 void
517 kibnal_free_acceptsock (kib_acceptsock_t *as)
518 {
519         libcfs_sock_release(as->ibas_sock);
520         LIBCFS_FREE(as, sizeof(*as));
521 }
522
523 int
524 kibnal_accept(lnet_ni_t *ni, struct socket *sock)
525 {
526         kib_acceptsock_t  *as;
527         unsigned long      flags;
528
529         LIBCFS_ALLOC(as, sizeof(*as));
530         if (as == NULL) {
531                 CERROR("Out of Memory\n");
532                 return -ENOMEM;
533         }
534
535         as->ibas_sock = sock;
536                 
537         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
538                 
539         list_add_tail(&as->ibas_list, &kibnal_data.kib_connd_acceptq);
540         wake_up(&kibnal_data.kib_connd_waitq);
541
542         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
543         return 0;
544 }
545
546 int
547 kibnal_start_ib_listener (void) 
548 {
549         int    rc;
550
551         LASSERT (kibnal_data.kib_listen_handle == NULL);
552
553         kibnal_data.kib_svc_id = ib_cm_service_assign();
554         CDEBUG(D_NET, "svc id "LPX64"\n", kibnal_data.kib_svc_id);
555
556         rc = ib_cached_gid_get(kibnal_data.kib_device,
557                                kibnal_data.kib_port, 0,
558                                kibnal_data.kib_svc_gid);
559         if (rc != 0) {
560                 CERROR("Can't get port %d GID: %d\n",
561                        kibnal_data.kib_port, rc);
562                 return rc;
563         }
564         
565         rc = ib_cached_pkey_get(kibnal_data.kib_device,
566                                 kibnal_data.kib_port, 0,
567                                 &kibnal_data.kib_svc_pkey);
568         if (rc != 0) {
569                 CERROR ("Can't get port %d PKEY: %d\n",
570                         kibnal_data.kib_port, rc);
571                 return rc;
572         }
573
574         rc = ib_cm_listen(kibnal_data.kib_svc_id,
575                           TS_IB_CM_SERVICE_EXACT_MASK,
576                           kibnal_passive_conn_callback, NULL,
577                           &kibnal_data.kib_listen_handle);
578         if (rc != 0) {
579                 kibnal_data.kib_listen_handle = NULL;
580                 CERROR ("Can't create IB listener: %d\n", rc);
581                 return rc;
582         }
583         
584         LASSERT (kibnal_data.kib_listen_handle != NULL);
585         return 0;
586 }
587
588 void
589 kibnal_stop_ib_listener (void) 
590 {
591         int    rc;
592         
593         LASSERT (kibnal_data.kib_listen_handle != NULL);
594
595         rc = ib_cm_listen_stop (kibnal_data.kib_listen_handle);
596         if (rc != 0)
597                 CERROR("Error stopping IB listener: %d\n", rc);
598                 
599         kibnal_data.kib_listen_handle = NULL;
600 }
601
602 int
603 kibnal_create_peer (kib_peer_t **peerp, lnet_nid_t nid)
604 {
605         kib_peer_t     *peer;
606         unsigned long   flags;
607         int             rc;
608
609         LASSERT (nid != LNET_NID_ANY);
610
611         LIBCFS_ALLOC(peer, sizeof (*peer));
612         if (peer == NULL) {
613                 CERROR("Cannot allocate peer\n");
614                 return -ENOMEM;
615         }
616
617         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
618
619         peer->ibp_nid = nid;
620         atomic_set (&peer->ibp_refcount, 1);    /* 1 ref for caller */
621
622         INIT_LIST_HEAD (&peer->ibp_list);       /* not in the peer table yet */
623         INIT_LIST_HEAD (&peer->ibp_conns);
624         INIT_LIST_HEAD (&peer->ibp_tx_queue);
625         INIT_LIST_HEAD (&peer->ibp_connd_list); /* not queued for connecting */
626
627         peer->ibp_error = 0;
628         peer->ibp_last_alive = cfs_time_current();
629         peer->ibp_reconnect_interval = 0;       /* OK to connect at any time */
630
631         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
632
633         if (atomic_read(&kibnal_data.kib_npeers) >=
634             *kibnal_tunables.kib_concurrent_peers) {
635                 rc = -EOVERFLOW;        /* !! but at least it distinguishes */
636         } else if (kibnal_data.kib_nonewpeers) {
637                 rc = -ESHUTDOWN;        /* shutdown has started */
638         } else {
639                 rc = 0;
640                 /* npeers only grows with kib_global_lock held */
641                 atomic_inc(&kibnal_data.kib_npeers);
642         }
643         
644         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
645
646         if (rc != 0) {
647                 CERROR("Can't create peer: %s\n", 
648                        (rc == -ESHUTDOWN) ? "shutting down" : 
649                        "too many peers");
650                 LIBCFS_FREE(peer, sizeof(*peer));
651         } else {
652                 *peerp = peer;
653         }
654         
655         return rc;
656 }
657
658 void
659 kibnal_destroy_peer (kib_peer_t *peer)
660 {
661         CDEBUG (D_NET, "peer %s %p deleted\n", 
662                 libcfs_nid2str(peer->ibp_nid), peer);
663
664         LASSERT (atomic_read (&peer->ibp_refcount) == 0);
665         LASSERT (peer->ibp_persistence == 0);
666         LASSERT (!kibnal_peer_active(peer));
667         LASSERT (peer->ibp_connecting == 0);
668         LASSERT (peer->ibp_accepting == 0);
669         LASSERT (list_empty (&peer->ibp_connd_list));
670         LASSERT (list_empty (&peer->ibp_conns));
671         LASSERT (list_empty (&peer->ibp_tx_queue));
672
673         LIBCFS_FREE (peer, sizeof (*peer));
674
675         /* NB a peer's connections keep a reference on their peer until
676          * they are destroyed, so we can be assured that _all_ state to do
677          * with this peer has been cleaned up when its refcount drops to
678          * zero. */
679         atomic_dec(&kibnal_data.kib_npeers);
680 }
681
682 kib_peer_t *
683 kibnal_find_peer_locked (lnet_nid_t nid)
684 {
685         struct list_head *peer_list = kibnal_nid2peerlist (nid);
686         struct list_head *tmp;
687         kib_peer_t       *peer;
688
689         list_for_each (tmp, peer_list) {
690
691                 peer = list_entry (tmp, kib_peer_t, ibp_list);
692
693                 LASSERT (peer->ibp_persistence != 0 || /* persistent peer */
694                          peer->ibp_connecting != 0 || /* creating conns */
695                          peer->ibp_accepting != 0 ||
696                          !list_empty (&peer->ibp_conns));  /* active conn */
697
698                 if (peer->ibp_nid != nid)
699                         continue;
700
701                 return (peer);
702         }
703         return (NULL);
704 }
705
706 kib_peer_t *
707 kibnal_get_peer (lnet_nid_t nid)
708 {
709         kib_peer_t     *peer;
710         unsigned long   flags;
711
712         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
713         peer = kibnal_find_peer_locked (nid);
714         if (peer != NULL)                       /* +1 ref for caller? */
715                 kibnal_peer_addref(peer);
716         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
717
718         return (peer);
719 }
720
721 void
722 kibnal_unlink_peer_locked (kib_peer_t *peer)
723 {
724         LASSERT (peer->ibp_persistence == 0);
725         LASSERT (list_empty(&peer->ibp_conns));
726
727         LASSERT (kibnal_peer_active(peer));
728         list_del_init (&peer->ibp_list);
729         /* lose peerlist's ref */
730         kibnal_peer_decref(peer);
731 }
732
733 int
734 kibnal_get_peer_info (int index, lnet_nid_t *nidp, __u32 *ipp, int *portp,
735                       int *persistencep)
736 {
737         kib_peer_t        *peer;
738         struct list_head  *ptmp;
739         unsigned long      flags;
740         int                i;
741
742         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
743
744         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
745
746                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
747                         
748                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
749                         LASSERT (peer->ibp_persistence != 0 ||
750                                  peer->ibp_connecting != 0 ||
751                                  peer->ibp_accepting != 0 ||
752                                  !list_empty (&peer->ibp_conns));
753
754                         if (index-- > 0)
755                                 continue;
756
757                         *nidp = peer->ibp_nid;
758                         *ipp = peer->ibp_ip;
759                         *portp = peer->ibp_port;
760                         *persistencep = peer->ibp_persistence;
761                         
762                         read_unlock_irqrestore(&kibnal_data.kib_global_lock,
763                                                flags);
764                         return (0);
765                 }
766         }
767
768         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
769         return (-ENOENT);
770 }
771
772 int
773 kibnal_add_persistent_peer (lnet_nid_t nid, __u32 ip, int port)
774 {
775         unsigned long      flags;
776         kib_peer_t        *peer;
777         kib_peer_t        *peer2;
778         int                rc;
779         
780         if (nid == LNET_NID_ANY)
781                 return (-EINVAL);
782
783         rc = kibnal_create_peer (&peer, nid);
784         if (rc != 0)
785                 return rc;
786
787         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
788
789         peer2 = kibnal_find_peer_locked (nid);
790         if (peer2 != NULL) {
791                 kibnal_peer_decref(peer);
792                 peer = peer2;
793         } else {
794                 /* peer table takes existing ref on peer */
795                 list_add_tail (&peer->ibp_list,
796                                kibnal_nid2peerlist (nid));
797         }
798
799         peer->ibp_ip = ip;
800         peer->ibp_port = port;
801         peer->ibp_persistence++;
802         
803         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
804         return (0);
805 }
806
807 void
808 kibnal_del_peer_locked (kib_peer_t *peer)
809 {
810         struct list_head *ctmp;
811         struct list_head *cnxt;
812         kib_conn_t       *conn;
813
814         peer->ibp_persistence = 0;
815
816         if (list_empty(&peer->ibp_conns)) {
817                 kibnal_unlink_peer_locked(peer);
818         } else {
819                 list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
820                         conn = list_entry(ctmp, kib_conn_t, ibc_list);
821
822                         kibnal_close_conn_locked (conn, 0);
823                 }
824                 /* NB peer is no longer persistent; closing its last conn
825                  * unlinked it. */
826         }
827         /* NB peer now unlinked; might even be freed if the peer table had the
828          * last ref on it. */
829 }
830
831 int
832 kibnal_del_peer (lnet_nid_t nid)
833 {
834         unsigned long      flags;
835         CFS_LIST_HEAD     (zombies);
836         struct list_head  *ptmp;
837         struct list_head  *pnxt;
838         kib_peer_t        *peer;
839         int                lo;
840         int                hi;
841         int                i;
842         int                rc = -ENOENT;
843
844         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
845
846         if (nid != LNET_NID_ANY)
847                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
848         else {
849                 lo = 0;
850                 hi = kibnal_data.kib_peer_hash_size - 1;
851         }
852
853         for (i = lo; i <= hi; i++) {
854                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
855                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
856                         LASSERT (peer->ibp_persistence != 0 ||
857                                  peer->ibp_connecting != 0 ||
858                                  peer->ibp_accepting != 0 ||
859                                  !list_empty (&peer->ibp_conns));
860
861                         if (!(nid == LNET_NID_ANY || peer->ibp_nid == nid))
862                                 continue;
863
864                         if (!list_empty(&peer->ibp_tx_queue)) {
865                                 LASSERT (list_empty(&peer->ibp_conns));
866
867                                 list_splice_init(&peer->ibp_tx_queue, &zombies);
868                         }
869
870                         kibnal_del_peer_locked (peer);
871                         rc = 0;         /* matched something */
872                 }
873         }
874
875         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
876
877         kibnal_txlist_done(&zombies, -EIO);
878
879         return (rc);
880 }
881
882 kib_conn_t *
883 kibnal_get_conn_by_idx (int index)
884 {
885         kib_peer_t        *peer;
886         struct list_head  *ptmp;
887         kib_conn_t        *conn;
888         struct list_head  *ctmp;
889         unsigned long      flags;
890         int                i;
891
892         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
893
894         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
895                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
896
897                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
898                         LASSERT (peer->ibp_persistence > 0 ||
899                                  peer->ibp_connecting != 0 ||
900                                  peer->ibp_accepting != 0 ||
901                                  !list_empty (&peer->ibp_conns));
902
903                         list_for_each (ctmp, &peer->ibp_conns) {
904                                 if (index-- > 0)
905                                         continue;
906
907                                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
908                                 kibnal_conn_addref(conn);
909                                 read_unlock_irqrestore(&kibnal_data.kib_global_lock,
910                                                        flags);
911                                 return (conn);
912                         }
913                 }
914         }
915
916         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
917         return (NULL);
918 }
919
920 kib_conn_t *
921 kibnal_create_conn (void)
922 {
923         kib_conn_t  *conn;
924         int          i;
925         __u64        vaddr = 0;
926         __u64        vaddr_base;
927         int          page_offset;
928         int          ipage;
929         int          rc;
930         union {
931                 struct ib_qp_create_param  qp_create;
932                 struct ib_qp_attribute     qp_attr;
933         } params;
934         
935         LIBCFS_ALLOC (conn, sizeof (*conn));
936         if (conn == NULL) {
937                 CERROR ("Can't allocate connection\n");
938                 return (NULL);
939         }
940
941         /* zero flags, NULL pointers etc... */
942         memset (conn, 0, sizeof (*conn));
943
944         INIT_LIST_HEAD (&conn->ibc_tx_queue_nocred);
945         INIT_LIST_HEAD (&conn->ibc_tx_queue);
946         INIT_LIST_HEAD (&conn->ibc_tx_queue_rsrvd);
947         INIT_LIST_HEAD (&conn->ibc_active_txs);
948         spin_lock_init (&conn->ibc_lock);
949         
950         atomic_inc (&kibnal_data.kib_nconns);
951         /* well not really, but I call destroy() on failure, which decrements */
952
953         LIBCFS_ALLOC (conn->ibc_rxs, IBNAL_RX_MSGS * sizeof (kib_rx_t));
954         if (conn->ibc_rxs == NULL)
955                 goto failed;
956         memset (conn->ibc_rxs, 0, IBNAL_RX_MSGS * sizeof(kib_rx_t));
957
958         rc = kibnal_alloc_pages(&conn->ibc_rx_pages,
959                                 IBNAL_RX_MSG_PAGES,
960                                 IB_ACCESS_LOCAL_WRITE);
961         if (rc != 0)
962                 goto failed;
963
964         vaddr_base = vaddr = conn->ibc_rx_pages->ibp_vaddr;
965
966         for (i = ipage = page_offset = 0; i < IBNAL_RX_MSGS; i++) {
967                 struct page *page = conn->ibc_rx_pages->ibp_pages[ipage];
968                 kib_rx_t   *rx = &conn->ibc_rxs[i];
969
970                 rx->rx_conn = conn;
971                 rx->rx_vaddr = vaddr;
972                 rx->rx_msg = (kib_msg_t *)(((char *)page_address(page)) + page_offset);
973                 
974                 vaddr += IBNAL_MSG_SIZE;
975                 LASSERT (vaddr <= vaddr_base + IBNAL_RX_MSG_BYTES);
976                 
977                 page_offset += IBNAL_MSG_SIZE;
978                 LASSERT (page_offset <= PAGE_SIZE);
979
980                 if (page_offset == PAGE_SIZE) {
981                         page_offset = 0;
982                         ipage++;
983                         LASSERT (ipage <= IBNAL_RX_MSG_PAGES);
984                 }
985         }
986
987         /* We can post up to IBLND_MSG_QUEUE_SIZE immediate/req messages and
988          * the same # of ack/nak/rdma+done messages */
989
990         params.qp_create = (struct ib_qp_create_param) {
991                 .limit = {
992                         .max_outstanding_send_request    = 3 * IBNAL_MSG_QUEUE_SIZE,
993                         .max_outstanding_receive_request = IBNAL_RX_MSGS,
994                         .max_send_gather_element         = 1,
995                         .max_receive_scatter_element     = 1,
996                 },
997                 .pd              = kibnal_data.kib_pd,
998                 .send_queue      = kibnal_data.kib_cq,
999                 .receive_queue   = kibnal_data.kib_cq,
1000                 .send_policy     = IB_WQ_SIGNAL_SELECTABLE,
1001                 .receive_policy  = IB_WQ_SIGNAL_SELECTABLE,
1002                 .rd_domain       = 0,
1003                 .transport       = IB_TRANSPORT_RC,
1004                 .device_specific = NULL,
1005         };
1006         
1007         rc = ib_qp_create (&params.qp_create, &conn->ibc_qp, &conn->ibc_qpn);
1008         if (rc != 0) {
1009                 CERROR ("Failed to create queue pair: %d\n", rc);
1010                 goto failed;
1011         }
1012         
1013         /* Mark QP created */
1014         conn->ibc_state = IBNAL_CONN_INIT_QP;
1015
1016         params.qp_attr = (struct ib_qp_attribute) {
1017                 .state             = IB_QP_STATE_INIT,
1018                 .port              = kibnal_data.kib_port,
1019                 .enable_rdma_read  = 1,
1020                 .enable_rdma_write = 1,
1021                 .valid_fields      = (IB_QP_ATTRIBUTE_STATE |
1022                                       IB_QP_ATTRIBUTE_PORT |
1023                                       IB_QP_ATTRIBUTE_PKEY_INDEX |
1024                                       IB_QP_ATTRIBUTE_RDMA_ATOMIC_ENABLE),
1025         };
1026         rc = ib_qp_modify(conn->ibc_qp, &params.qp_attr);
1027         if (rc != 0) {
1028                 CERROR ("Failed to modify queue pair: %d\n", rc);
1029                 goto failed;
1030         }
1031
1032         /* 1 ref for caller */
1033         atomic_set (&conn->ibc_refcount, 1);
1034         return (conn);
1035         
1036  failed:
1037         kibnal_destroy_conn (conn);
1038         return (NULL);
1039 }
1040
1041 void
1042 kibnal_destroy_conn (kib_conn_t *conn)
1043 {
1044         int    rc;
1045         
1046         CDEBUG (D_NET, "connection %p\n", conn);
1047
1048         LASSERT (atomic_read (&conn->ibc_refcount) == 0);
1049         LASSERT (list_empty(&conn->ibc_tx_queue));
1050         LASSERT (list_empty(&conn->ibc_tx_queue_rsrvd));
1051         LASSERT (list_empty(&conn->ibc_tx_queue_nocred));
1052         LASSERT (list_empty(&conn->ibc_active_txs));
1053         LASSERT (conn->ibc_nsends_posted == 0);
1054         LASSERT (conn->ibc_connreq == NULL);
1055
1056         switch (conn->ibc_state) {
1057         case IBNAL_CONN_ZOMBIE:
1058                 /* called after connection sequence initiated */
1059
1060         case IBNAL_CONN_INIT_QP:
1061                 rc = ib_qp_destroy(conn->ibc_qp);
1062                 if (rc != 0)
1063                         CERROR("Can't destroy QP: %d\n", rc);
1064                 /* fall through */
1065                 
1066         case IBNAL_CONN_INIT_NOTHING:
1067                 break;
1068
1069         default:
1070                 LASSERT (0);
1071         }
1072
1073         if (conn->ibc_rx_pages != NULL) 
1074                 kibnal_free_pages(conn->ibc_rx_pages);
1075         
1076         if (conn->ibc_rxs != NULL)
1077                 LIBCFS_FREE(conn->ibc_rxs, 
1078                             IBNAL_RX_MSGS * sizeof(kib_rx_t));
1079
1080         if (conn->ibc_peer != NULL)
1081                 kibnal_peer_decref(conn->ibc_peer);
1082
1083         LIBCFS_FREE(conn, sizeof (*conn));
1084
1085         atomic_dec(&kibnal_data.kib_nconns);
1086         
1087         if (atomic_read (&kibnal_data.kib_nconns) == 0 &&
1088             kibnal_data.kib_shutdown) {
1089                 /* I just nuked the last connection on shutdown; wake up
1090                  * everyone so they can exit. */
1091                 wake_up_all(&kibnal_data.kib_sched_waitq);
1092                 wake_up_all(&kibnal_data.kib_reaper_waitq);
1093         }
1094 }
1095
1096 int
1097 kibnal_close_peer_conns_locked (kib_peer_t *peer, int why)
1098 {
1099         kib_conn_t         *conn;
1100         struct list_head   *ctmp;
1101         struct list_head   *cnxt;
1102         int                 count = 0;
1103
1104         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1105                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1106
1107                 count++;
1108                 kibnal_close_conn_locked (conn, why);
1109         }
1110
1111         return (count);
1112 }
1113
1114 int
1115 kibnal_close_stale_conns_locked (kib_peer_t *peer, __u64 incarnation)
1116 {
1117         kib_conn_t         *conn;
1118         struct list_head   *ctmp;
1119         struct list_head   *cnxt;
1120         int                 count = 0;
1121
1122         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1123                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1124
1125                 if (conn->ibc_incarnation == incarnation)
1126                         continue;
1127
1128                 CDEBUG(D_NET, "Closing stale conn %p nid: %s"
1129                        " incarnation:"LPX64"("LPX64")\n", conn,
1130                        libcfs_nid2str(peer->ibp_nid), 
1131                        conn->ibc_incarnation, incarnation);
1132                 
1133                 count++;
1134                 kibnal_close_conn_locked (conn, -ESTALE);
1135         }
1136
1137         return (count);
1138 }
1139
1140 int
1141 kibnal_close_matching_conns (lnet_nid_t nid)
1142 {
1143         unsigned long       flags;
1144         kib_peer_t         *peer;
1145         struct list_head   *ptmp;
1146         struct list_head   *pnxt;
1147         int                 lo;
1148         int                 hi;
1149         int                 i;
1150         int                 count = 0;
1151
1152         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1153
1154         if (nid != LNET_NID_ANY)
1155                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
1156         else {
1157                 lo = 0;
1158                 hi = kibnal_data.kib_peer_hash_size - 1;
1159         }
1160
1161         for (i = lo; i <= hi; i++) {
1162                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
1163
1164                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
1165                         LASSERT (peer->ibp_persistence != 0 ||
1166                                  peer->ibp_connecting != 0 ||
1167                                  peer->ibp_accepting != 0 ||
1168                                  !list_empty (&peer->ibp_conns));
1169
1170                         if (!(nid == LNET_NID_ANY || nid == peer->ibp_nid))
1171                                 continue;
1172
1173                         count += kibnal_close_peer_conns_locked (peer, 0);
1174                 }
1175         }
1176
1177         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1178
1179         /* wildcards always succeed */
1180         if (nid == LNET_NID_ANY)
1181                 return (0);
1182         
1183         return (count == 0 ? -ENOENT : 0);
1184 }
1185
1186 int
1187 kibnal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1188 {
1189         struct libcfs_ioctl_data *data = arg;
1190         int                       rc = -EINVAL;
1191
1192         LASSERT (ni == kibnal_data.kib_ni);
1193
1194         switch(cmd) {
1195         case IOC_LIBCFS_GET_PEER: {
1196                 lnet_nid_t   nid = 0;
1197                 __u32       ip = 0;
1198                 int         port = 0;
1199                 int         share_count = 0;
1200
1201                 rc = kibnal_get_peer_info(data->ioc_count,
1202                                           &nid, &ip, &port, &share_count);
1203                 data->ioc_nid    = nid;
1204                 data->ioc_count  = share_count;
1205                 data->ioc_u32[0] = ip;
1206                 data->ioc_u32[1] = port;
1207                 break;
1208         }
1209         case IOC_LIBCFS_ADD_PEER: {
1210                 rc = kibnal_add_persistent_peer (data->ioc_nid,
1211                                                  data->ioc_u32[0], /* IP */
1212                                                  data->ioc_u32[1]); /* port */
1213                 break;
1214         }
1215         case IOC_LIBCFS_DEL_PEER: {
1216                 rc = kibnal_del_peer (data->ioc_nid);
1217                 break;
1218         }
1219         case IOC_LIBCFS_GET_CONN: {
1220                 kib_conn_t *conn = kibnal_get_conn_by_idx (data->ioc_count);
1221
1222                 if (conn == NULL)
1223                         rc = -ENOENT;
1224                 else {
1225                         rc = 0;
1226                         data->ioc_nid = conn->ibc_peer->ibp_nid;
1227                         kibnal_conn_decref(conn);
1228                 }
1229                 break;
1230         }
1231         case IOC_LIBCFS_CLOSE_CONNECTION: {
1232                 rc = kibnal_close_matching_conns (data->ioc_nid);
1233                 break;
1234         }
1235         case IOC_LIBCFS_REGISTER_MYNID: {
1236                 /* Ignore if this is a noop */
1237                 if (data->ioc_nid == ni->ni_nid) {
1238                         rc = 0;
1239                 } else {
1240                         CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1241                                libcfs_nid2str(data->ioc_nid),
1242                                libcfs_nid2str(ni->ni_nid));
1243                         rc = -EINVAL;
1244                 }
1245                 break;
1246         }
1247         }
1248
1249         return rc;
1250 }
1251
1252 void
1253 kibnal_free_pages (kib_pages_t *p)
1254 {
1255         int     npages = p->ibp_npages;
1256         int     rc;
1257         int     i;
1258         
1259         if (p->ibp_mapped) {
1260                 rc = ib_memory_deregister(p->ibp_handle);
1261                 if (rc != 0)
1262                         CERROR ("Deregister error: %d\n", rc);
1263         }
1264         
1265         for (i = 0; i < npages; i++)
1266                 if (p->ibp_pages[i] != NULL)
1267                         __free_page(p->ibp_pages[i]);
1268         
1269         LIBCFS_FREE (p, offsetof(kib_pages_t, ibp_pages[npages]));
1270 }
1271
1272 int
1273 kibnal_alloc_pages (kib_pages_t **pp, int npages, int access)
1274 {
1275         kib_pages_t                *p;
1276         struct ib_physical_buffer  *phys_pages;
1277         int                         i;
1278         int                         rc;
1279
1280         LIBCFS_ALLOC(p, offsetof(kib_pages_t, ibp_pages[npages]));
1281         if (p == NULL) {
1282                 CERROR ("Can't allocate buffer %d\n", npages);
1283                 return (-ENOMEM);
1284         }
1285
1286         memset (p, 0, offsetof(kib_pages_t, ibp_pages[npages]));
1287         p->ibp_npages = npages;
1288         
1289         for (i = 0; i < npages; i++) {
1290                 p->ibp_pages[i] = alloc_page (GFP_KERNEL);
1291                 if (p->ibp_pages[i] == NULL) {
1292                         CERROR ("Can't allocate page %d of %d\n", i, npages);
1293                         kibnal_free_pages(p);
1294                         return (-ENOMEM);
1295                 }
1296         }
1297
1298         LIBCFS_ALLOC(phys_pages, npages * sizeof(*phys_pages));
1299         if (phys_pages == NULL) {
1300                 CERROR ("Can't allocate physarray for %d pages\n", npages);
1301                 kibnal_free_pages(p);
1302                 return (-ENOMEM);
1303         }
1304
1305         for (i = 0; i < npages; i++) {
1306                 phys_pages[i].size = PAGE_SIZE;
1307                 phys_pages[i].address =
1308                         lnet_page2phys(p->ibp_pages[i]);
1309         }
1310
1311         p->ibp_vaddr = 0;
1312         rc = ib_memory_register_physical(kibnal_data.kib_pd,
1313                                          phys_pages, npages,
1314                                          &p->ibp_vaddr,
1315                                          npages * PAGE_SIZE, 0,
1316                                          access,
1317                                          &p->ibp_handle,
1318                                          &p->ibp_lkey,
1319                                          &p->ibp_rkey);
1320         
1321         LIBCFS_FREE(phys_pages, npages * sizeof(*phys_pages));
1322         
1323         if (rc != 0) {
1324                 CERROR ("Error %d mapping %d pages\n", rc, npages);
1325                 kibnal_free_pages(p);
1326                 return (rc);
1327         }
1328         
1329         p->ibp_mapped = 1;
1330         *pp = p;
1331         return (0);
1332 }
1333
1334 int
1335 kibnal_setup_tx_descs (void)
1336 {
1337         int           ipage = 0;
1338         int           page_offset = 0;
1339         __u64         vaddr;
1340         __u64         vaddr_base;
1341         struct page  *page;
1342         kib_tx_t     *tx;
1343         int           i;
1344         int           rc;
1345
1346         /* pre-mapped messages are not bigger than 1 page */
1347         LASSERT (IBNAL_MSG_SIZE <= PAGE_SIZE);
1348
1349         /* No fancy arithmetic when we do the buffer calculations */
1350         LASSERT (PAGE_SIZE % IBNAL_MSG_SIZE == 0);
1351
1352         rc = kibnal_alloc_pages(&kibnal_data.kib_tx_pages,
1353                                 IBNAL_TX_MSG_PAGES(), 
1354                                 0);            /* local read access only */
1355         if (rc != 0)
1356                 return (rc);
1357
1358         vaddr = vaddr_base = kibnal_data.kib_tx_pages->ibp_vaddr;
1359
1360         for (i = 0; i < IBNAL_TX_MSGS(); i++) {
1361                 page = kibnal_data.kib_tx_pages->ibp_pages[ipage];
1362                 tx = &kibnal_data.kib_tx_descs[i];
1363
1364                 memset (tx, 0, sizeof(*tx));    /* zero flags etc */
1365                 
1366                 tx->tx_msg = (kib_msg_t *)(((char *)page_address(page)) + page_offset);
1367                 tx->tx_vaddr = vaddr;
1368                 tx->tx_mapped = KIB_TX_UNMAPPED;
1369
1370                 CDEBUG(D_NET, "Tx[%d] %p->%p - "LPX64"\n", 
1371                        i, tx, tx->tx_msg, tx->tx_vaddr);
1372
1373                 list_add (&tx->tx_list, &kibnal_data.kib_idle_txs);
1374
1375                 vaddr += IBNAL_MSG_SIZE;
1376                 LASSERT (vaddr <= vaddr_base + IBNAL_TX_MSG_BYTES());
1377
1378                 page_offset += IBNAL_MSG_SIZE;
1379                 LASSERT (page_offset <= PAGE_SIZE);
1380
1381                 if (page_offset == PAGE_SIZE) {
1382                         page_offset = 0;
1383                         ipage++;
1384                         LASSERT (ipage <= IBNAL_TX_MSG_PAGES());
1385                 }
1386         }
1387         
1388         return (0);
1389 }
1390
/*
 * Shut down the LND instance attached to 'ni'.
 *
 * kibnal_data.kib_init records how far initialisation got.  The switch
 * below enters at that state and deliberately falls through every later
 * case, so teardown runs from the most- to the least-initialised stage.
 * After the switch the tx descriptor array and the peer hash table are
 * freed, kib_init is reset to IBNAL_INIT_NOTHING and the module use
 * count taken at startup is released.
 *
 * The waits in here poll once a second with no timeout.
 */
1391 void
1392 kibnal_shutdown (lnet_ni_t *ni)
1393 {
1394         int           i;
1395         int           rc;
1396         unsigned long flags;
1397
1398         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1399                atomic_read (&libcfs_kmemory));
1400
1401         LASSERT(ni == kibnal_data.kib_ni);
1402         LASSERT(ni->ni_data == &kibnal_data);
1403
1404         switch (kibnal_data.kib_init) {
/* NB 'default' is listed first; presumably LBUG() does not return,
 * otherwise control would drop into the IBNAL_INIT_ALL case - TODO confirm */
1405         default:
1406                 CERROR ("Unexpected state %d\n", kibnal_data.kib_init);
1407                 LBUG();
1408
1409         case IBNAL_INIT_ALL:
1410                 /* Prevent new peers from being created */
1411                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1412                 kibnal_data.kib_nonewpeers = 1;
1413                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1414
1415                 kibnal_stop_ib_listener();
1416
1417                 /* Remove all existing peers from the peer table */
1418                 kibnal_del_peer(LNET_NID_ANY);
1419                 
1420                 /* Wait for pending conn reqs to be handled */
/* 'i' only throttles logging: (i & -i) == i holds when i is a power of
 * two, so the message is a D_WARNING on iterations 4, 8, 16, ... and
 * D_NET otherwise.  The lock is dropped around each 1 second pause. */
1421                 i = 2;
1422                 spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
1423                 while (!list_empty(&kibnal_data.kib_connd_acceptq)) {
1424                         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, 
1425                                                flags);
1426                         i++;
1427                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n */
1428                                "waiting for conn reqs to clean up\n");
1429                         cfs_pause(cfs_time_seconds(1));
1430                         
1431                         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
1432                 }
1433                 spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
1434
1435                 /* Wait for all peer state to clean up */
1436                 i = 2;
1437                 while (atomic_read(&kibnal_data.kib_npeers) != 0) {
1438                         i++;
1439                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1440                                "waiting for %d peers to close down\n",
1441                                atomic_read(&kibnal_data.kib_npeers));
1442                         cfs_pause(cfs_time_seconds(1));
1443                 }
1444                 /* fall through */
1445
1446         case IBNAL_INIT_CQ:
1447                 rc = ib_cq_destroy (kibnal_data.kib_cq);
1448                 if (rc != 0)
1449                         CERROR ("Destroy CQ error: %d\n", rc);
1450                 /* fall through */
1451
1452         case IBNAL_INIT_TXD:
/* releases (and deregisters) the pages behind the tx message buffers;
 * the descriptor array itself is freed after the switch */
1453                 kibnal_free_pages (kibnal_data.kib_tx_pages);
1454                 /* fall through */
1455 #if IBNAL_FMR
1456         case IBNAL_INIT_FMR:
1457                 rc = ib_fmr_pool_destroy (kibnal_data.kib_fmr_pool);
1458                 if (rc != 0)
1459                         CERROR ("Destroy FMR pool error: %d\n", rc);
1460                 /* fall through */
1461 #endif
1462         case IBNAL_INIT_PD:
1463                 rc = ib_pd_destroy(kibnal_data.kib_pd);
1464                 if (rc != 0)
1465                         CERROR ("Destroy PD error: %d\n", rc);
1466                 /* fall through */
1467
1468         case IBNAL_INIT_DATA:
1469                 /* Module refcount only gets to zero when all peers
1470                  * have been closed so all lists must be empty */
1471                 LASSERT (atomic_read(&kibnal_data.kib_npeers) == 0);
1472                 LASSERT (kibnal_data.kib_peers != NULL);
1473                 for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
1474                         LASSERT (list_empty (&kibnal_data.kib_peers[i]));
1475                 }
1476                 LASSERT (atomic_read (&kibnal_data.kib_nconns) == 0);
1477                 LASSERT (list_empty (&kibnal_data.kib_sched_rxq));
1478                 LASSERT (list_empty (&kibnal_data.kib_sched_txq));
1479                 LASSERT (list_empty (&kibnal_data.kib_reaper_conns));
1480                 LASSERT (list_empty (&kibnal_data.kib_connd_peers));
1481                 LASSERT (list_empty (&kibnal_data.kib_connd_acceptq));
1482
1483                 /* flag threads to terminate; wake and wait for them to die */
1484                 kibnal_data.kib_shutdown = 1;
1485                 wake_up_all (&kibnal_data.kib_sched_waitq);
1486                 wake_up_all (&kibnal_data.kib_reaper_waitq);
1487                 wake_up_all (&kibnal_data.kib_connd_waitq);
1488
/* poll kib_nthreads once a second until it reaches zero; same
 * power-of-two log throttling as the waits above */
1489                 i = 2;
1490                 while (atomic_read (&kibnal_data.kib_nthreads) != 0) {
1491                         i++;
1492                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1493                                "Waiting for %d threads to terminate\n",
1494                                atomic_read (&kibnal_data.kib_nthreads));
1495                         cfs_pause(cfs_time_seconds(1));
1496                 }
1497                 /* fall through */
1498                 
1499         case IBNAL_INIT_NOTHING:
1500                 break;
1501         }
1502
/* these two are released whatever state was entered above; the NULL
 * checks cover the case where they were never allocated */
1503         if (kibnal_data.kib_tx_descs != NULL)
1504                 LIBCFS_FREE (kibnal_data.kib_tx_descs,
1505                              IBNAL_TX_MSGS() * sizeof(kib_tx_t));
1506
1507         if (kibnal_data.kib_peers != NULL)
1508                 LIBCFS_FREE (kibnal_data.kib_peers,
1509                              sizeof (struct list_head) * 
1510                              kibnal_data.kib_peer_hash_size);
1511
1512         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1513                atomic_read (&libcfs_kmemory));
1514
1515         kibnal_data.kib_init = IBNAL_INIT_NOTHING;
1516         PORTAL_MODULE_UNUSE;
1517 }
1518
1519 int
1520 kibnal_get_ipoibidx(void)
1521 {
1522         /* NB single threaded! */
1523         static struct ib_port_properties port_props;
1524
1525         int               ipoibidx = 0;
1526         int               devidx;
1527         int               port;
1528         int               rc;
1529         struct ib_device *device;
1530
1531         for (devidx = 0; devidx <= kibnal_data.kib_hca_idx; devidx++) {
1532                 device = ib_device_get_by_index(devidx);
1533                 
1534                 if (device == NULL) {
1535                         CERROR("Can't get IB device %d\n", devidx);
1536                         return -1;
1537                 }
1538                 
1539                 for (port = 1; port <= 2; port++) {
1540                         if (devidx == kibnal_data.kib_hca_idx &&
1541                             port == kibnal_data.kib_port)
1542                                 return ipoibidx;
1543                         
1544                         rc = ib_port_properties_get(device, port,
1545                                                     &port_props);
1546                         if (rc == 0)
1547                                 ipoibidx++;
1548                 }
1549         }
1550
1551         LBUG();
1552         return -1;
1553 }
1554
1555 int
1556 kibnal_startup (lnet_ni_t *ni)
1557 {
1558         char              ipif_name[32];
1559         __u32             ip;
1560         __u32             netmask;
1561         int               up;
1562         struct timeval    tv;
1563         int               rc;
1564         int               hca;
1565         int               port;
1566         int               i;
1567         int               nob;
1568
1569         LASSERT (ni->ni_lnd == &the_kiblnd);
1570
1571         /* Only 1 instance supported */
1572         if (kibnal_data.kib_init != IBNAL_INIT_NOTHING) {
1573                 CERROR ("Only 1 instance supported\n");
1574                 return -EPERM;
1575         }
1576
1577         if (*kibnal_tunables.kib_credits > *kibnal_tunables.kib_ntx) {
1578                 CERROR ("Can't set credits(%d) > ntx(%d)\n",
1579                         *kibnal_tunables.kib_credits,
1580                         *kibnal_tunables.kib_ntx);
1581                 return -EINVAL;
1582         }
1583
1584         memset (&kibnal_data, 0, sizeof (kibnal_data)); /* zero pointers, flags etc */
1585
1586         ni->ni_maxtxcredits = *kibnal_tunables.kib_credits;
1587         ni->ni_peertxcredits = *kibnal_tunables.kib_peercredits;
1588
1589         CLASSERT (LNET_MAX_INTERFACES > 1);
1590
1591
1592         kibnal_data.kib_hca_idx = 0;            /* default: first HCA */
1593         kibnal_data.kib_port = 0;               /* any port */
1594
1595         if (ni->ni_interfaces[0] != NULL) {
1596                 /* hca.port specified in 'networks=openib(h.p)' */
1597                 if (ni->ni_interfaces[1] != NULL) {
1598                         CERROR("Multiple interfaces not supported\n");
1599                         return -EPERM;
1600                 }
1601                 
1602                 nob = strlen(ni->ni_interfaces[0]);
1603                 i = sscanf(ni->ni_interfaces[0], "%d.%d%n", &hca, &port, &nob);
1604                 if (i >= 2 && nob == strlen(ni->ni_interfaces[0])) {
1605                         kibnal_data.kib_hca_idx = hca;
1606                         kibnal_data.kib_port = port;
1607                 } else {
1608                         nob = strlen(ni->ni_interfaces[0]);
1609                         i = sscanf(ni->ni_interfaces[0], "%d%n", &hca, &nob);
1610
1611                         if (i >= 1 && nob == strlen(ni->ni_interfaces[0])) {
1612                                 kibnal_data.kib_hca_idx = hca;
1613                         } else {
1614                                 CERROR("Can't parse interface '%s'\n",
1615                                        ni->ni_interfaces[0]);
1616                                 return -EINVAL;
1617                         }
1618                 }
1619         }
1620         
1621         kibnal_data.kib_ni = ni;
1622         ni->ni_data = &kibnal_data;
1623         
1624         do_gettimeofday(&tv);
1625         kibnal_data.kib_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1626
1627         PORTAL_MODULE_USE;
1628
1629         rwlock_init(&kibnal_data.kib_global_lock);
1630
1631         kibnal_data.kib_peer_hash_size = IBNAL_PEER_HASH_SIZE;
1632         LIBCFS_ALLOC (kibnal_data.kib_peers,
1633                       sizeof (struct list_head) * kibnal_data.kib_peer_hash_size);
1634         if (kibnal_data.kib_peers == NULL) {
1635                 goto failed;
1636         }
1637         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++)
1638                 INIT_LIST_HEAD(&kibnal_data.kib_peers[i]);
1639
1640         spin_lock_init (&kibnal_data.kib_reaper_lock);
1641         INIT_LIST_HEAD (&kibnal_data.kib_reaper_conns);
1642         init_waitqueue_head (&kibnal_data.kib_reaper_waitq);
1643
1644         spin_lock_init (&kibnal_data.kib_connd_lock);
1645         INIT_LIST_HEAD (&kibnal_data.kib_connd_acceptq);
1646         INIT_LIST_HEAD (&kibnal_data.kib_connd_peers);
1647         init_waitqueue_head (&kibnal_data.kib_connd_waitq);
1648
1649         spin_lock_init (&kibnal_data.kib_sched_lock);
1650         INIT_LIST_HEAD (&kibnal_data.kib_sched_txq);
1651         INIT_LIST_HEAD (&kibnal_data.kib_sched_rxq);
1652         init_waitqueue_head (&kibnal_data.kib_sched_waitq);
1653
1654         spin_lock_init (&kibnal_data.kib_tx_lock);
1655         INIT_LIST_HEAD (&kibnal_data.kib_idle_txs);
1656
1657         LIBCFS_ALLOC (kibnal_data.kib_tx_descs,
1658                       IBNAL_TX_MSGS() * sizeof(kib_tx_t));
1659         if (kibnal_data.kib_tx_descs == NULL) {
1660                 CERROR ("Can't allocate tx descs\n");
1661                 goto failed;
1662         }
1663
1664         /* lists/ptrs/locks initialised */
1665         kibnal_data.kib_init = IBNAL_INIT_DATA;
1666         /*****************************************************/
1667
1668         for (i = 0; i < IBNAL_N_SCHED; i++) {
1669                 rc = kibnal_thread_start (kibnal_scheduler,
1670                                           (void *)((unsigned long)i));
1671                 if (rc != 0) {
1672                         CERROR("Can't spawn openibnal scheduler[%d]: %d\n",
1673                                i, rc);
1674                         goto failed;
1675                 }
1676         }
1677
1678         /* must have at least 2 connds to remain responsive to svcqry while
1679          * connecting */
1680         if (*kibnal_tunables.kib_n_connd < 2)
1681                 *kibnal_tunables.kib_n_connd = 2;
1682
1683
1684         for (i = 0; i < *kibnal_tunables.kib_n_connd; i++) {
1685                 rc = kibnal_thread_start (kibnal_connd,
1686                                           (void *)((unsigned long)i));
1687                 if (rc != 0) {
1688                         CERROR("Can't spawn openibnal connd[%d]: %d\n",
1689                                i, rc);
1690                         goto failed;
1691                 }
1692         }
1693
1694         rc = kibnal_thread_start (kibnal_reaper, NULL);
1695         if (rc != 0) {
1696                 CERROR ("Can't spawn openibnal reaper: %d\n", rc);
1697                 goto failed;
1698         }
1699
1700         kibnal_data.kib_device = ib_device_get_by_index(kibnal_data.kib_hca_idx);
1701         if (kibnal_data.kib_device == NULL) {
1702                 CERROR ("Can't open ib device %d\n",
1703                         kibnal_data.kib_hca_idx);
1704                 goto failed;
1705         }
1706         
1707         rc = ib_device_properties_get(kibnal_data.kib_device,
1708                                       &kibnal_data.kib_device_props);
1709         if (rc != 0) {
1710                 CERROR ("Can't get device props: %d\n", rc);
1711                 goto failed;
1712         }
1713
1714         CDEBUG(D_NET, "Max Initiator: %d Max Responder %d\n", 
1715                kibnal_data.kib_device_props.max_initiator_per_qp,
1716                kibnal_data.kib_device_props.max_responder_per_qp);
1717
1718         if (kibnal_data.kib_port != 0) {
1719                 rc = ib_port_properties_get(kibnal_data.kib_device, 
1720                                             kibnal_data.kib_port,
1721                                             &kibnal_data.kib_port_props);
1722                 if (rc != 0) {
1723                         CERROR("Error %d open port %d on HCA %d\n", rc,
1724                                kibnal_data.kib_port,
1725                                kibnal_data.kib_hca_idx);
1726                         goto failed;
1727                 }
1728         } else {
1729                 for (i = 1; i <= 2; i++) {
1730                         rc = ib_port_properties_get(kibnal_data.kib_device, i,
1731                                                     &kibnal_data.kib_port_props);
1732                         if (rc == 0) {
1733                                 kibnal_data.kib_port = i;
1734                                 break;
1735                         }
1736                 }
1737                 if (kibnal_data.kib_port == 0) {
1738                         CERROR ("Can't find a port\n");
1739                         goto failed;
1740                 }
1741         }
1742
1743         i = kibnal_get_ipoibidx();
1744         if (i < 0)
1745                 goto failed;
1746         
1747         snprintf(ipif_name, sizeof(ipif_name), "%s%d",
1748                  *kibnal_tunables.kib_ipif_basename, i);
1749         if (strlen(ipif_name) == sizeof(ipif_name - 1)) {
1750                 CERROR("IPoIB interface name %s truncated\n", ipif_name);
1751                 return -EINVAL;
1752         }
1753         
1754         rc = libcfs_ipif_query(ipif_name, &up, &ip, &netmask);
1755         if (rc != 0) {
1756                 CERROR("Can't query IPoIB interface %s: %d\n", ipif_name, rc);
1757                 goto failed;
1758         }
1759         
1760         if (!up) {
1761                 CERROR("Can't query IPoIB interface %s: it's down\n", ipif_name);
1762                 goto failed;
1763         }
1764         
1765         ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ip);
1766
1767         rc = ib_pd_create(kibnal_data.kib_device,
1768                           NULL, &kibnal_data.kib_pd);
1769         if (rc != 0) {
1770                 CERROR ("Can't create PD: %d\n", rc);
1771                 goto failed;
1772         }
1773         
1774         /* flag PD initialised */
1775         kibnal_data.kib_init = IBNAL_INIT_PD;
1776         /*****************************************************/
1777 #if IBNAL_FMR
1778         {
1779                 const int pool_size = *kibnal_tunables.kib_ntx;
1780                 struct ib_fmr_pool_param params = {
1781                         .max_pages_per_fmr = LNET_MAX_PAYLOAD/PAGE_SIZE,
1782                         .access            = (IB_ACCESS_LOCAL_WRITE |
1783                                               IB_ACCESS_REMOTE_WRITE |
1784                                               IB_ACCESS_REMOTE_READ),
1785                         .pool_size         = pool_size,
1786                         .dirty_watermark   = (pool_size * 3)/4,
1787                         .flush_function    = NULL,
1788                         .flush_arg         = NULL,
1789                         .cache             = 1,
1790                 };
1791                 rc = ib_fmr_pool_create(kibnal_data.kib_pd, &params,
1792                                         &kibnal_data.kib_fmr_pool);
1793                 if (rc != 0) {
1794                         CERROR ("Can't create FMR pool size %d: %d\n", 
1795                                 pool_size, rc);
1796                         goto failed;
1797                 }
1798         }
1799
1800         /* flag FMR pool initialised */
1801         kibnal_data.kib_init = IBNAL_INIT_FMR;
1802 #endif
1803         /*****************************************************/
1804
1805         rc = kibnal_setup_tx_descs();
1806         if (rc != 0) {
1807                 CERROR ("Can't register tx descs: %d\n", rc);
1808                 goto failed;
1809         }
1810         
1811         /* flag TX descs initialised */
1812         kibnal_data.kib_init = IBNAL_INIT_TXD;
1813         /*****************************************************/
1814         
1815         {
1816                 struct ib_cq_callback callback = {
1817                         .context        = IBNAL_CALLBACK_CTXT,
1818                         .policy         = IB_CQ_PROVIDER_REARM,
1819                         .function       = {
1820                                 .entry  = kibnal_callback,
1821                         },
1822                         .arg            = NULL,
1823                 };
1824                 int  nentries = IBNAL_CQ_ENTRIES();
1825                 
1826                 rc = ib_cq_create (kibnal_data.kib_device, 
1827                                    &nentries, &callback, NULL,
1828                                    &kibnal_data.kib_cq);
1829                 if (rc != 0) {
1830                         CERROR ("Can't create CQ: %d\n", rc);
1831                         goto failed;
1832                 }
1833
1834                 /* I only want solicited events */
1835                 rc = ib_cq_request_notification(kibnal_data.kib_cq, 1);
1836                 LASSERT (rc == 0);
1837         }
1838
1839         /* flag CQ initialised */
1840         kibnal_data.kib_init = IBNAL_INIT_CQ;
1841         /*****************************************************/
1842
1843         rc = kibnal_start_ib_listener();
1844         if (rc != 0)
1845                 goto failed;
1846         
1847         /* flag everything initialised */
1848         kibnal_data.kib_init = IBNAL_INIT_ALL;
1849         /*****************************************************/
1850
1851         return 0;
1852
1853  failed:
1854         kibnal_shutdown(ni);    
1855         return -ENETDOWN;
1856 }
1857
1858 void __exit
1859 kibnal_module_fini (void)
1860 {
1861         lnet_unregister_lnd(&the_kiblnd);
1862         kibnal_tunables_fini();
1863 }
1864
1865 int __init
1866 kibnal_module_init (void)
1867 {
1868         int    rc;
1869
1870         rc = kibnal_tunables_init();
1871         if (rc != 0)
1872                 return rc;
1873         
1874         lnet_register_lnd(&the_kiblnd);
1875
1876         return (0);
1877 }
1878
1879 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1880 #ifdef USING_TSAPI
1881 MODULE_DESCRIPTION("Kernel Cisco IB LND v1.00");
1882 #else
1883 MODULE_DESCRIPTION("Kernel OpenIB(gen1) LND v1.00");
1884 #endif
1885 MODULE_LICENSE("GPL");
1886
1887 module_init(kibnal_module_init);
1888 module_exit(kibnal_module_fini);
1889