Whamcloud - gitweb
b=10778,i=eeb:
[fs/lustre-release.git] / lnet / klnds / openiblnd / openiblnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include "openiblnd.h"
25
26 lnd_t the_kiblnd = {
27 #ifdef USING_TSAPI
28         .lnd_type       = CIBLND,
29 #else
30         .lnd_type       = OPENIBLND,
31 #endif
32         .lnd_startup    = kibnal_startup,
33         .lnd_shutdown   = kibnal_shutdown,
34         .lnd_ctl        = kibnal_ctl,
35         .lnd_send       = kibnal_send,
36         .lnd_recv       = kibnal_recv,
37         .lnd_eager_recv = kibnal_eager_recv,
38         .lnd_accept     = kibnal_accept,
39 };
40
41 kib_data_t              kibnal_data;
42
/*
 * Rotate-and-add checksum over the 'nob' bytes starting at 'ptr'.
 *
 * For each byte the running sum is rotated left by one bit and the byte is
 * added.  A result of 0 is reported as 1, because 0 is reserved to mean
 * "no checksum".  A non-positive 'nob' therefore yields 1.
 *
 * NB bytes are read through plain 'char', so how bytes >= 0x80 contribute
 * follows the platform's char signedness.
 */
__u32
kibnal_cksum (void *ptr, int nob)
{
        const char *bytes = ptr;
        __u32       sum = 0;
        int         i;

        for (i = 0; i < nob; i++)
                sum = ((sum << 1) | (sum >> 31)) + bytes[i];

        if (sum == 0)
                return 1;       /* never return 0 (== no checksum) */

        return sum;
}
55
56 void
57 kibnal_init_msg(kib_msg_t *msg, int type, int body_nob)
58 {
59         msg->ibm_type = type;
60         msg->ibm_nob  = offsetof(kib_msg_t, ibm_u) + body_nob;
61 }
62
63 void
64 kibnal_pack_msg(kib_msg_t *msg, int version, int credits, 
65                 lnet_nid_t dstnid, __u64 dststamp)
66 {
67         /* CAVEAT EMPTOR! all message fields not set here should have been
68          * initialised previously. */
69         msg->ibm_magic    = IBNAL_MSG_MAGIC;
70         msg->ibm_version  = version;
71         /*   ibm_type */
72         msg->ibm_credits  = credits;
73         /*   ibm_nob */
74         msg->ibm_cksum    = 0;
75         msg->ibm_srcnid   = lnet_ptlcompat_srcnid(kibnal_data.kib_ni->ni_nid,
76                                                   dstnid);
77         msg->ibm_srcstamp = kibnal_data.kib_incarnation;
78         msg->ibm_dstnid   = dstnid;
79         msg->ibm_dststamp = dststamp;
80
81         if (*kibnal_tunables.kib_cksum) {
82                 /* NB ibm_cksum zero while computing cksum */
83                 msg->ibm_cksum    = kibnal_cksum(msg, msg->ibm_nob);
84         }
85 }
86
87 int
88 kibnal_unpack_msg(kib_msg_t *msg, int expected_version, int nob)
89 {
90         const int hdr_size = offsetof(kib_msg_t, ibm_u);
91         __u32     msg_cksum;
92         int       msg_version;
93         int       flip;
94         int       msg_nob;
95
96         if (nob < 6) {
97                 CERROR("Short message: %d\n", nob);
98                 return -EPROTO;
99         }
100
101         if (msg->ibm_magic == IBNAL_MSG_MAGIC) {
102                 flip = 0;
103         } else if (msg->ibm_magic == __swab32(IBNAL_MSG_MAGIC)) {
104                 flip = 1;
105         } else {
106                 CERROR("Bad magic: %08x\n", msg->ibm_magic);
107                 return -EPROTO;
108         }
109
110         msg_version = flip ? __swab16(msg->ibm_version) : msg->ibm_version;
111         if ((expected_version == 0) ?
112             (msg_version != IBNAL_MSG_VERSION &&
113              msg_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) :
114             (msg_version != expected_version)) {
115                 CERROR("Bad version: %x\n", msg_version);
116                 return -EPROTO;
117         }
118
119         if (nob < hdr_size) {
120                 CERROR("Short message: %d\n", nob);
121                 return -EPROTO;
122         }
123
124         msg_nob = flip ? __swab32(msg->ibm_nob) : msg->ibm_nob;
125         if (msg_nob > nob) {
126                 CERROR("Short message: got %d, wanted %d\n", nob, msg_nob);
127                 return -EPROTO;
128         }
129
130         /* checksum must be computed with ibm_cksum zero and BEFORE anything
131          * gets flipped */
132         msg_cksum = flip ? __swab32(msg->ibm_cksum) : msg->ibm_cksum;
133         msg->ibm_cksum = 0;
134         if (msg_cksum != 0 &&
135             msg_cksum != kibnal_cksum(msg, msg_nob)) {
136                 CERROR("Bad checksum\n");
137                 return -EPROTO;
138         }
139         msg->ibm_cksum = msg_cksum;
140         
141         if (flip) {
142                 /* leave magic unflipped as a clue to peer endianness */
143                 msg->ibm_version = msg_version;
144                 LASSERT (sizeof(msg->ibm_type) == 1);
145                 LASSERT (sizeof(msg->ibm_credits) == 1);
146                 msg->ibm_nob = msg_nob;
147                 __swab64s(&msg->ibm_srcnid);
148                 __swab64s(&msg->ibm_srcstamp);
149                 __swab64s(&msg->ibm_dstnid);
150                 __swab64s(&msg->ibm_dststamp);
151         }
152         
153         if (msg->ibm_srcnid == LNET_NID_ANY) {
154                 CERROR("Bad src nid: %s\n", libcfs_nid2str(msg->ibm_srcnid));
155                 return -EPROTO;
156         }
157
158         switch (msg->ibm_type) {
159         default:
160                 CERROR("Unknown message type %x\n", msg->ibm_type);
161                 return -EPROTO;
162                 
163         case IBNAL_MSG_SVCQRY:
164         case IBNAL_MSG_NOOP:
165                 break;
166
167         case IBNAL_MSG_SVCRSP:
168                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.svcrsp)) {
169                         CERROR("Short SVCRSP: %d(%d)\n", msg_nob,
170                                (int)(hdr_size + sizeof(msg->ibm_u.svcrsp)));
171                         return -EPROTO;
172                 }
173                 if (flip) {
174                         __swab64s(&msg->ibm_u.svcrsp.ibsr_svc_id);
175                         __swab16s(&msg->ibm_u.svcrsp.ibsr_svc_pkey);
176                 }
177                 break;
178
179         case IBNAL_MSG_CONNREQ:
180         case IBNAL_MSG_CONNACK:
181                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.connparams)) {
182                         CERROR("Short CONNREQ: %d(%d)\n", msg_nob,
183                                (int)(hdr_size + sizeof(msg->ibm_u.connparams)));
184                         return -EPROTO;
185                 }
186                 if (flip)
187                         __swab32s(&msg->ibm_u.connparams.ibcp_queue_depth);
188                 break;
189
190         case IBNAL_MSG_IMMEDIATE:
191                 if (msg_nob < offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0])) {
192                         CERROR("Short IMMEDIATE: %d(%d)\n", msg_nob,
193                                (int)offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0]));
194                         return -EPROTO;
195                 }
196                 break;
197
198         case IBNAL_MSG_PUT_RDMA:
199         case IBNAL_MSG_GET_RDMA:
200                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.rdma)) {
201                         CERROR("Short RDMA req: %d(%d)\n", msg_nob,
202                                (int)(hdr_size + sizeof(msg->ibm_u.rdma)));
203                         return -EPROTO;
204                 }
205                 if (flip) {
206                         __swab32s(&msg->ibm_u.rdma.ibrm_desc.rd_key);
207                         __swab32s(&msg->ibm_u.rdma.ibrm_desc.rd_nob);
208                         __swab64s(&msg->ibm_u.rdma.ibrm_desc.rd_addr);
209                 }
210                 break;
211
212         case IBNAL_MSG_PUT_DONE:
213         case IBNAL_MSG_GET_DONE:
214                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.completion)) {
215                         CERROR("Short RDMA completion: %d(%d)\n", msg_nob,
216                                (int)(hdr_size + sizeof(msg->ibm_u.completion)));
217                         return -EPROTO;
218                 }
219                 if (flip)
220                         __swab32s(&msg->ibm_u.completion.ibcm_status);
221                 break;
222         }
223         return 0;
224 }
225
226 int
227 kibnal_make_svcqry (kib_conn_t *conn) 
228 {
229         kib_peer_t    *peer = conn->ibc_peer;
230         int            version = IBNAL_MSG_VERSION;
231         int            msg_version;
232         kib_msg_t     *msg;
233         struct socket *sock;
234         int            rc;
235         int            nob;
236
237         LASSERT (conn->ibc_connreq != NULL);
238         msg = &conn->ibc_connreq->cr_msg;
239
240  again:
241         kibnal_init_msg(msg, IBNAL_MSG_SVCQRY, 0);
242         kibnal_pack_msg(msg, version, 0, peer->ibp_nid, 0);
243
244         rc = lnet_connect(&sock, peer->ibp_nid,
245                           0, peer->ibp_ip, peer->ibp_port);
246         if (rc != 0)
247                 return -ECONNABORTED;
248         
249         rc = libcfs_sock_write(sock, msg, msg->ibm_nob,
250                                lnet_acceptor_timeout());
251         if (rc != 0) {
252                 CERROR("Error %d sending svcqry to %s at %u.%u.%u.%u/%d\n", 
253                        rc, libcfs_nid2str(peer->ibp_nid), 
254                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
255                 goto out;
256         }
257
258         /* The first 6 bytes are invariably MAGIC + proto version */
259         rc = libcfs_sock_read(sock, msg, 6, *kibnal_tunables.kib_timeout);
260         if (rc != 0) {
261                 CERROR("Error %d receiving svcrsp from %s at %u.%u.%u.%u/%d\n", 
262                        rc, libcfs_nid2str(peer->ibp_nid), 
263                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
264                 goto out;
265         }
266
267         if (msg->ibm_magic != IBNAL_MSG_MAGIC &&
268             msg->ibm_magic != __swab32(IBNAL_MSG_MAGIC)) {
269                 CERROR("Bad magic: %08x from %s at %u.%u.%u.%u/%d\n",
270                        msg->ibm_magic, libcfs_nid2str(peer->ibp_nid),
271                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
272                 rc = -EPROTO;
273                 goto out;
274         }
275
276         msg_version = (msg->ibm_magic == IBNAL_MSG_MAGIC) ? 
277                       msg->ibm_version : __swab16(msg->ibm_version);
278         if (msg_version != version) {
279                 if (version == IBNAL_MSG_VERSION) {
280                         /* retry with previous version */
281                         libcfs_sock_release(sock);
282                         version = IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD;
283                         goto again;
284                 }
285                 
286                 CERROR("Bad version %x from %s at %u.%u.%u.%u/%d\n",
287                        msg_version, libcfs_nid2str(peer->ibp_nid),
288                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
289                 rc = -EPROTO;
290                 goto out;
291         }
292
293         /* Read in the rest of the message now we know the expected format */
294         nob = offsetof(kib_msg_t, ibm_u) + sizeof(kib_svcrsp_t);
295         rc = libcfs_sock_read(sock, ((char *)msg) + 6, nob - 6,
296                               *kibnal_tunables.kib_timeout);
297         if (rc != 0) {
298                 CERROR("Error %d receiving svcrsp from %s at %u.%u.%u.%u/%d\n", 
299                        rc, libcfs_nid2str(peer->ibp_nid), 
300                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
301                 goto out;
302         }
303
304         rc = kibnal_unpack_msg(msg, version, nob);
305         if (rc != 0) {
306                 CERROR("Error %d unpacking svcrsp from %s at %u.%u.%u.%u/%d\n", 
307                        rc, libcfs_nid2str(peer->ibp_nid), 
308                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
309                 goto out;
310         }
311                        
312         if (msg->ibm_type != IBNAL_MSG_SVCRSP) {
313                 CERROR("Unexpected response type %d from %s at %u.%u.%u.%u/%d\n", 
314                        msg->ibm_type, libcfs_nid2str(peer->ibp_nid), 
315                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
316                 rc = -EPROTO;
317                 goto out;
318         }
319         
320         if (!lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
321                                      msg->ibm_dstnid) ||
322             msg->ibm_dststamp != kibnal_data.kib_incarnation) {
323                 CERROR("Unexpected dst NID/stamp %s/"LPX64" from "
324                        "%s at %u.%u.%u.%u/%d\n", 
325                        libcfs_nid2str(msg->ibm_dstnid), msg->ibm_dststamp,
326                        libcfs_nid2str(peer->ibp_nid), HIPQUAD(peer->ibp_ip), 
327                        peer->ibp_port);
328                 rc = -EPROTO;
329                 goto out;
330         }
331
332         if (!lnet_ptlcompat_matchnid(peer->ibp_nid, msg->ibm_srcnid)) {
333                 CERROR("Unexpected src NID %s from %s at %u.%u.%u.%u/%d\n", 
334                        libcfs_nid2str(msg->ibm_srcnid),
335                        libcfs_nid2str(peer->ibp_nid), 
336                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
337                 rc = -EPROTO;
338                 goto out;
339         }
340
341         conn->ibc_incarnation = msg->ibm_srcstamp;
342         conn->ibc_connreq->cr_svcrsp = msg->ibm_u.svcrsp;
343         conn->ibc_version = version;
344         
345  out:
346         libcfs_sock_release(sock);
347         return rc;
348 }
349
350 void
351 kibnal_handle_svcqry (struct socket *sock)
352 {
353         __u32                peer_ip;
354         unsigned int         peer_port;
355         kib_msg_t           *msg;
356         __u64                srcnid;
357         __u64                srcstamp;
358         int                  version;
359         int                  reject = 0;
360         int                  rc;
361
362         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
363         if (rc != 0) {
364                 CERROR("Can't get peer's IP: %d\n", rc);
365                 return;
366         }
367
368         LIBCFS_ALLOC(msg, sizeof(*msg));
369         if (msg == NULL) {
370                 CERROR("Can't allocate msgs for %u.%u.%u.%u/%d\n",
371                        HIPQUAD(peer_ip), peer_port);
372                 return;
373         }
374         
375         rc = libcfs_sock_read(sock, &msg->ibm_magic, sizeof(msg->ibm_magic),
376                               lnet_acceptor_timeout());
377         if (rc != 0) {
378                 CERROR("Error %d receiving svcqry(1) from %u.%u.%u.%u/%d\n",
379                        rc, HIPQUAD(peer_ip), peer_port);
380                 goto out;
381         }
382
383         if (msg->ibm_magic != IBNAL_MSG_MAGIC &&
384             msg->ibm_magic != __swab32(IBNAL_MSG_MAGIC)) {
385                 /* Unexpected magic! */
386                 if (the_lnet.ln_ptlcompat == 0) {
387                         if (msg->ibm_magic == LNET_PROTO_MAGIC ||
388                             msg->ibm_magic == __swab32(LNET_PROTO_MAGIC)) {
389                                 /* future protocol version compatibility!
390                                  * When LNET unifies protocols over all LNDs,
391                                  * the first thing sent will be a version
392                                  * query.  I send back a reply in my current
393                                  * protocol to tell her I'm "old" */
394                                 kibnal_init_msg(msg, 0, 0);
395                                 kibnal_pack_msg(msg, IBNAL_MSG_VERSION, 0, 
396                                                 LNET_NID_ANY, 0);
397                                 reject = 1;
398                                 goto reply;
399                         }
400
401                         CERROR ("Bad magic(1) %#08x (%#08x expected) from "
402                                 "%u.%u.%u.%u/%d\n", msg->ibm_magic,
403                                 IBNAL_MSG_MAGIC, HIPQUAD(peer_ip), peer_port);
404                         goto out;
405                 }
406
407                 /* When portals compatibility is set, I may be passed a new
408                  * connection "blindly" by the acceptor, and I have to
409                  * determine if my peer has sent an acceptor connection request
410                  * or not. */
411                 rc = lnet_accept(kibnal_data.kib_ni, sock, msg->ibm_magic);
412                 if (rc != 0)
413                         goto out;
414
415                 /* It was an acceptor connection request!
416                  * Now I should see my magic... */
417                 rc = libcfs_sock_read(sock, &msg->ibm_magic,
418                                       sizeof(msg->ibm_magic),
419                                       lnet_acceptor_timeout());
420                 if (rc != 0) {
421                         CERROR("Error %d receiving svcqry(2) from %u.%u.%u.%u/%d\n",
422                                rc, HIPQUAD(peer_ip), peer_port);
423                         goto out;
424                 }
425
426                 if (msg->ibm_magic != IBNAL_MSG_MAGIC &&
427                     msg->ibm_magic != __swab32(IBNAL_MSG_MAGIC)) {
428                         CERROR ("Bad magic(2) %#08x (%#08x expected) from "
429                                 "%u.%u.%u.%u/%d\n", msg->ibm_magic,
430                                 IBNAL_MSG_MAGIC, HIPQUAD(peer_ip), peer_port);
431                         goto out;
432                 }
433         }
434
435         /* Now check version */
436
437         rc = libcfs_sock_read(sock, &msg->ibm_version, sizeof(msg->ibm_version),
438                               lnet_acceptor_timeout());
439         if (rc != 0) {
440                 CERROR("Error %d receiving svcqry(3) from %u.%u.%u.%u/%d\n",
441                        rc, HIPQUAD(peer_ip), peer_port);
442                 goto out;
443         }
444
445         version = (msg->ibm_magic == IBNAL_MSG_MAGIC) ?
446                   msg->ibm_version : __swab32(msg->ibm_version);
447         /* Peer is a different protocol version: reply in my current protocol
448          * to tell her I'm "old" */
449         if (version != IBNAL_MSG_VERSION &&
450             version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
451                 kibnal_init_msg(msg, 0, 0);
452                 kibnal_pack_msg(msg, IBNAL_MSG_VERSION, 0, LNET_NID_ANY, 0);
453                 reject = 1;
454                 goto reply;
455         }
456         
457         /* Now read in all the rest */
458         rc = libcfs_sock_read(sock, &msg->ibm_type,
459                               offsetof(kib_msg_t, ibm_u) -
460                               offsetof(kib_msg_t, ibm_type),
461                               lnet_acceptor_timeout());
462         if (rc != 0) {
463                 CERROR("Error %d receiving svcqry(4) from %u.%u.%u.%u/%d\n",
464                        rc, HIPQUAD(peer_ip), peer_port);
465                 goto out;
466         }
467         
468         rc = kibnal_unpack_msg(msg, version, offsetof(kib_msg_t, ibm_u));
469         if (rc != 0) {
470                 CERROR("Error %d unpacking svcqry from %u.%u.%u.%u/%d\n",
471                        rc, HIPQUAD(peer_ip), peer_port);
472                 goto out;
473         }
474         
475         if (msg->ibm_type != IBNAL_MSG_SVCQRY) {
476                 CERROR("Unexpected message %d from %u.%u.%u.%u/%d\n",
477                        msg->ibm_type, HIPQUAD(peer_ip), peer_port);
478                 goto out;
479         }
480         
481         if (!lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
482                                      msg->ibm_dstnid)) {
483                 CERROR("Unexpected dstnid %s: expected %s from %u.%u.%u.%u/%d\n",
484                        libcfs_nid2str(msg->ibm_dstnid),
485                        libcfs_nid2str(kibnal_data.kib_ni->ni_nid),
486                        HIPQUAD(peer_ip), peer_port);
487                 goto out;
488         }
489
490         srcnid = msg->ibm_srcnid;
491         srcstamp = msg->ibm_srcstamp;
492         
493         kibnal_init_msg(msg, IBNAL_MSG_SVCRSP, sizeof(msg->ibm_u.svcrsp));
494
495         msg->ibm_u.svcrsp.ibsr_svc_id = kibnal_data.kib_svc_id;
496         memcpy(msg->ibm_u.svcrsp.ibsr_svc_gid, kibnal_data.kib_svc_gid,
497                sizeof(kibnal_data.kib_svc_gid));
498         msg->ibm_u.svcrsp.ibsr_svc_pkey = kibnal_data.kib_svc_pkey;
499
500         kibnal_pack_msg(msg, version, 0, srcnid, srcstamp);
501
502  reply:
503         rc = libcfs_sock_write (sock, msg, msg->ibm_nob,
504                                 lnet_acceptor_timeout());
505         if (!reject && rc != 0) {
506                 /* Only complain if we're not rejecting */
507                 CERROR("Error %d replying to svcqry from %u.%u.%u.%u/%d\n",
508                        rc, HIPQUAD(peer_ip), peer_port);
509                 goto out;
510         }
511         
512  out:
513         LIBCFS_FREE(msg, sizeof(*msg));
514 }
515
516 void
517 kibnal_free_acceptsock (kib_acceptsock_t *as)
518 {
519         libcfs_sock_release(as->ibas_sock);
520         LIBCFS_FREE(as, sizeof(*as));
521 }
522
523 int
524 kibnal_accept(lnet_ni_t *ni, struct socket *sock)
525 {
526         kib_acceptsock_t  *as;
527         unsigned long      flags;
528
529         LIBCFS_ALLOC(as, sizeof(*as));
530         if (as == NULL) {
531                 CERROR("Out of Memory\n");
532                 return -ENOMEM;
533         }
534
535         as->ibas_sock = sock;
536                 
537         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
538                 
539         list_add_tail(&as->ibas_list, &kibnal_data.kib_connd_acceptq);
540         wake_up(&kibnal_data.kib_connd_waitq);
541
542         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
543         return 0;
544 }
545
546 int
547 kibnal_start_ib_listener (void) 
548 {
549         int    rc;
550
551         LASSERT (kibnal_data.kib_listen_handle == NULL);
552
553         kibnal_data.kib_svc_id = ib_cm_service_assign();
554         CDEBUG(D_NET, "svc id "LPX64"\n", kibnal_data.kib_svc_id);
555
556         rc = ib_cached_gid_get(kibnal_data.kib_device,
557                                kibnal_data.kib_port, 0,
558                                kibnal_data.kib_svc_gid);
559         if (rc != 0) {
560                 CERROR("Can't get port %d GID: %d\n",
561                        kibnal_data.kib_port, rc);
562                 return rc;
563         }
564         
565         rc = ib_cached_pkey_get(kibnal_data.kib_device,
566                                 kibnal_data.kib_port, 0,
567                                 &kibnal_data.kib_svc_pkey);
568         if (rc != 0) {
569                 CERROR ("Can't get port %d PKEY: %d\n",
570                         kibnal_data.kib_port, rc);
571                 return rc;
572         }
573
574         rc = ib_cm_listen(kibnal_data.kib_svc_id,
575                           TS_IB_CM_SERVICE_EXACT_MASK,
576                           kibnal_passive_conn_callback, NULL,
577                           &kibnal_data.kib_listen_handle);
578         if (rc != 0) {
579                 kibnal_data.kib_listen_handle = NULL;
580                 CERROR ("Can't create IB listener: %d\n", rc);
581                 return rc;
582         }
583         
584         LASSERT (kibnal_data.kib_listen_handle != NULL);
585         return 0;
586 }
587
588 void
589 kibnal_stop_ib_listener (void) 
590 {
591         int    rc;
592         
593         LASSERT (kibnal_data.kib_listen_handle != NULL);
594
595         rc = ib_cm_listen_stop (kibnal_data.kib_listen_handle);
596         if (rc != 0)
597                 CERROR("Error stopping IB listener: %d\n", rc);
598                 
599         kibnal_data.kib_listen_handle = NULL;
600 }
601
602 int
603 kibnal_create_peer (kib_peer_t **peerp, lnet_nid_t nid)
604 {
605         kib_peer_t     *peer;
606         unsigned long   flags;
607         int             rc;
608
609         LASSERT (nid != LNET_NID_ANY);
610
611         LIBCFS_ALLOC(peer, sizeof (*peer));
612         if (peer == NULL) {
613                 CERROR("Cannot allocate peer\n");
614                 return -ENOMEM;
615         }
616
617         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
618
619         peer->ibp_nid = nid;
620         atomic_set (&peer->ibp_refcount, 1);    /* 1 ref for caller */
621
622         INIT_LIST_HEAD (&peer->ibp_list);       /* not in the peer table yet */
623         INIT_LIST_HEAD (&peer->ibp_conns);
624         INIT_LIST_HEAD (&peer->ibp_tx_queue);
625         INIT_LIST_HEAD (&peer->ibp_connd_list); /* not queued for connecting */
626
627         peer->ibp_error = 0;
628         peer->ibp_last_alive = cfs_time_current();
629         peer->ibp_reconnect_interval = 0;       /* OK to connect at any time */
630
631         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
632
633         if (atomic_read(&kibnal_data.kib_npeers) >=
634             *kibnal_tunables.kib_concurrent_peers) {
635                 rc = -EOVERFLOW;        /* !! but at least it distinguishes */
636         } else if (kibnal_data.kib_nonewpeers) {
637                 rc = -ESHUTDOWN;        /* shutdown has started */
638         } else {
639                 rc = 0;
640                 /* npeers only grows with kib_global_lock held */
641                 atomic_inc(&kibnal_data.kib_npeers);
642         }
643         
644         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
645
646         if (rc != 0) {
647                 CERROR("Can't create peer: %s\n", 
648                        (rc == -ESHUTDOWN) ? "shutting down" : 
649                        "too many peers");
650                 LIBCFS_FREE(peer, sizeof(*peer));
651         } else {
652                 *peerp = peer;
653         }
654         
655         return rc;
656 }
657
658 void
659 kibnal_destroy_peer (kib_peer_t *peer)
660 {
661         CDEBUG (D_NET, "peer %s %p deleted\n", 
662                 libcfs_nid2str(peer->ibp_nid), peer);
663
664         LASSERT (atomic_read (&peer->ibp_refcount) == 0);
665         LASSERT (peer->ibp_persistence == 0);
666         LASSERT (!kibnal_peer_active(peer));
667         LASSERT (peer->ibp_connecting == 0);
668         LASSERT (peer->ibp_accepting == 0);
669         LASSERT (list_empty (&peer->ibp_connd_list));
670         LASSERT (list_empty (&peer->ibp_conns));
671         LASSERT (list_empty (&peer->ibp_tx_queue));
672
673         LIBCFS_FREE (peer, sizeof (*peer));
674
675         /* NB a peer's connections keep a reference on their peer until
676          * they are destroyed, so we can be assured that _all_ state to do
677          * with this peer has been cleaned up when its refcount drops to
678          * zero. */
679         atomic_dec(&kibnal_data.kib_npeers);
680 }
681
682 kib_peer_t *
683 kibnal_find_peer_locked (lnet_nid_t nid)
684 {
685         struct list_head *peer_list = kibnal_nid2peerlist (nid);
686         struct list_head *tmp;
687         kib_peer_t       *peer;
688
689         list_for_each (tmp, peer_list) {
690
691                 peer = list_entry (tmp, kib_peer_t, ibp_list);
692
693                 LASSERT (peer->ibp_persistence != 0 || /* persistent peer */
694                          peer->ibp_connecting != 0 || /* creating conns */
695                          peer->ibp_accepting != 0 ||
696                          !list_empty (&peer->ibp_conns));  /* active conn */
697
698                 if (peer->ibp_nid != nid)
699                         continue;
700
701                 return (peer);
702         }
703         return (NULL);
704 }
705
706 kib_peer_t *
707 kibnal_get_peer (lnet_nid_t nid)
708 {
709         kib_peer_t     *peer;
710         unsigned long   flags;
711
712         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
713         peer = kibnal_find_peer_locked (nid);
714         if (peer != NULL)                       /* +1 ref for caller? */
715                 kibnal_peer_addref(peer);
716         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
717
718         return (peer);
719 }
720
721 void
722 kibnal_unlink_peer_locked (kib_peer_t *peer)
723 {
724         LASSERT (peer->ibp_persistence == 0);
725         LASSERT (list_empty(&peer->ibp_conns));
726
727         LASSERT (kibnal_peer_active(peer));
728         list_del_init (&peer->ibp_list);
729         /* lose peerlist's ref */
730         kibnal_peer_decref(peer);
731 }
732
733 int
734 kibnal_get_peer_info (int index, lnet_nid_t *nidp, __u32 *ipp, int *portp,
735                       int *persistencep)
736 {
737         kib_peer_t        *peer;
738         struct list_head  *ptmp;
739         unsigned long      flags;
740         int                i;
741
742         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
743
744         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
745
746                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
747                         
748                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
749                         LASSERT (peer->ibp_persistence != 0 ||
750                                  peer->ibp_connecting != 0 ||
751                                  peer->ibp_accepting != 0 ||
752                                  !list_empty (&peer->ibp_conns));
753
754                         if (index-- > 0)
755                                 continue;
756
757                         *nidp = peer->ibp_nid;
758                         *ipp = peer->ibp_ip;
759                         *portp = peer->ibp_port;
760                         *persistencep = peer->ibp_persistence;
761                         
762                         read_unlock_irqrestore(&kibnal_data.kib_global_lock,
763                                                flags);
764                         return (0);
765                 }
766         }
767
768         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
769         return (-ENOENT);
770 }
771
772 int
773 kibnal_add_persistent_peer (lnet_nid_t nid, __u32 ip, int port)
774 {
775         unsigned long      flags;
776         kib_peer_t        *peer;
777         kib_peer_t        *peer2;
778         int                rc;
779         
780         if (nid == LNET_NID_ANY)
781                 return (-EINVAL);
782
783         rc = kibnal_create_peer (&peer, nid);
784         if (rc != 0)
785                 return rc;
786
787         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
788
789         /* I'm always called with a reference on kibnal_data.kib_ni
790          * so shutdown can't have started */
791         LASSERT (kibnal_data.kib_nonewpeers == 0);
792
793         peer2 = kibnal_find_peer_locked (nid);
794         if (peer2 != NULL) {
795                 kibnal_peer_decref(peer);
796                 peer = peer2;
797         } else {
798                 /* peer table takes existing ref on peer */
799                 list_add_tail (&peer->ibp_list,
800                                kibnal_nid2peerlist (nid));
801         }
802
803         peer->ibp_ip = ip;
804         peer->ibp_port = port;
805         peer->ibp_persistence++;
806         
807         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
808         return (0);
809 }
810
811 void
812 kibnal_del_peer_locked (kib_peer_t *peer)
813 {
814         struct list_head *ctmp;
815         struct list_head *cnxt;
816         kib_conn_t       *conn;
817
818         peer->ibp_persistence = 0;
819
820         if (list_empty(&peer->ibp_conns)) {
821                 kibnal_unlink_peer_locked(peer);
822         } else {
823                 list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
824                         conn = list_entry(ctmp, kib_conn_t, ibc_list);
825
826                         kibnal_close_conn_locked (conn, 0);
827                 }
828                 /* NB peer is no longer persistent; closing its last conn
829                  * unlinked it. */
830         }
831         /* NB peer now unlinked; might even be freed if the peer table had the
832          * last ref on it. */
833 }
834
835 int
836 kibnal_del_peer (lnet_nid_t nid)
837 {
838         unsigned long      flags;
839         CFS_LIST_HEAD     (zombies);
840         struct list_head  *ptmp;
841         struct list_head  *pnxt;
842         kib_peer_t        *peer;
843         int                lo;
844         int                hi;
845         int                i;
846         int                rc = -ENOENT;
847
848         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
849
850         if (nid != LNET_NID_ANY)
851                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
852         else {
853                 lo = 0;
854                 hi = kibnal_data.kib_peer_hash_size - 1;
855         }
856
857         for (i = lo; i <= hi; i++) {
858                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
859                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
860                         LASSERT (peer->ibp_persistence != 0 ||
861                                  peer->ibp_connecting != 0 ||
862                                  peer->ibp_accepting != 0 ||
863                                  !list_empty (&peer->ibp_conns));
864
865                         if (!(nid == LNET_NID_ANY || peer->ibp_nid == nid))
866                                 continue;
867
868                         if (!list_empty(&peer->ibp_tx_queue)) {
869                                 LASSERT (list_empty(&peer->ibp_conns));
870
871                                 list_splice_init(&peer->ibp_tx_queue, &zombies);
872                         }
873
874                         kibnal_del_peer_locked (peer);
875                         rc = 0;         /* matched something */
876                 }
877         }
878
879         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
880
881         kibnal_txlist_done(&zombies, -EIO);
882
883         return (rc);
884 }
885
886 kib_conn_t *
887 kibnal_get_conn_by_idx (int index)
888 {
889         kib_peer_t        *peer;
890         struct list_head  *ptmp;
891         kib_conn_t        *conn;
892         struct list_head  *ctmp;
893         unsigned long      flags;
894         int                i;
895
896         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
897
898         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
899                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
900
901                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
902                         LASSERT (peer->ibp_persistence > 0 ||
903                                  peer->ibp_connecting != 0 ||
904                                  peer->ibp_accepting != 0 ||
905                                  !list_empty (&peer->ibp_conns));
906
907                         list_for_each (ctmp, &peer->ibp_conns) {
908                                 if (index-- > 0)
909                                         continue;
910
911                                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
912                                 kibnal_conn_addref(conn);
913                                 read_unlock_irqrestore(&kibnal_data.kib_global_lock,
914                                                        flags);
915                                 return (conn);
916                         }
917                 }
918         }
919
920         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
921         return (NULL);
922 }
923
924 kib_conn_t *
925 kibnal_create_conn (void)
926 {
927         kib_conn_t  *conn;
928         int          i;
929         __u64        vaddr = 0;
930         __u64        vaddr_base;
931         int          page_offset;
932         int          ipage;
933         int          rc;
934         union {
935                 struct ib_qp_create_param  qp_create;
936                 struct ib_qp_attribute     qp_attr;
937         } params;
938         
939         LIBCFS_ALLOC (conn, sizeof (*conn));
940         if (conn == NULL) {
941                 CERROR ("Can't allocate connection\n");
942                 return (NULL);
943         }
944
945         /* zero flags, NULL pointers etc... */
946         memset (conn, 0, sizeof (*conn));
947
948         INIT_LIST_HEAD (&conn->ibc_tx_queue_nocred);
949         INIT_LIST_HEAD (&conn->ibc_tx_queue);
950         INIT_LIST_HEAD (&conn->ibc_tx_queue_rsrvd);
951         INIT_LIST_HEAD (&conn->ibc_active_txs);
952         spin_lock_init (&conn->ibc_lock);
953         
954         atomic_inc (&kibnal_data.kib_nconns);
955         /* well not really, but I call destroy() on failure, which decrements */
956
957         LIBCFS_ALLOC (conn->ibc_rxs, IBNAL_RX_MSGS * sizeof (kib_rx_t));
958         if (conn->ibc_rxs == NULL)
959                 goto failed;
960         memset (conn->ibc_rxs, 0, IBNAL_RX_MSGS * sizeof(kib_rx_t));
961
962         rc = kibnal_alloc_pages(&conn->ibc_rx_pages,
963                                 IBNAL_RX_MSG_PAGES,
964                                 IB_ACCESS_LOCAL_WRITE);
965         if (rc != 0)
966                 goto failed;
967
968         vaddr_base = vaddr = conn->ibc_rx_pages->ibp_vaddr;
969
970         for (i = ipage = page_offset = 0; i < IBNAL_RX_MSGS; i++) {
971                 struct page *page = conn->ibc_rx_pages->ibp_pages[ipage];
972                 kib_rx_t   *rx = &conn->ibc_rxs[i];
973
974                 rx->rx_conn = conn;
975                 rx->rx_vaddr = vaddr;
976                 rx->rx_msg = (kib_msg_t *)(((char *)page_address(page)) + page_offset);
977                 
978                 vaddr += IBNAL_MSG_SIZE;
979                 LASSERT (vaddr <= vaddr_base + IBNAL_RX_MSG_BYTES);
980                 
981                 page_offset += IBNAL_MSG_SIZE;
982                 LASSERT (page_offset <= PAGE_SIZE);
983
984                 if (page_offset == PAGE_SIZE) {
985                         page_offset = 0;
986                         ipage++;
987                         LASSERT (ipage <= IBNAL_RX_MSG_PAGES);
988                 }
989         }
990
991         /* We can post up to IBLND_MSG_QUEUE_SIZE immediate/req messages and
992          * the same # of ack/nak/rdma+done messages */
993
994         params.qp_create = (struct ib_qp_create_param) {
995                 .limit = {
996                         .max_outstanding_send_request    = 3 * IBNAL_MSG_QUEUE_SIZE,
997                         .max_outstanding_receive_request = IBNAL_RX_MSGS,
998                         .max_send_gather_element         = 1,
999                         .max_receive_scatter_element     = 1,
1000                 },
1001                 .pd              = kibnal_data.kib_pd,
1002                 .send_queue      = kibnal_data.kib_cq,
1003                 .receive_queue   = kibnal_data.kib_cq,
1004                 .send_policy     = IB_WQ_SIGNAL_SELECTABLE,
1005                 .receive_policy  = IB_WQ_SIGNAL_SELECTABLE,
1006                 .rd_domain       = 0,
1007                 .transport       = IB_TRANSPORT_RC,
1008                 .device_specific = NULL,
1009         };
1010         
1011         rc = ib_qp_create (&params.qp_create, &conn->ibc_qp, &conn->ibc_qpn);
1012         if (rc != 0) {
1013                 CERROR ("Failed to create queue pair: %d\n", rc);
1014                 goto failed;
1015         }
1016         
1017         /* Mark QP created */
1018         conn->ibc_state = IBNAL_CONN_INIT_QP;
1019
1020         params.qp_attr = (struct ib_qp_attribute) {
1021                 .state             = IB_QP_STATE_INIT,
1022                 .port              = kibnal_data.kib_port,
1023                 .enable_rdma_read  = 1,
1024                 .enable_rdma_write = 1,
1025                 .valid_fields      = (IB_QP_ATTRIBUTE_STATE |
1026                                       IB_QP_ATTRIBUTE_PORT |
1027                                       IB_QP_ATTRIBUTE_PKEY_INDEX |
1028                                       IB_QP_ATTRIBUTE_RDMA_ATOMIC_ENABLE),
1029         };
1030         rc = ib_qp_modify(conn->ibc_qp, &params.qp_attr);
1031         if (rc != 0) {
1032                 CERROR ("Failed to modify queue pair: %d\n", rc);
1033                 goto failed;
1034         }
1035
1036         /* 1 ref for caller */
1037         atomic_set (&conn->ibc_refcount, 1);
1038         return (conn);
1039         
1040  failed:
1041         kibnal_destroy_conn (conn);
1042         return (NULL);
1043 }
1044
1045 void
1046 kibnal_destroy_conn (kib_conn_t *conn)
1047 {
1048         int    rc;
1049         
1050         CDEBUG (D_NET, "connection %p\n", conn);
1051
1052         LASSERT (atomic_read (&conn->ibc_refcount) == 0);
1053         LASSERT (list_empty(&conn->ibc_tx_queue));
1054         LASSERT (list_empty(&conn->ibc_tx_queue_rsrvd));
1055         LASSERT (list_empty(&conn->ibc_tx_queue_nocred));
1056         LASSERT (list_empty(&conn->ibc_active_txs));
1057         LASSERT (conn->ibc_nsends_posted == 0);
1058         LASSERT (conn->ibc_connreq == NULL);
1059
1060         switch (conn->ibc_state) {
1061         case IBNAL_CONN_ZOMBIE:
1062                 /* called after connection sequence initiated */
1063
1064         case IBNAL_CONN_INIT_QP:
1065                 rc = ib_qp_destroy(conn->ibc_qp);
1066                 if (rc != 0)
1067                         CERROR("Can't destroy QP: %d\n", rc);
1068                 /* fall through */
1069                 
1070         case IBNAL_CONN_INIT_NOTHING:
1071                 break;
1072
1073         default:
1074                 LASSERT (0);
1075         }
1076
1077         if (conn->ibc_rx_pages != NULL) 
1078                 kibnal_free_pages(conn->ibc_rx_pages);
1079         
1080         if (conn->ibc_rxs != NULL)
1081                 LIBCFS_FREE(conn->ibc_rxs, 
1082                             IBNAL_RX_MSGS * sizeof(kib_rx_t));
1083
1084         if (conn->ibc_peer != NULL)
1085                 kibnal_peer_decref(conn->ibc_peer);
1086
1087         LIBCFS_FREE(conn, sizeof (*conn));
1088
1089         atomic_dec(&kibnal_data.kib_nconns);
1090         
1091         if (atomic_read (&kibnal_data.kib_nconns) == 0 &&
1092             kibnal_data.kib_shutdown) {
1093                 /* I just nuked the last connection on shutdown; wake up
1094                  * everyone so they can exit. */
1095                 wake_up_all(&kibnal_data.kib_sched_waitq);
1096                 wake_up_all(&kibnal_data.kib_reaper_waitq);
1097         }
1098 }
1099
1100 int
1101 kibnal_close_peer_conns_locked (kib_peer_t *peer, int why)
1102 {
1103         kib_conn_t         *conn;
1104         struct list_head   *ctmp;
1105         struct list_head   *cnxt;
1106         int                 count = 0;
1107
1108         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1109                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1110
1111                 count++;
1112                 kibnal_close_conn_locked (conn, why);
1113         }
1114
1115         return (count);
1116 }
1117
1118 int
1119 kibnal_close_stale_conns_locked (kib_peer_t *peer, __u64 incarnation)
1120 {
1121         kib_conn_t         *conn;
1122         struct list_head   *ctmp;
1123         struct list_head   *cnxt;
1124         int                 count = 0;
1125
1126         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1127                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1128
1129                 if (conn->ibc_incarnation == incarnation)
1130                         continue;
1131
1132                 CDEBUG(D_NET, "Closing stale conn %p nid: %s"
1133                        " incarnation:"LPX64"("LPX64")\n", conn,
1134                        libcfs_nid2str(peer->ibp_nid), 
1135                        conn->ibc_incarnation, incarnation);
1136                 
1137                 count++;
1138                 kibnal_close_conn_locked (conn, -ESTALE);
1139         }
1140
1141         return (count);
1142 }
1143
1144 int
1145 kibnal_close_matching_conns (lnet_nid_t nid)
1146 {
1147         unsigned long       flags;
1148         kib_peer_t         *peer;
1149         struct list_head   *ptmp;
1150         struct list_head   *pnxt;
1151         int                 lo;
1152         int                 hi;
1153         int                 i;
1154         int                 count = 0;
1155
1156         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1157
1158         if (nid != LNET_NID_ANY)
1159                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
1160         else {
1161                 lo = 0;
1162                 hi = kibnal_data.kib_peer_hash_size - 1;
1163         }
1164
1165         for (i = lo; i <= hi; i++) {
1166                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
1167
1168                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
1169                         LASSERT (peer->ibp_persistence != 0 ||
1170                                  peer->ibp_connecting != 0 ||
1171                                  peer->ibp_accepting != 0 ||
1172                                  !list_empty (&peer->ibp_conns));
1173
1174                         if (!(nid == LNET_NID_ANY || nid == peer->ibp_nid))
1175                                 continue;
1176
1177                         count += kibnal_close_peer_conns_locked (peer, 0);
1178                 }
1179         }
1180
1181         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1182
1183         /* wildcards always succeed */
1184         if (nid == LNET_NID_ANY)
1185                 return (0);
1186         
1187         return (count == 0 ? -ENOENT : 0);
1188 }
1189
1190 int
1191 kibnal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1192 {
1193         struct libcfs_ioctl_data *data = arg;
1194         int                       rc = -EINVAL;
1195
1196         LASSERT (ni == kibnal_data.kib_ni);
1197
1198         switch(cmd) {
1199         case IOC_LIBCFS_GET_PEER: {
1200                 lnet_nid_t   nid = 0;
1201                 __u32       ip = 0;
1202                 int         port = 0;
1203                 int         share_count = 0;
1204
1205                 rc = kibnal_get_peer_info(data->ioc_count,
1206                                           &nid, &ip, &port, &share_count);
1207                 data->ioc_nid    = nid;
1208                 data->ioc_count  = share_count;
1209                 data->ioc_u32[0] = ip;
1210                 data->ioc_u32[1] = port;
1211                 break;
1212         }
1213         case IOC_LIBCFS_ADD_PEER: {
1214                 rc = kibnal_add_persistent_peer (data->ioc_nid,
1215                                                  data->ioc_u32[0], /* IP */
1216                                                  data->ioc_u32[1]); /* port */
1217                 break;
1218         }
1219         case IOC_LIBCFS_DEL_PEER: {
1220                 rc = kibnal_del_peer (data->ioc_nid);
1221                 break;
1222         }
1223         case IOC_LIBCFS_GET_CONN: {
1224                 kib_conn_t *conn = kibnal_get_conn_by_idx (data->ioc_count);
1225
1226                 if (conn == NULL)
1227                         rc = -ENOENT;
1228                 else {
1229                         rc = 0;
1230                         data->ioc_nid = conn->ibc_peer->ibp_nid;
1231                         kibnal_conn_decref(conn);
1232                 }
1233                 break;
1234         }
1235         case IOC_LIBCFS_CLOSE_CONNECTION: {
1236                 rc = kibnal_close_matching_conns (data->ioc_nid);
1237                 break;
1238         }
1239         case IOC_LIBCFS_REGISTER_MYNID: {
1240                 /* Ignore if this is a noop */
1241                 if (data->ioc_nid == ni->ni_nid) {
1242                         rc = 0;
1243                 } else {
1244                         CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1245                                libcfs_nid2str(data->ioc_nid),
1246                                libcfs_nid2str(ni->ni_nid));
1247                         rc = -EINVAL;
1248                 }
1249                 break;
1250         }
1251         }
1252
1253         return rc;
1254 }
1255
1256 void
1257 kibnal_free_pages (kib_pages_t *p)
1258 {
1259         int     npages = p->ibp_npages;
1260         int     rc;
1261         int     i;
1262         
1263         if (p->ibp_mapped) {
1264                 rc = ib_memory_deregister(p->ibp_handle);
1265                 if (rc != 0)
1266                         CERROR ("Deregister error: %d\n", rc);
1267         }
1268         
1269         for (i = 0; i < npages; i++)
1270                 if (p->ibp_pages[i] != NULL)
1271                         __free_page(p->ibp_pages[i]);
1272         
1273         LIBCFS_FREE (p, offsetof(kib_pages_t, ibp_pages[npages]));
1274 }
1275
1276 int
1277 kibnal_alloc_pages (kib_pages_t **pp, int npages, int access)
1278 {
1279         kib_pages_t                *p;
1280         struct ib_physical_buffer  *phys_pages;
1281         int                         i;
1282         int                         rc;
1283
1284         LIBCFS_ALLOC(p, offsetof(kib_pages_t, ibp_pages[npages]));
1285         if (p == NULL) {
1286                 CERROR ("Can't allocate buffer %d\n", npages);
1287                 return (-ENOMEM);
1288         }
1289
1290         memset (p, 0, offsetof(kib_pages_t, ibp_pages[npages]));
1291         p->ibp_npages = npages;
1292         
1293         for (i = 0; i < npages; i++) {
1294                 p->ibp_pages[i] = alloc_page (GFP_KERNEL);
1295                 if (p->ibp_pages[i] == NULL) {
1296                         CERROR ("Can't allocate page %d of %d\n", i, npages);
1297                         kibnal_free_pages(p);
1298                         return (-ENOMEM);
1299                 }
1300         }
1301
1302         LIBCFS_ALLOC(phys_pages, npages * sizeof(*phys_pages));
1303         if (phys_pages == NULL) {
1304                 CERROR ("Can't allocate physarray for %d pages\n", npages);
1305                 kibnal_free_pages(p);
1306                 return (-ENOMEM);
1307         }
1308
1309         for (i = 0; i < npages; i++) {
1310                 phys_pages[i].size = PAGE_SIZE;
1311                 phys_pages[i].address =
1312                         lnet_page2phys(p->ibp_pages[i]);
1313         }
1314
1315         p->ibp_vaddr = 0;
1316         rc = ib_memory_register_physical(kibnal_data.kib_pd,
1317                                          phys_pages, npages,
1318                                          &p->ibp_vaddr,
1319                                          npages * PAGE_SIZE, 0,
1320                                          access,
1321                                          &p->ibp_handle,
1322                                          &p->ibp_lkey,
1323                                          &p->ibp_rkey);
1324         
1325         LIBCFS_FREE(phys_pages, npages * sizeof(*phys_pages));
1326         
1327         if (rc != 0) {
1328                 CERROR ("Error %d mapping %d pages\n", rc, npages);
1329                 kibnal_free_pages(p);
1330                 return (rc);
1331         }
1332         
1333         p->ibp_mapped = 1;
1334         *pp = p;
1335         return (0);
1336 }
1337
1338 int
1339 kibnal_setup_tx_descs (void)
1340 {
1341         int           ipage = 0;
1342         int           page_offset = 0;
1343         __u64         vaddr;
1344         __u64         vaddr_base;
1345         struct page  *page;
1346         kib_tx_t     *tx;
1347         int           i;
1348         int           rc;
1349
1350         /* pre-mapped messages are not bigger than 1 page */
1351         LASSERT (IBNAL_MSG_SIZE <= PAGE_SIZE);
1352
1353         /* No fancy arithmetic when we do the buffer calculations */
1354         LASSERT (PAGE_SIZE % IBNAL_MSG_SIZE == 0);
1355
1356         rc = kibnal_alloc_pages(&kibnal_data.kib_tx_pages,
1357                                 IBNAL_TX_MSG_PAGES(), 
1358                                 0);            /* local read access only */
1359         if (rc != 0)
1360                 return (rc);
1361
1362         vaddr = vaddr_base = kibnal_data.kib_tx_pages->ibp_vaddr;
1363
1364         for (i = 0; i < IBNAL_TX_MSGS(); i++) {
1365                 page = kibnal_data.kib_tx_pages->ibp_pages[ipage];
1366                 tx = &kibnal_data.kib_tx_descs[i];
1367
1368                 memset (tx, 0, sizeof(*tx));    /* zero flags etc */
1369                 
1370                 tx->tx_msg = (kib_msg_t *)(((char *)page_address(page)) + page_offset);
1371                 tx->tx_vaddr = vaddr;
1372                 tx->tx_mapped = KIB_TX_UNMAPPED;
1373
1374                 CDEBUG(D_NET, "Tx[%d] %p->%p - "LPX64"\n", 
1375                        i, tx, tx->tx_msg, tx->tx_vaddr);
1376
1377                 list_add (&tx->tx_list, &kibnal_data.kib_idle_txs);
1378
1379                 vaddr += IBNAL_MSG_SIZE;
1380                 LASSERT (vaddr <= vaddr_base + IBNAL_TX_MSG_BYTES());
1381
1382                 page_offset += IBNAL_MSG_SIZE;
1383                 LASSERT (page_offset <= PAGE_SIZE);
1384
1385                 if (page_offset == PAGE_SIZE) {
1386                         page_offset = 0;
1387                         ipage++;
1388                         LASSERT (ipage <= IBNAL_TX_MSG_PAGES());
1389                 }
1390         }
1391         
1392         return (0);
1393 }
1394
1395 void
1396 kibnal_shutdown (lnet_ni_t *ni)
1397 {
1398         int           i;
1399         int           rc;
1400         unsigned long flags;
1401
1402         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1403                atomic_read (&libcfs_kmemory));
1404
1405         LASSERT(ni == kibnal_data.kib_ni);
1406         LASSERT(ni->ni_data == &kibnal_data);
1407
1408         switch (kibnal_data.kib_init) {
1409         default:
1410                 CERROR ("Unexpected state %d\n", kibnal_data.kib_init);
1411                 LBUG();
1412
1413         case IBNAL_INIT_ALL:
1414                 /* Prevent new peers from being created */
1415                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1416                 kibnal_data.kib_nonewpeers = 1;
1417                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1418
1419                 kibnal_stop_ib_listener();
1420
1421                 /* Remove all existing peers from the peer table */
1422                 kibnal_del_peer(LNET_NID_ANY);
1423                 
1424                 /* Wait for pending conn reqs to be handled */
1425                 i = 2;
1426                 spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
1427                 while (!list_empty(&kibnal_data.kib_connd_acceptq)) {
1428                         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, 
1429                                                flags);
1430                         i++;
1431                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n */
1432                                "waiting for conn reqs to clean up\n");
1433                         cfs_pause(cfs_time_seconds(1));
1434                         
1435                         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
1436                 }
1437                 spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
1438
1439                 /* Wait for all peer state to clean up */
1440                 i = 2;
1441                 while (atomic_read(&kibnal_data.kib_npeers) != 0) {
1442                         i++;
1443                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1444                                "waiting for %d peers to close down\n",
1445                                atomic_read(&kibnal_data.kib_npeers));
1446                         cfs_pause(cfs_time_seconds(1));
1447                 }
1448                 /* fall through */
1449
1450         case IBNAL_INIT_CQ:
1451                 rc = ib_cq_destroy (kibnal_data.kib_cq);
1452                 if (rc != 0)
1453                         CERROR ("Destroy CQ error: %d\n", rc);
1454                 /* fall through */
1455
1456         case IBNAL_INIT_TXD:
1457                 kibnal_free_pages (kibnal_data.kib_tx_pages);
1458                 /* fall through */
1459 #if IBNAL_FMR
1460         case IBNAL_INIT_FMR:
1461                 rc = ib_fmr_pool_destroy (kibnal_data.kib_fmr_pool);
1462                 if (rc != 0)
1463                         CERROR ("Destroy FMR pool error: %d\n", rc);
1464                 /* fall through */
1465 #endif
1466         case IBNAL_INIT_PD:
1467                 rc = ib_pd_destroy(kibnal_data.kib_pd);
1468                 if (rc != 0)
1469                         CERROR ("Destroy PD error: %d\n", rc);
1470                 /* fall through */
1471
1472         case IBNAL_INIT_DATA:
1473                 /* Module refcount only gets to zero when all peers
1474                  * have been closed so all lists must be empty */
1475                 LASSERT (atomic_read(&kibnal_data.kib_npeers) == 0);
1476                 LASSERT (kibnal_data.kib_peers != NULL);
1477                 for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
1478                         LASSERT (list_empty (&kibnal_data.kib_peers[i]));
1479                 }
1480                 LASSERT (atomic_read (&kibnal_data.kib_nconns) == 0);
1481                 LASSERT (list_empty (&kibnal_data.kib_sched_rxq));
1482                 LASSERT (list_empty (&kibnal_data.kib_sched_txq));
1483                 LASSERT (list_empty (&kibnal_data.kib_reaper_conns));
1484                 LASSERT (list_empty (&kibnal_data.kib_connd_peers));
1485                 LASSERT (list_empty (&kibnal_data.kib_connd_acceptq));
1486
1487                 /* flag threads to terminate; wake and wait for them to die */
1488                 kibnal_data.kib_shutdown = 1;
1489                 wake_up_all (&kibnal_data.kib_sched_waitq);
1490                 wake_up_all (&kibnal_data.kib_reaper_waitq);
1491                 wake_up_all (&kibnal_data.kib_connd_waitq);
1492
1493                 i = 2;
1494                 while (atomic_read (&kibnal_data.kib_nthreads) != 0) {
1495                         i++;
1496                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1497                                "Waiting for %d threads to terminate\n",
1498                                atomic_read (&kibnal_data.kib_nthreads));
1499                         cfs_pause(cfs_time_seconds(1));
1500                 }
1501                 /* fall through */
1502                 
1503         case IBNAL_INIT_NOTHING:
1504                 break;
1505         }
1506
1507         if (kibnal_data.kib_tx_descs != NULL)
1508                 LIBCFS_FREE (kibnal_data.kib_tx_descs,
1509                              IBNAL_TX_MSGS() * sizeof(kib_tx_t));
1510
1511         if (kibnal_data.kib_peers != NULL)
1512                 LIBCFS_FREE (kibnal_data.kib_peers,
1513                              sizeof (struct list_head) * 
1514                              kibnal_data.kib_peer_hash_size);
1515
1516         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1517                atomic_read (&libcfs_kmemory));
1518
1519         kibnal_data.kib_init = IBNAL_INIT_NOTHING;
1520         PORTAL_MODULE_UNUSE;
1521 }
1522
1523 int
1524 kibnal_get_ipoibidx(void)
1525 {
1526         /* NB single threaded! */
1527         static struct ib_port_properties port_props;
1528
1529         int               ipoibidx = 0;
1530         int               devidx;
1531         int               port;
1532         int               rc;
1533         struct ib_device *device;
1534
1535         for (devidx = 0; devidx <= kibnal_data.kib_hca_idx; devidx++) {
1536                 device = ib_device_get_by_index(devidx);
1537                 
1538                 if (device == NULL) {
1539                         CERROR("Can't get IB device %d\n", devidx);
1540                         return -1;
1541                 }
1542                 
1543                 for (port = 1; port <= 2; port++) {
1544                         if (devidx == kibnal_data.kib_hca_idx &&
1545                             port == kibnal_data.kib_port)
1546                                 return ipoibidx;
1547                         
1548                         rc = ib_port_properties_get(device, port,
1549                                                     &port_props);
1550                         if (rc == 0)
1551                                 ipoibidx++;
1552                 }
1553         }
1554
1555         LBUG();
1556         return -1;
1557 }
1558
1559 int
1560 kibnal_startup (lnet_ni_t *ni)
1561 {
1562         char              ipif_name[32];
1563         __u32             ip;
1564         __u32             netmask;
1565         int               up;
1566         struct timeval    tv;
1567         int               rc;
1568         int               hca;
1569         int               port;
1570         int               i;
1571         int               nob;
1572
1573         LASSERT (ni->ni_lnd == &the_kiblnd);
1574
1575         /* Only 1 instance supported */
1576         if (kibnal_data.kib_init != IBNAL_INIT_NOTHING) {
1577                 CERROR ("Only 1 instance supported\n");
1578                 return -EPERM;
1579         }
1580
1581         if (*kibnal_tunables.kib_credits > *kibnal_tunables.kib_ntx) {
1582                 CERROR ("Can't set credits(%d) > ntx(%d)\n",
1583                         *kibnal_tunables.kib_credits,
1584                         *kibnal_tunables.kib_ntx);
1585                 return -EINVAL;
1586         }
1587
1588         memset (&kibnal_data, 0, sizeof (kibnal_data)); /* zero pointers, flags etc */
1589
1590         ni->ni_maxtxcredits = *kibnal_tunables.kib_credits;
1591         ni->ni_peertxcredits = *kibnal_tunables.kib_peercredits;
1592
1593         CLASSERT (LNET_MAX_INTERFACES > 1);
1594
1595
1596         kibnal_data.kib_hca_idx = 0;            /* default: first HCA */
1597         kibnal_data.kib_port = 0;               /* any port */
1598
1599         if (ni->ni_interfaces[0] != NULL) {
1600                 /* hca.port specified in 'networks=openib(h.p)' */
1601                 if (ni->ni_interfaces[1] != NULL) {
1602                         CERROR("Multiple interfaces not supported\n");
1603                         return -EPERM;
1604                 }
1605                 
1606                 nob = strlen(ni->ni_interfaces[0]);
1607                 i = sscanf(ni->ni_interfaces[0], "%d.%d%n", &hca, &port, &nob);
1608                 if (i >= 2 && nob == strlen(ni->ni_interfaces[0])) {
1609                         kibnal_data.kib_hca_idx = hca;
1610                         kibnal_data.kib_port = port;
1611                 } else {
1612                         nob = strlen(ni->ni_interfaces[0]);
1613                         i = sscanf(ni->ni_interfaces[0], "%d%n", &hca, &nob);
1614
1615                         if (i >= 1 && nob == strlen(ni->ni_interfaces[0])) {
1616                                 kibnal_data.kib_hca_idx = hca;
1617                         } else {
1618                                 CERROR("Can't parse interface '%s'\n",
1619                                        ni->ni_interfaces[0]);
1620                                 return -EINVAL;
1621                         }
1622                 }
1623         }
1624         
1625         kibnal_data.kib_ni = ni;
1626         ni->ni_data = &kibnal_data;
1627         
1628         do_gettimeofday(&tv);
1629         kibnal_data.kib_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1630
1631         PORTAL_MODULE_USE;
1632
1633         rwlock_init(&kibnal_data.kib_global_lock);
1634
1635         kibnal_data.kib_peer_hash_size = IBNAL_PEER_HASH_SIZE;
1636         LIBCFS_ALLOC (kibnal_data.kib_peers,
1637                       sizeof (struct list_head) * kibnal_data.kib_peer_hash_size);
1638         if (kibnal_data.kib_peers == NULL) {
1639                 goto failed;
1640         }
1641         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++)
1642                 INIT_LIST_HEAD(&kibnal_data.kib_peers[i]);
1643
1644         spin_lock_init (&kibnal_data.kib_reaper_lock);
1645         INIT_LIST_HEAD (&kibnal_data.kib_reaper_conns);
1646         init_waitqueue_head (&kibnal_data.kib_reaper_waitq);
1647
1648         spin_lock_init (&kibnal_data.kib_connd_lock);
1649         INIT_LIST_HEAD (&kibnal_data.kib_connd_acceptq);
1650         INIT_LIST_HEAD (&kibnal_data.kib_connd_peers);
1651         init_waitqueue_head (&kibnal_data.kib_connd_waitq);
1652
1653         spin_lock_init (&kibnal_data.kib_sched_lock);
1654         INIT_LIST_HEAD (&kibnal_data.kib_sched_txq);
1655         INIT_LIST_HEAD (&kibnal_data.kib_sched_rxq);
1656         init_waitqueue_head (&kibnal_data.kib_sched_waitq);
1657
1658         spin_lock_init (&kibnal_data.kib_tx_lock);
1659         INIT_LIST_HEAD (&kibnal_data.kib_idle_txs);
1660
1661         LIBCFS_ALLOC (kibnal_data.kib_tx_descs,
1662                       IBNAL_TX_MSGS() * sizeof(kib_tx_t));
1663         if (kibnal_data.kib_tx_descs == NULL) {
1664                 CERROR ("Can't allocate tx descs\n");
1665                 goto failed;
1666         }
1667
1668         /* lists/ptrs/locks initialised */
1669         kibnal_data.kib_init = IBNAL_INIT_DATA;
1670         /*****************************************************/
1671
1672         for (i = 0; i < IBNAL_N_SCHED; i++) {
1673                 rc = kibnal_thread_start (kibnal_scheduler,
1674                                           (void *)((unsigned long)i));
1675                 if (rc != 0) {
1676                         CERROR("Can't spawn openibnal scheduler[%d]: %d\n",
1677                                i, rc);
1678                         goto failed;
1679                 }
1680         }
1681
1682         /* must have at least 2 connds to remain responsive to svcqry while
1683          * connecting */
1684         if (*kibnal_tunables.kib_n_connd < 2)
1685                 *kibnal_tunables.kib_n_connd = 2;
1686
1687
1688         for (i = 0; i < *kibnal_tunables.kib_n_connd; i++) {
1689                 rc = kibnal_thread_start (kibnal_connd,
1690                                           (void *)((unsigned long)i));
1691                 if (rc != 0) {
1692                         CERROR("Can't spawn openibnal connd[%d]: %d\n",
1693                                i, rc);
1694                         goto failed;
1695                 }
1696         }
1697
1698         rc = kibnal_thread_start (kibnal_reaper, NULL);
1699         if (rc != 0) {
1700                 CERROR ("Can't spawn openibnal reaper: %d\n", rc);
1701                 goto failed;
1702         }
1703
1704         kibnal_data.kib_device = ib_device_get_by_index(kibnal_data.kib_hca_idx);
1705         if (kibnal_data.kib_device == NULL) {
1706                 CERROR ("Can't open ib device %d\n",
1707                         kibnal_data.kib_hca_idx);
1708                 goto failed;
1709         }
1710         
1711         rc = ib_device_properties_get(kibnal_data.kib_device,
1712                                       &kibnal_data.kib_device_props);
1713         if (rc != 0) {
1714                 CERROR ("Can't get device props: %d\n", rc);
1715                 goto failed;
1716         }
1717
1718         CDEBUG(D_NET, "Max Initiator: %d Max Responder %d\n", 
1719                kibnal_data.kib_device_props.max_initiator_per_qp,
1720                kibnal_data.kib_device_props.max_responder_per_qp);
1721
1722         if (kibnal_data.kib_port != 0) {
1723                 rc = ib_port_properties_get(kibnal_data.kib_device, 
1724                                             kibnal_data.kib_port,
1725                                             &kibnal_data.kib_port_props);
1726                 if (rc != 0) {
1727                         CERROR("Error %d open port %d on HCA %d\n", rc,
1728                                kibnal_data.kib_port,
1729                                kibnal_data.kib_hca_idx);
1730                         goto failed;
1731                 }
1732         } else {
1733                 for (i = 1; i <= 2; i++) {
1734                         rc = ib_port_properties_get(kibnal_data.kib_device, i,
1735                                                     &kibnal_data.kib_port_props);
1736                         if (rc == 0) {
1737                                 kibnal_data.kib_port = i;
1738                                 break;
1739                         }
1740                 }
1741                 if (kibnal_data.kib_port == 0) {
1742                         CERROR ("Can't find a port\n");
1743                         goto failed;
1744                 }
1745         }
1746
1747         i = kibnal_get_ipoibidx();
1748         if (i < 0)
1749                 goto failed;
1750         
1751         snprintf(ipif_name, sizeof(ipif_name), "%s%d",
1752                  *kibnal_tunables.kib_ipif_basename, i);
1753         if (strlen(ipif_name) == sizeof(ipif_name) - 1) {
1754                 CERROR("IPoIB interface name %s truncated\n", ipif_name);
1755                 return -EINVAL;
1756         }
1757         
1758         rc = libcfs_ipif_query(ipif_name, &up, &ip, &netmask);
1759         if (rc != 0) {
1760                 CERROR("Can't query IPoIB interface %s: %d\n", ipif_name, rc);
1761                 goto failed;
1762         }
1763         
1764         if (!up) {
1765                 CERROR("Can't query IPoIB interface %s: it's down\n", ipif_name);
1766                 goto failed;
1767         }
1768         
1769         ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ip);
1770
1771         rc = ib_pd_create(kibnal_data.kib_device,
1772                           NULL, &kibnal_data.kib_pd);
1773         if (rc != 0) {
1774                 CERROR ("Can't create PD: %d\n", rc);
1775                 goto failed;
1776         }
1777         
1778         /* flag PD initialised */
1779         kibnal_data.kib_init = IBNAL_INIT_PD;
1780         /*****************************************************/
1781 #if IBNAL_FMR
1782         {
1783                 const int pool_size = *kibnal_tunables.kib_ntx;
1784                 struct ib_fmr_pool_param params = {
1785                         .max_pages_per_fmr = LNET_MAX_PAYLOAD/PAGE_SIZE,
1786                         .access            = (IB_ACCESS_LOCAL_WRITE |
1787                                               IB_ACCESS_REMOTE_WRITE |
1788                                               IB_ACCESS_REMOTE_READ),
1789                         .pool_size         = pool_size,
1790                         .dirty_watermark   = (pool_size * 3)/4,
1791                         .flush_function    = NULL,
1792                         .flush_arg         = NULL,
1793                         .cache             = 1,
1794                 };
1795                 rc = ib_fmr_pool_create(kibnal_data.kib_pd, &params,
1796                                         &kibnal_data.kib_fmr_pool);
1797                 if (rc != 0) {
1798                         CERROR ("Can't create FMR pool size %d: %d\n", 
1799                                 pool_size, rc);
1800                         goto failed;
1801                 }
1802         }
1803
1804         /* flag FMR pool initialised */
1805         kibnal_data.kib_init = IBNAL_INIT_FMR;
1806 #endif
1807         /*****************************************************/
1808
1809         rc = kibnal_setup_tx_descs();
1810         if (rc != 0) {
1811                 CERROR ("Can't register tx descs: %d\n", rc);
1812                 goto failed;
1813         }
1814         
1815         /* flag TX descs initialised */
1816         kibnal_data.kib_init = IBNAL_INIT_TXD;
1817         /*****************************************************/
1818         
1819         {
1820                 struct ib_cq_callback callback = {
1821                         .context        = IBNAL_CALLBACK_CTXT,
1822                         .policy         = IB_CQ_PROVIDER_REARM,
1823                         .function       = {
1824                                 .entry  = kibnal_callback,
1825                         },
1826                         .arg            = NULL,
1827                 };
1828                 int  nentries = IBNAL_CQ_ENTRIES();
1829                 
1830                 rc = ib_cq_create (kibnal_data.kib_device, 
1831                                    &nentries, &callback, NULL,
1832                                    &kibnal_data.kib_cq);
1833                 if (rc != 0) {
1834                         CERROR ("Can't create CQ: %d\n", rc);
1835                         goto failed;
1836                 }
1837
1838                 /* I only want solicited events */
1839                 rc = ib_cq_request_notification(kibnal_data.kib_cq, 1);
1840                 LASSERT (rc == 0);
1841         }
1842
1843         /* flag CQ initialised */
1844         kibnal_data.kib_init = IBNAL_INIT_CQ;
1845         /*****************************************************/
1846
1847         rc = kibnal_start_ib_listener();
1848         if (rc != 0)
1849                 goto failed;
1850         
1851         /* flag everything initialised */
1852         kibnal_data.kib_init = IBNAL_INIT_ALL;
1853         /*****************************************************/
1854
1855         return 0;
1856
1857  failed:
1858         kibnal_shutdown(ni);    
1859         return -ENETDOWN;
1860 }
1861
1862 void __exit
1863 kibnal_module_fini (void)
1864 {
1865         lnet_unregister_lnd(&the_kiblnd);
1866         kibnal_tunables_fini();
1867 }
1868
1869 int __init
1870 kibnal_module_init (void)
1871 {
1872         int    rc;
1873
1874         rc = kibnal_tunables_init();
1875         if (rc != 0)
1876                 return rc;
1877         
1878         lnet_register_lnd(&the_kiblnd);
1879
1880         return (0);
1881 }
1882
1883 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1884 #ifdef USING_TSAPI
1885 MODULE_DESCRIPTION("Kernel Cisco IB LND v1.00");
1886 #else
1887 MODULE_DESCRIPTION("Kernel OpenIB(gen1) LND v1.00");
1888 #endif
1889 MODULE_LICENSE("GPL");
1890
1891 module_init(kibnal_module_init);
1892 module_exit(kibnal_module_fini);
1893