Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lnet / klnds / openiblnd / openiblnd.c
1 /*
2  * -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
3  * vim:expandtab:shiftwidth=8:tabstop=8:
4  *
5  * GPL HEADER START
6  *
7  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8  *
9  * This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License version 2 only,
11  * as published by the Free Software Foundation.
12  *
13  * This program is distributed in the hope that it will be useful, but
14  * WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * General Public License version 2 for more details (a copy is included
17  * in the LICENSE file that accompanied this code).
18  *
19  * You should have received a copy of the GNU General Public License
20  * version 2 along with this program; If not, see [sun.com URL with a
21  * copy of GPLv2].
22  *
23  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
24  * CA 95054 USA or visit www.sun.com if you need additional information or
25  * have any questions.
26  *
27  * GPL HEADER END
28  */
29 /*
30  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
31  * Use is subject to license terms.
32  */
33 /*
34  * This file is part of Lustre, http://www.lustre.org/
35  * Lustre is a trademark of Sun Microsystems, Inc.
36  *
37  * lnet/klnds/openiblnd/openiblnd.c
38  *
39  * Author: Eric Barton <eric@bartonsoftware.com>
40  */
41
42 #include "openiblnd.h"
43
44 lnd_t the_kiblnd = {
45 #ifdef USING_TSAPI
46         .lnd_type       = CIBLND,
47 #else
48         .lnd_type       = OPENIBLND,
49 #endif
50         .lnd_startup    = kibnal_startup,
51         .lnd_shutdown   = kibnal_shutdown,
52         .lnd_ctl        = kibnal_ctl,
53         .lnd_send       = kibnal_send,
54         .lnd_recv       = kibnal_recv,
55         .lnd_eager_recv = kibnal_eager_recv,
56         .lnd_accept     = kibnal_accept,
57 };
58
59 kib_data_t              kibnal_data;
60
/*
 * Checksum 'nob' bytes starting at 'ptr': the running sum is rotated left
 * by one bit before each byte is added.  Bytes are read through a plain
 * 'char' pointer.
 *
 * Never returns 0, because 0 is reserved to mean "no checksum"; a computed
 * sum of 0 is reported as 1.  A 'nob' <= 0 sums nothing and so yields 1.
 */
__u32
kibnal_cksum (void *ptr, int nob)
{
        char  *bytes = ptr;
        __u32  acc = 0;
        int    i;

        for (i = 0; i < nob; i++)
                acc = ((acc << 1) | (acc >> 31)) + bytes[i];

        if (acc == 0)
                return 1;

        return acc;
}
73
74 void
75 kibnal_init_msg(kib_msg_t *msg, int type, int body_nob)
76 {
77         msg->ibm_type = type;
78         msg->ibm_nob  = offsetof(kib_msg_t, ibm_u) + body_nob;
79 }
80
81 void
82 kibnal_pack_msg(kib_msg_t *msg, int version, int credits, 
83                 lnet_nid_t dstnid, __u64 dststamp)
84 {
85         /* CAVEAT EMPTOR! all message fields not set here should have been
86          * initialised previously. */
87         msg->ibm_magic    = IBNAL_MSG_MAGIC;
88         msg->ibm_version  = version;
89         /*   ibm_type */
90         msg->ibm_credits  = credits;
91         /*   ibm_nob */
92         msg->ibm_cksum    = 0;
93         msg->ibm_srcnid   = lnet_ptlcompat_srcnid(kibnal_data.kib_ni->ni_nid,
94                                                   dstnid);
95         msg->ibm_srcstamp = kibnal_data.kib_incarnation;
96         msg->ibm_dstnid   = dstnid;
97         msg->ibm_dststamp = dststamp;
98
99         if (*kibnal_tunables.kib_cksum) {
100                 /* NB ibm_cksum zero while computing cksum */
101                 msg->ibm_cksum    = kibnal_cksum(msg, msg->ibm_nob);
102         }
103 }
104
105 int
106 kibnal_unpack_msg(kib_msg_t *msg, int expected_version, int nob)
107 {
108         const int hdr_size = offsetof(kib_msg_t, ibm_u);
109         __u32     msg_cksum;
110         int       msg_version;
111         int       flip;
112         int       msg_nob;
113
114         if (nob < 6) {
115                 CERROR("Short message: %d\n", nob);
116                 return -EPROTO;
117         }
118
119         if (msg->ibm_magic == IBNAL_MSG_MAGIC) {
120                 flip = 0;
121         } else if (msg->ibm_magic == __swab32(IBNAL_MSG_MAGIC)) {
122                 flip = 1;
123         } else {
124                 CERROR("Bad magic: %08x\n", msg->ibm_magic);
125                 return -EPROTO;
126         }
127
128         msg_version = flip ? __swab16(msg->ibm_version) : msg->ibm_version;
129         if ((expected_version == 0) ?
130             (msg_version != IBNAL_MSG_VERSION &&
131              msg_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) :
132             (msg_version != expected_version)) {
133                 CERROR("Bad version: %x\n", msg_version);
134                 return -EPROTO;
135         }
136
137         if (nob < hdr_size) {
138                 CERROR("Short message: %d\n", nob);
139                 return -EPROTO;
140         }
141
142         msg_nob = flip ? __swab32(msg->ibm_nob) : msg->ibm_nob;
143         if (msg_nob > nob) {
144                 CERROR("Short message: got %d, wanted %d\n", nob, msg_nob);
145                 return -EPROTO;
146         }
147
148         /* checksum must be computed with ibm_cksum zero and BEFORE anything
149          * gets flipped */
150         msg_cksum = flip ? __swab32(msg->ibm_cksum) : msg->ibm_cksum;
151         msg->ibm_cksum = 0;
152         if (msg_cksum != 0 &&
153             msg_cksum != kibnal_cksum(msg, msg_nob)) {
154                 CERROR("Bad checksum\n");
155                 return -EPROTO;
156         }
157         msg->ibm_cksum = msg_cksum;
158         
159         if (flip) {
160                 /* leave magic unflipped as a clue to peer endianness */
161                 msg->ibm_version = msg_version;
162                 LASSERT (sizeof(msg->ibm_type) == 1);
163                 LASSERT (sizeof(msg->ibm_credits) == 1);
164                 msg->ibm_nob = msg_nob;
165                 __swab64s(&msg->ibm_srcnid);
166                 __swab64s(&msg->ibm_srcstamp);
167                 __swab64s(&msg->ibm_dstnid);
168                 __swab64s(&msg->ibm_dststamp);
169         }
170         
171         if (msg->ibm_srcnid == LNET_NID_ANY) {
172                 CERROR("Bad src nid: %s\n", libcfs_nid2str(msg->ibm_srcnid));
173                 return -EPROTO;
174         }
175
176         switch (msg->ibm_type) {
177         default:
178                 CERROR("Unknown message type %x\n", msg->ibm_type);
179                 return -EPROTO;
180                 
181         case IBNAL_MSG_SVCQRY:
182         case IBNAL_MSG_NOOP:
183                 break;
184
185         case IBNAL_MSG_SVCRSP:
186                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.svcrsp)) {
187                         CERROR("Short SVCRSP: %d(%d)\n", msg_nob,
188                                (int)(hdr_size + sizeof(msg->ibm_u.svcrsp)));
189                         return -EPROTO;
190                 }
191                 if (flip) {
192                         __swab64s(&msg->ibm_u.svcrsp.ibsr_svc_id);
193                         __swab16s(&msg->ibm_u.svcrsp.ibsr_svc_pkey);
194                 }
195                 break;
196
197         case IBNAL_MSG_CONNREQ:
198         case IBNAL_MSG_CONNACK:
199                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.connparams)) {
200                         CERROR("Short CONNREQ: %d(%d)\n", msg_nob,
201                                (int)(hdr_size + sizeof(msg->ibm_u.connparams)));
202                         return -EPROTO;
203                 }
204                 if (flip)
205                         __swab32s(&msg->ibm_u.connparams.ibcp_queue_depth);
206                 break;
207
208         case IBNAL_MSG_IMMEDIATE:
209                 if (msg_nob < offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0])) {
210                         CERROR("Short IMMEDIATE: %d(%d)\n", msg_nob,
211                                (int)offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0]));
212                         return -EPROTO;
213                 }
214                 break;
215
216         case IBNAL_MSG_PUT_RDMA:
217         case IBNAL_MSG_GET_RDMA:
218                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.rdma)) {
219                         CERROR("Short RDMA req: %d(%d)\n", msg_nob,
220                                (int)(hdr_size + sizeof(msg->ibm_u.rdma)));
221                         return -EPROTO;
222                 }
223                 if (flip) {
224                         __swab32s(&msg->ibm_u.rdma.ibrm_desc.rd_key);
225                         __swab32s(&msg->ibm_u.rdma.ibrm_desc.rd_nob);
226                         __swab64s(&msg->ibm_u.rdma.ibrm_desc.rd_addr);
227                 }
228                 break;
229
230         case IBNAL_MSG_PUT_DONE:
231         case IBNAL_MSG_GET_DONE:
232                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.completion)) {
233                         CERROR("Short RDMA completion: %d(%d)\n", msg_nob,
234                                (int)(hdr_size + sizeof(msg->ibm_u.completion)));
235                         return -EPROTO;
236                 }
237                 if (flip)
238                         __swab32s(&msg->ibm_u.completion.ibcm_status);
239                 break;
240         }
241         return 0;
242 }
243
244 int
245 kibnal_make_svcqry (kib_conn_t *conn) 
246 {
247         kib_peer_t    *peer = conn->ibc_peer;
248         int            version = IBNAL_MSG_VERSION;
249         int            msg_version;
250         kib_msg_t     *msg;
251         struct socket *sock;
252         int            rc;
253         int            nob;
254
255         LASSERT (conn->ibc_connreq != NULL);
256         msg = &conn->ibc_connreq->cr_msg;
257
258  again:
259         kibnal_init_msg(msg, IBNAL_MSG_SVCQRY, 0);
260         kibnal_pack_msg(msg, version, 0, peer->ibp_nid, 0);
261
262         rc = lnet_connect(&sock, peer->ibp_nid,
263                           0, peer->ibp_ip, peer->ibp_port);
264         if (rc != 0)
265                 return -ECONNABORTED;
266         
267         rc = libcfs_sock_write(sock, msg, msg->ibm_nob,
268                                lnet_acceptor_timeout());
269         if (rc != 0) {
270                 CERROR("Error %d sending svcqry to %s at %u.%u.%u.%u/%d\n", 
271                        rc, libcfs_nid2str(peer->ibp_nid), 
272                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
273                 goto out;
274         }
275
276         /* The first 6 bytes are invariably MAGIC + proto version */
277         rc = libcfs_sock_read(sock, msg, 6, *kibnal_tunables.kib_timeout);
278         if (rc != 0) {
279                 CERROR("Error %d receiving svcrsp from %s at %u.%u.%u.%u/%d\n", 
280                        rc, libcfs_nid2str(peer->ibp_nid), 
281                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
282                 goto out;
283         }
284
285         if (msg->ibm_magic != IBNAL_MSG_MAGIC &&
286             msg->ibm_magic != __swab32(IBNAL_MSG_MAGIC)) {
287                 CERROR("Bad magic: %08x from %s at %u.%u.%u.%u/%d\n",
288                        msg->ibm_magic, libcfs_nid2str(peer->ibp_nid),
289                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
290                 rc = -EPROTO;
291                 goto out;
292         }
293
294         msg_version = (msg->ibm_magic == IBNAL_MSG_MAGIC) ? 
295                       msg->ibm_version : __swab16(msg->ibm_version);
296         if (msg_version != version) {
297                 if (version == IBNAL_MSG_VERSION) {
298                         /* retry with previous version */
299                         libcfs_sock_release(sock);
300                         version = IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD;
301                         goto again;
302                 }
303                 
304                 CERROR("Bad version %x from %s at %u.%u.%u.%u/%d\n",
305                        msg_version, libcfs_nid2str(peer->ibp_nid),
306                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
307                 rc = -EPROTO;
308                 goto out;
309         }
310
311         /* Read in the rest of the message now we know the expected format */
312         nob = offsetof(kib_msg_t, ibm_u) + sizeof(kib_svcrsp_t);
313         rc = libcfs_sock_read(sock, ((char *)msg) + 6, nob - 6,
314                               *kibnal_tunables.kib_timeout);
315         if (rc != 0) {
316                 CERROR("Error %d receiving svcrsp from %s at %u.%u.%u.%u/%d\n", 
317                        rc, libcfs_nid2str(peer->ibp_nid), 
318                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
319                 goto out;
320         }
321
322         rc = kibnal_unpack_msg(msg, version, nob);
323         if (rc != 0) {
324                 CERROR("Error %d unpacking svcrsp from %s at %u.%u.%u.%u/%d\n", 
325                        rc, libcfs_nid2str(peer->ibp_nid), 
326                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
327                 goto out;
328         }
329                        
330         if (msg->ibm_type != IBNAL_MSG_SVCRSP) {
331                 CERROR("Unexpected response type %d from %s at %u.%u.%u.%u/%d\n", 
332                        msg->ibm_type, libcfs_nid2str(peer->ibp_nid), 
333                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
334                 rc = -EPROTO;
335                 goto out;
336         }
337         
338         if (!lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
339                                      msg->ibm_dstnid) ||
340             msg->ibm_dststamp != kibnal_data.kib_incarnation) {
341                 CERROR("Unexpected dst NID/stamp %s/"LPX64" from "
342                        "%s at %u.%u.%u.%u/%d\n", 
343                        libcfs_nid2str(msg->ibm_dstnid), msg->ibm_dststamp,
344                        libcfs_nid2str(peer->ibp_nid), HIPQUAD(peer->ibp_ip), 
345                        peer->ibp_port);
346                 rc = -EPROTO;
347                 goto out;
348         }
349
350         if (!lnet_ptlcompat_matchnid(peer->ibp_nid, msg->ibm_srcnid)) {
351                 CERROR("Unexpected src NID %s from %s at %u.%u.%u.%u/%d\n", 
352                        libcfs_nid2str(msg->ibm_srcnid),
353                        libcfs_nid2str(peer->ibp_nid), 
354                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
355                 rc = -EPROTO;
356                 goto out;
357         }
358
359         conn->ibc_incarnation = msg->ibm_srcstamp;
360         conn->ibc_connreq->cr_svcrsp = msg->ibm_u.svcrsp;
361         conn->ibc_version = version;
362         
363  out:
364         libcfs_sock_release(sock);
365         return rc;
366 }
367
368 void
369 kibnal_handle_svcqry (struct socket *sock)
370 {
371         __u32                peer_ip;
372         unsigned int         peer_port;
373         kib_msg_t           *msg;
374         __u64                srcnid;
375         __u64                srcstamp;
376         int                  version;
377         int                  reject = 0;
378         int                  rc;
379
380         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
381         if (rc != 0) {
382                 CERROR("Can't get peer's IP: %d\n", rc);
383                 return;
384         }
385
386         LIBCFS_ALLOC(msg, sizeof(*msg));
387         if (msg == NULL) {
388                 CERROR("Can't allocate msgs for %u.%u.%u.%u/%d\n",
389                        HIPQUAD(peer_ip), peer_port);
390                 return;
391         }
392         
393         rc = libcfs_sock_read(sock, &msg->ibm_magic, sizeof(msg->ibm_magic),
394                               lnet_acceptor_timeout());
395         if (rc != 0) {
396                 CERROR("Error %d receiving svcqry(1) from %u.%u.%u.%u/%d\n",
397                        rc, HIPQUAD(peer_ip), peer_port);
398                 goto out;
399         }
400
401         if (msg->ibm_magic != IBNAL_MSG_MAGIC &&
402             msg->ibm_magic != __swab32(IBNAL_MSG_MAGIC)) {
403                 /* Unexpected magic! */
404                 if (the_lnet.ln_ptlcompat == 0) {
405                         if (msg->ibm_magic == LNET_PROTO_MAGIC ||
406                             msg->ibm_magic == __swab32(LNET_PROTO_MAGIC)) {
407                                 /* future protocol version compatibility!
408                                  * When LNET unifies protocols over all LNDs,
409                                  * the first thing sent will be a version
410                                  * query.  I send back a reply in my current
411                                  * protocol to tell her I'm "old" */
412                                 kibnal_init_msg(msg, 0, 0);
413                                 kibnal_pack_msg(msg, IBNAL_MSG_VERSION, 0, 
414                                                 LNET_NID_ANY, 0);
415                                 reject = 1;
416                                 goto reply;
417                         }
418
419                         CERROR ("Bad magic(1) %#08x (%#08x expected) from "
420                                 "%u.%u.%u.%u/%d\n", msg->ibm_magic,
421                                 IBNAL_MSG_MAGIC, HIPQUAD(peer_ip), peer_port);
422                         goto out;
423                 }
424
425                 /* When portals compatibility is set, I may be passed a new
426                  * connection "blindly" by the acceptor, and I have to
427                  * determine if my peer has sent an acceptor connection request
428                  * or not. */
429                 rc = lnet_accept(kibnal_data.kib_ni, sock, msg->ibm_magic);
430                 if (rc != 0)
431                         goto out;
432
433                 /* It was an acceptor connection request!
434                  * Now I should see my magic... */
435                 rc = libcfs_sock_read(sock, &msg->ibm_magic,
436                                       sizeof(msg->ibm_magic),
437                                       lnet_acceptor_timeout());
438                 if (rc != 0) {
439                         CERROR("Error %d receiving svcqry(2) from %u.%u.%u.%u/%d\n",
440                                rc, HIPQUAD(peer_ip), peer_port);
441                         goto out;
442                 }
443
444                 if (msg->ibm_magic != IBNAL_MSG_MAGIC &&
445                     msg->ibm_magic != __swab32(IBNAL_MSG_MAGIC)) {
446                         CERROR ("Bad magic(2) %#08x (%#08x expected) from "
447                                 "%u.%u.%u.%u/%d\n", msg->ibm_magic,
448                                 IBNAL_MSG_MAGIC, HIPQUAD(peer_ip), peer_port);
449                         goto out;
450                 }
451         }
452
453         /* Now check version */
454
455         rc = libcfs_sock_read(sock, &msg->ibm_version, sizeof(msg->ibm_version),
456                               lnet_acceptor_timeout());
457         if (rc != 0) {
458                 CERROR("Error %d receiving svcqry(3) from %u.%u.%u.%u/%d\n",
459                        rc, HIPQUAD(peer_ip), peer_port);
460                 goto out;
461         }
462
463         version = (msg->ibm_magic == IBNAL_MSG_MAGIC) ?
464                   msg->ibm_version : __swab16(msg->ibm_version);
465         /* Peer is a different protocol version: reply in my current protocol
466          * to tell her I'm "old" */
467         if (version != IBNAL_MSG_VERSION &&
468             version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
469                 kibnal_init_msg(msg, 0, 0);
470                 kibnal_pack_msg(msg, IBNAL_MSG_VERSION, 0, LNET_NID_ANY, 0);
471                 reject = 1;
472                 goto reply;
473         }
474         
475         /* Now read in all the rest */
476         rc = libcfs_sock_read(sock, &msg->ibm_type,
477                               offsetof(kib_msg_t, ibm_u) -
478                               offsetof(kib_msg_t, ibm_type),
479                               lnet_acceptor_timeout());
480         if (rc != 0) {
481                 CERROR("Error %d receiving svcqry(4) from %u.%u.%u.%u/%d\n",
482                        rc, HIPQUAD(peer_ip), peer_port);
483                 goto out;
484         }
485         
486         rc = kibnal_unpack_msg(msg, version, offsetof(kib_msg_t, ibm_u));
487         if (rc != 0) {
488                 CERROR("Error %d unpacking svcqry from %u.%u.%u.%u/%d\n",
489                        rc, HIPQUAD(peer_ip), peer_port);
490                 goto out;
491         }
492         
493         if (msg->ibm_type != IBNAL_MSG_SVCQRY) {
494                 CERROR("Unexpected message %d from %u.%u.%u.%u/%d\n",
495                        msg->ibm_type, HIPQUAD(peer_ip), peer_port);
496                 goto out;
497         }
498         
499         if (!lnet_ptlcompat_matchnid(kibnal_data.kib_ni->ni_nid,
500                                      msg->ibm_dstnid)) {
501                 CERROR("Unexpected dstnid %s: expected %s from %u.%u.%u.%u/%d\n",
502                        libcfs_nid2str(msg->ibm_dstnid),
503                        libcfs_nid2str(kibnal_data.kib_ni->ni_nid),
504                        HIPQUAD(peer_ip), peer_port);
505                 goto out;
506         }
507
508         srcnid = msg->ibm_srcnid;
509         srcstamp = msg->ibm_srcstamp;
510         
511         kibnal_init_msg(msg, IBNAL_MSG_SVCRSP, sizeof(msg->ibm_u.svcrsp));
512
513         msg->ibm_u.svcrsp.ibsr_svc_id = kibnal_data.kib_svc_id;
514         memcpy(msg->ibm_u.svcrsp.ibsr_svc_gid, kibnal_data.kib_svc_gid,
515                sizeof(kibnal_data.kib_svc_gid));
516         msg->ibm_u.svcrsp.ibsr_svc_pkey = kibnal_data.kib_svc_pkey;
517
518         kibnal_pack_msg(msg, version, 0, srcnid, srcstamp);
519
520  reply:
521         rc = libcfs_sock_write (sock, msg, msg->ibm_nob,
522                                 lnet_acceptor_timeout());
523         if (!reject && rc != 0) {
524                 /* Only complain if we're not rejecting */
525                 CERROR("Error %d replying to svcqry from %u.%u.%u.%u/%d\n",
526                        rc, HIPQUAD(peer_ip), peer_port);
527                 goto out;
528         }
529         
530  out:
531         LIBCFS_FREE(msg, sizeof(*msg));
532 }
533
534 void
535 kibnal_free_acceptsock (kib_acceptsock_t *as)
536 {
537         libcfs_sock_release(as->ibas_sock);
538         LIBCFS_FREE(as, sizeof(*as));
539 }
540
541 int
542 kibnal_accept(lnet_ni_t *ni, struct socket *sock)
543 {
544         kib_acceptsock_t  *as;
545         unsigned long      flags;
546
547         LIBCFS_ALLOC(as, sizeof(*as));
548         if (as == NULL) {
549                 CERROR("Out of Memory\n");
550                 return -ENOMEM;
551         }
552
553         as->ibas_sock = sock;
554                 
555         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
556                 
557         list_add_tail(&as->ibas_list, &kibnal_data.kib_connd_acceptq);
558         wake_up(&kibnal_data.kib_connd_waitq);
559
560         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
561         return 0;
562 }
563
564 int
565 kibnal_start_ib_listener (void) 
566 {
567         int    rc;
568
569         LASSERT (kibnal_data.kib_listen_handle == NULL);
570
571         kibnal_data.kib_svc_id = ib_cm_service_assign();
572         CDEBUG(D_NET, "svc id "LPX64"\n", kibnal_data.kib_svc_id);
573
574         rc = ib_cached_gid_get(kibnal_data.kib_device,
575                                kibnal_data.kib_port, 0,
576                                kibnal_data.kib_svc_gid);
577         if (rc != 0) {
578                 CERROR("Can't get port %d GID: %d\n",
579                        kibnal_data.kib_port, rc);
580                 return rc;
581         }
582         
583         rc = ib_cached_pkey_get(kibnal_data.kib_device,
584                                 kibnal_data.kib_port, 0,
585                                 &kibnal_data.kib_svc_pkey);
586         if (rc != 0) {
587                 CERROR ("Can't get port %d PKEY: %d\n",
588                         kibnal_data.kib_port, rc);
589                 return rc;
590         }
591
592         rc = ib_cm_listen(kibnal_data.kib_svc_id,
593                           TS_IB_CM_SERVICE_EXACT_MASK,
594                           kibnal_passive_conn_callback, NULL,
595                           &kibnal_data.kib_listen_handle);
596         if (rc != 0) {
597                 kibnal_data.kib_listen_handle = NULL;
598                 CERROR ("Can't create IB listener: %d\n", rc);
599                 return rc;
600         }
601         
602         LASSERT (kibnal_data.kib_listen_handle != NULL);
603         return 0;
604 }
605
606 void
607 kibnal_stop_ib_listener (void) 
608 {
609         int    rc;
610         
611         LASSERT (kibnal_data.kib_listen_handle != NULL);
612
613         rc = ib_cm_listen_stop (kibnal_data.kib_listen_handle);
614         if (rc != 0)
615                 CERROR("Error stopping IB listener: %d\n", rc);
616                 
617         kibnal_data.kib_listen_handle = NULL;
618 }
619
620 int
621 kibnal_create_peer (kib_peer_t **peerp, lnet_nid_t nid)
622 {
623         kib_peer_t     *peer;
624         unsigned long   flags;
625         int             rc;
626
627         LASSERT (nid != LNET_NID_ANY);
628
629         LIBCFS_ALLOC(peer, sizeof (*peer));
630         if (peer == NULL) {
631                 CERROR("Cannot allocate peer\n");
632                 return -ENOMEM;
633         }
634
635         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
636
637         peer->ibp_nid = nid;
638         atomic_set (&peer->ibp_refcount, 1);    /* 1 ref for caller */
639
640         INIT_LIST_HEAD (&peer->ibp_list);       /* not in the peer table yet */
641         INIT_LIST_HEAD (&peer->ibp_conns);
642         INIT_LIST_HEAD (&peer->ibp_tx_queue);
643         INIT_LIST_HEAD (&peer->ibp_connd_list); /* not queued for connecting */
644
645         peer->ibp_error = 0;
646         peer->ibp_last_alive = cfs_time_current();
647         peer->ibp_reconnect_interval = 0;       /* OK to connect at any time */
648
649         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
650
651         if (atomic_read(&kibnal_data.kib_npeers) >=
652             *kibnal_tunables.kib_concurrent_peers) {
653                 rc = -EOVERFLOW;        /* !! but at least it distinguishes */
654         } else if (kibnal_data.kib_nonewpeers) {
655                 rc = -ESHUTDOWN;        /* shutdown has started */
656         } else {
657                 rc = 0;
658                 /* npeers only grows with kib_global_lock held */
659                 atomic_inc(&kibnal_data.kib_npeers);
660         }
661         
662         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
663
664         if (rc != 0) {
665                 CERROR("Can't create peer: %s\n", 
666                        (rc == -ESHUTDOWN) ? "shutting down" : 
667                        "too many peers");
668                 LIBCFS_FREE(peer, sizeof(*peer));
669         } else {
670                 *peerp = peer;
671         }
672         
673         return rc;
674 }
675
676 void
677 kibnal_destroy_peer (kib_peer_t *peer)
678 {
679         CDEBUG (D_NET, "peer %s %p deleted\n", 
680                 libcfs_nid2str(peer->ibp_nid), peer);
681
682         LASSERT (atomic_read (&peer->ibp_refcount) == 0);
683         LASSERT (peer->ibp_persistence == 0);
684         LASSERT (!kibnal_peer_active(peer));
685         LASSERT (peer->ibp_connecting == 0);
686         LASSERT (peer->ibp_accepting == 0);
687         LASSERT (list_empty (&peer->ibp_connd_list));
688         LASSERT (list_empty (&peer->ibp_conns));
689         LASSERT (list_empty (&peer->ibp_tx_queue));
690
691         LIBCFS_FREE (peer, sizeof (*peer));
692
693         /* NB a peer's connections keep a reference on their peer until
694          * they are destroyed, so we can be assured that _all_ state to do
695          * with this peer has been cleaned up when its refcount drops to
696          * zero. */
697         atomic_dec(&kibnal_data.kib_npeers);
698 }
699
700 kib_peer_t *
701 kibnal_find_peer_locked (lnet_nid_t nid)
702 {
703         struct list_head *peer_list = kibnal_nid2peerlist (nid);
704         struct list_head *tmp;
705         kib_peer_t       *peer;
706
707         list_for_each (tmp, peer_list) {
708
709                 peer = list_entry (tmp, kib_peer_t, ibp_list);
710
711                 LASSERT (peer->ibp_persistence != 0 || /* persistent peer */
712                          peer->ibp_connecting != 0 || /* creating conns */
713                          peer->ibp_accepting != 0 ||
714                          !list_empty (&peer->ibp_conns));  /* active conn */
715
716                 if (peer->ibp_nid != nid)
717                         continue;
718
719                 return (peer);
720         }
721         return (NULL);
722 }
723
724 kib_peer_t *
725 kibnal_get_peer (lnet_nid_t nid)
726 {
727         kib_peer_t     *peer;
728         unsigned long   flags;
729
730         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
731         peer = kibnal_find_peer_locked (nid);
732         if (peer != NULL)                       /* +1 ref for caller? */
733                 kibnal_peer_addref(peer);
734         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
735
736         return (peer);
737 }
738
739 void
740 kibnal_unlink_peer_locked (kib_peer_t *peer)
741 {
742         LASSERT (peer->ibp_persistence == 0);
743         LASSERT (list_empty(&peer->ibp_conns));
744
745         LASSERT (kibnal_peer_active(peer));
746         list_del_init (&peer->ibp_list);
747         /* lose peerlist's ref */
748         kibnal_peer_decref(peer);
749 }
750
751 int
752 kibnal_get_peer_info (int index, lnet_nid_t *nidp, __u32 *ipp, int *portp,
753                       int *persistencep)
754 {
755         kib_peer_t        *peer;
756         struct list_head  *ptmp;
757         unsigned long      flags;
758         int                i;
759
760         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
761
762         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
763
764                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
765                         
766                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
767                         LASSERT (peer->ibp_persistence != 0 ||
768                                  peer->ibp_connecting != 0 ||
769                                  peer->ibp_accepting != 0 ||
770                                  !list_empty (&peer->ibp_conns));
771
772                         if (index-- > 0)
773                                 continue;
774
775                         *nidp = peer->ibp_nid;
776                         *ipp = peer->ibp_ip;
777                         *portp = peer->ibp_port;
778                         *persistencep = peer->ibp_persistence;
779                         
780                         read_unlock_irqrestore(&kibnal_data.kib_global_lock,
781                                                flags);
782                         return (0);
783                 }
784         }
785
786         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
787         return (-ENOENT);
788 }
789
790 int
791 kibnal_add_persistent_peer (lnet_nid_t nid, __u32 ip, int port)
792 {
793         unsigned long      flags;
794         kib_peer_t        *peer;
795         kib_peer_t        *peer2;
796         int                rc;
797         
798         if (nid == LNET_NID_ANY)
799                 return (-EINVAL);
800
801         rc = kibnal_create_peer (&peer, nid);
802         if (rc != 0)
803                 return rc;
804
805         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
806
807         /* I'm always called with a reference on kibnal_data.kib_ni
808          * so shutdown can't have started */
809         LASSERT (kibnal_data.kib_nonewpeers == 0);
810
811         peer2 = kibnal_find_peer_locked (nid);
812         if (peer2 != NULL) {
813                 kibnal_peer_decref(peer);
814                 peer = peer2;
815         } else {
816                 /* peer table takes existing ref on peer */
817                 list_add_tail (&peer->ibp_list,
818                                kibnal_nid2peerlist (nid));
819         }
820
821         peer->ibp_ip = ip;
822         peer->ibp_port = port;
823         peer->ibp_persistence++;
824         
825         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
826         return (0);
827 }
828
829 void
830 kibnal_del_peer_locked (kib_peer_t *peer)
831 {
832         struct list_head *ctmp;
833         struct list_head *cnxt;
834         kib_conn_t       *conn;
835
836         peer->ibp_persistence = 0;
837
838         if (list_empty(&peer->ibp_conns)) {
839                 kibnal_unlink_peer_locked(peer);
840         } else {
841                 list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
842                         conn = list_entry(ctmp, kib_conn_t, ibc_list);
843
844                         kibnal_close_conn_locked (conn, 0);
845                 }
846                 /* NB peer is no longer persistent; closing its last conn
847                  * unlinked it. */
848         }
849         /* NB peer now unlinked; might even be freed if the peer table had the
850          * last ref on it. */
851 }
852
853 int
854 kibnal_del_peer (lnet_nid_t nid)
855 {
856         unsigned long      flags;
857         CFS_LIST_HEAD     (zombies);
858         struct list_head  *ptmp;
859         struct list_head  *pnxt;
860         kib_peer_t        *peer;
861         int                lo;
862         int                hi;
863         int                i;
864         int                rc = -ENOENT;
865
866         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
867
868         if (nid != LNET_NID_ANY)
869                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
870         else {
871                 lo = 0;
872                 hi = kibnal_data.kib_peer_hash_size - 1;
873         }
874
875         for (i = lo; i <= hi; i++) {
876                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
877                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
878                         LASSERT (peer->ibp_persistence != 0 ||
879                                  peer->ibp_connecting != 0 ||
880                                  peer->ibp_accepting != 0 ||
881                                  !list_empty (&peer->ibp_conns));
882
883                         if (!(nid == LNET_NID_ANY || peer->ibp_nid == nid))
884                                 continue;
885
886                         if (!list_empty(&peer->ibp_tx_queue)) {
887                                 LASSERT (list_empty(&peer->ibp_conns));
888
889                                 list_splice_init(&peer->ibp_tx_queue, &zombies);
890                         }
891
892                         kibnal_del_peer_locked (peer);
893                         rc = 0;         /* matched something */
894                 }
895         }
896
897         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
898
899         kibnal_txlist_done(&zombies, -EIO);
900
901         return (rc);
902 }
903
904 kib_conn_t *
905 kibnal_get_conn_by_idx (int index)
906 {
907         kib_peer_t        *peer;
908         struct list_head  *ptmp;
909         kib_conn_t        *conn;
910         struct list_head  *ctmp;
911         unsigned long      flags;
912         int                i;
913
914         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
915
916         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
917                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
918
919                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
920                         LASSERT (peer->ibp_persistence > 0 ||
921                                  peer->ibp_connecting != 0 ||
922                                  peer->ibp_accepting != 0 ||
923                                  !list_empty (&peer->ibp_conns));
924
925                         list_for_each (ctmp, &peer->ibp_conns) {
926                                 if (index-- > 0)
927                                         continue;
928
929                                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
930                                 kibnal_conn_addref(conn);
931                                 read_unlock_irqrestore(&kibnal_data.kib_global_lock,
932                                                        flags);
933                                 return (conn);
934                         }
935                 }
936         }
937
938         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
939         return (NULL);
940 }
941
/*
 * Allocate and set up a new connection object.
 *
 * Steps, in order: allocate and zero the kib_conn_t, initialise its tx
 * queues and lock, count it in kibnal_data.kib_nconns, allocate the rx
 * descriptor array and the rx message pages (registered with
 * IB_ACCESS_LOCAL_WRITE), point each rx descriptor at its message slot,
 * create an RC queue pair on kibnal_data.kib_pd / kib_cq and move it to
 * IB_QP_STATE_INIT.
 *
 * Returns the connection holding 1 reference for the caller, or NULL on
 * any failure.  Every failure after the kib_conn_t itself has been
 * allocated is unwound through kibnal_destroy_conn().
 */
942 kib_conn_t *
943 kibnal_create_conn (void)
944 {
945         kib_conn_t  *conn;
946         int          i;
947         __u64        vaddr = 0;
948         __u64        vaddr_base;
949         int          page_offset;
950         int          ipage;
951         int          rc;
            /* the two parameter structs are used one after the other, so
             * they share storage */
952         union {
953                 struct ib_qp_create_param  qp_create;
954                 struct ib_qp_attribute     qp_attr;
955         } params;
956         
957         LIBCFS_ALLOC (conn, sizeof (*conn));
958         if (conn == NULL) {
959                 CERROR ("Can't allocate connection\n");
960                 return (NULL);
961         }
962
963         /* zero flags, NULL pointers etc... */
964         memset (conn, 0, sizeof (*conn));
965
966         INIT_LIST_HEAD (&conn->ibc_tx_queue_nocred);
967         INIT_LIST_HEAD (&conn->ibc_tx_queue);
968         INIT_LIST_HEAD (&conn->ibc_tx_queue_rsrvd);
969         INIT_LIST_HEAD (&conn->ibc_active_txs);
970         spin_lock_init (&conn->ibc_lock);
971         
972         atomic_inc (&kibnal_data.kib_nconns);
973         /* well not really, but I call destroy() on failure, which decrements */
974
975         LIBCFS_ALLOC (conn->ibc_rxs, IBNAL_RX_MSGS * sizeof (kib_rx_t));
976         if (conn->ibc_rxs == NULL)
977                 goto failed;
978         memset (conn->ibc_rxs, 0, IBNAL_RX_MSGS * sizeof(kib_rx_t));
979
980         rc = kibnal_alloc_pages(&conn->ibc_rx_pages,
981                                 IBNAL_RX_MSG_PAGES,
982                                 IB_ACCESS_LOCAL_WRITE);
983         if (rc != 0)
984                 goto failed;
985
986         vaddr_base = vaddr = conn->ibc_rx_pages->ibp_vaddr;
987
            /* Carve the rx pages into IBNAL_MSG_SIZE slots.  Each rx gets
             * the slot's address within the registered region (rx_vaddr)
             * and a page_address()-based pointer to the same slot
             * (rx_msg). */
988         for (i = ipage = page_offset = 0; i < IBNAL_RX_MSGS; i++) {
989                 struct page *page = conn->ibc_rx_pages->ibp_pages[ipage];
990                 kib_rx_t   *rx = &conn->ibc_rxs[i];
991
992                 rx->rx_conn = conn;
993                 rx->rx_vaddr = vaddr;
994                 rx->rx_msg = (kib_msg_t *)(((char *)page_address(page)) + page_offset);
995                 
996                 vaddr += IBNAL_MSG_SIZE;
997                 LASSERT (vaddr <= vaddr_base + IBNAL_RX_MSG_BYTES);
998                 
999                 page_offset += IBNAL_MSG_SIZE;
1000                 LASSERT (page_offset <= PAGE_SIZE);
1001
                     /* slot boundary coincides with the page boundary:
                      * continue at the start of the next page */
1002                 if (page_offset == PAGE_SIZE) {
1003                         page_offset = 0;
1004                         ipage++;
1005                         LASSERT (ipage <= IBNAL_RX_MSG_PAGES);
1006                 }
1007         }
1008
1009         /* We can post up to IBNAL_RX_MSGS, which may also include an
1010          * additional RDMA work item */
1011
1012         params.qp_create = (struct ib_qp_create_param) {
1013                 .limit = {
1014                         .max_outstanding_send_request    = 2 * IBNAL_RX_MSGS,
1015                         .max_outstanding_receive_request = IBNAL_RX_MSGS,
1016                         .max_send_gather_element         = 1,
1017                         .max_receive_scatter_element     = 1,
1018                 },
1019                 .pd              = kibnal_data.kib_pd,
1020                 .send_queue      = kibnal_data.kib_cq,
1021                 .receive_queue   = kibnal_data.kib_cq,
1022                 .send_policy     = IB_WQ_SIGNAL_SELECTABLE,
1023                 .receive_policy  = IB_WQ_SIGNAL_SELECTABLE,
1024                 .rd_domain       = 0,
1025                 .transport       = IB_TRANSPORT_RC,
1026                 .device_specific = NULL,
1027         };
1028         
1029         rc = ib_qp_create (&params.qp_create, &conn->ibc_qp, &conn->ibc_qpn);
1030         if (rc != 0) {
1031                 CERROR ("Failed to create queue pair: %d\n", rc);
1032                 goto failed;
1033         }
1034         
1035         /* Mark QP created */
             /* from here on kibnal_destroy_conn() will also destroy the QP */
1036         conn->ibc_state = IBNAL_CONN_INIT_QP;
1037
1038         params.qp_attr = (struct ib_qp_attribute) {
1039                 .state             = IB_QP_STATE_INIT,
1040                 .port              = kibnal_data.kib_port,
1041                 .enable_rdma_read  = 1,
1042                 .enable_rdma_write = 1,
1043                 .valid_fields      = (IB_QP_ATTRIBUTE_STATE |
1044                                       IB_QP_ATTRIBUTE_PORT |
1045                                       IB_QP_ATTRIBUTE_PKEY_INDEX |
1046                                       IB_QP_ATTRIBUTE_RDMA_ATOMIC_ENABLE),
1047         };
1048         rc = ib_qp_modify(conn->ibc_qp, &params.qp_attr);
1049         if (rc != 0) {
1050                 CERROR ("Failed to modify queue pair: %d\n", rc);
1051                 goto failed;
1052         }
1053
1054         /* 1 ref for caller */
1055         atomic_set (&conn->ibc_refcount, 1);
1056         return (conn);
1057         
1058  failed:
             /* ibc_refcount is still 0 from the memset, which is what
              * kibnal_destroy_conn() asserts; it frees only the rx
              * buffers that are non-NULL and consults ibc_state to decide
              * whether there is a QP to destroy */
1059         kibnal_destroy_conn (conn);
1060         return (NULL);
1061 }
1062
/*
 * Release a connection and everything it owns.
 *
 * Preconditions (asserted): no references remain, all tx queues are
 * empty, no sends are posted and no connection request is attached.
 *
 * Destroys the queue pair if ibc_state says one was created, frees the rx
 * pages and rx descriptors if present, drops the connection's reference
 * on its peer (if any), frees the kib_conn_t and decrements
 * kibnal_data.kib_nconns.  If that was the last connection and shutdown
 * is flagged, the scheduler and reaper wait queues are woken.
 */
1063 void
1064 kibnal_destroy_conn (kib_conn_t *conn)
1065 {
1066         int    rc;
1067         
1068         CDEBUG (D_NET, "connection %p\n", conn);
1069
1070         LASSERT (atomic_read (&conn->ibc_refcount) == 0);
1071         LASSERT (list_empty(&conn->ibc_tx_queue));
1072         LASSERT (list_empty(&conn->ibc_tx_queue_rsrvd));
1073         LASSERT (list_empty(&conn->ibc_tx_queue_nocred));
1074         LASSERT (list_empty(&conn->ibc_active_txs));
1075         LASSERT (conn->ibc_nsends_posted == 0);
1076         LASSERT (conn->ibc_connreq == NULL);
1077
             /* ZOMBIE has no statements of its own: it shares the QP
              * teardown of INIT_QP.  Any state not listed is a bug. */
1078         switch (conn->ibc_state) {
1079         case IBNAL_CONN_ZOMBIE:
1080                 /* called after connection sequence initiated */
1081
1082         case IBNAL_CONN_INIT_QP:
1083                 rc = ib_qp_destroy(conn->ibc_qp);
1084                 if (rc != 0)
1085                         CERROR("Can't destroy QP: %d\n", rc);
1086                 /* fall through */
1087                 
1088         case IBNAL_CONN_INIT_NOTHING:
1089                 break;
1090
1091         default:
1092                 LASSERT (0);
1093         }
1094
             /* either buffer may be NULL when called from a failed
              * kibnal_create_conn() */
1095         if (conn->ibc_rx_pages != NULL) 
1096                 kibnal_free_pages(conn->ibc_rx_pages);
1097         
1098         if (conn->ibc_rxs != NULL)
1099                 LIBCFS_FREE(conn->ibc_rxs, 
1100                             IBNAL_RX_MSGS * sizeof(kib_rx_t));
1101
1102         if (conn->ibc_peer != NULL)
1103                 kibnal_peer_decref(conn->ibc_peer);
1104
1105         LIBCFS_FREE(conn, sizeof (*conn));
1106
             /* conn is gone; only global state is touched from here on */
1107         atomic_dec(&kibnal_data.kib_nconns);
1108         
1109         if (atomic_read (&kibnal_data.kib_nconns) == 0 &&
1110             kibnal_data.kib_shutdown) {
1111                 /* I just nuked the last connection on shutdown; wake up
1112                  * everyone so they can exit. */
1113                 wake_up_all(&kibnal_data.kib_sched_waitq);
1114                 wake_up_all(&kibnal_data.kib_reaper_waitq);
1115         }
1116 }
1117
1118 int
1119 kibnal_close_peer_conns_locked (kib_peer_t *peer, int why)
1120 {
1121         kib_conn_t         *conn;
1122         struct list_head   *ctmp;
1123         struct list_head   *cnxt;
1124         int                 count = 0;
1125
1126         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1127                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1128
1129                 count++;
1130                 kibnal_close_conn_locked (conn, why);
1131         }
1132
1133         return (count);
1134 }
1135
1136 int
1137 kibnal_close_stale_conns_locked (kib_peer_t *peer, __u64 incarnation)
1138 {
1139         kib_conn_t         *conn;
1140         struct list_head   *ctmp;
1141         struct list_head   *cnxt;
1142         int                 count = 0;
1143
1144         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1145                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1146
1147                 if (conn->ibc_incarnation == incarnation)
1148                         continue;
1149
1150                 CDEBUG(D_NET, "Closing stale conn %p nid: %s"
1151                        " incarnation:"LPX64"("LPX64")\n", conn,
1152                        libcfs_nid2str(peer->ibp_nid), 
1153                        conn->ibc_incarnation, incarnation);
1154                 
1155                 count++;
1156                 kibnal_close_conn_locked (conn, -ESTALE);
1157         }
1158
1159         return (count);
1160 }
1161
1162 int
1163 kibnal_close_matching_conns (lnet_nid_t nid)
1164 {
1165         unsigned long       flags;
1166         kib_peer_t         *peer;
1167         struct list_head   *ptmp;
1168         struct list_head   *pnxt;
1169         int                 lo;
1170         int                 hi;
1171         int                 i;
1172         int                 count = 0;
1173
1174         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1175
1176         if (nid != LNET_NID_ANY)
1177                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
1178         else {
1179                 lo = 0;
1180                 hi = kibnal_data.kib_peer_hash_size - 1;
1181         }
1182
1183         for (i = lo; i <= hi; i++) {
1184                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
1185
1186                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
1187                         LASSERT (peer->ibp_persistence != 0 ||
1188                                  peer->ibp_connecting != 0 ||
1189                                  peer->ibp_accepting != 0 ||
1190                                  !list_empty (&peer->ibp_conns));
1191
1192                         if (!(nid == LNET_NID_ANY || nid == peer->ibp_nid))
1193                                 continue;
1194
1195                         count += kibnal_close_peer_conns_locked (peer, 0);
1196                 }
1197         }
1198
1199         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1200
1201         /* wildcards always succeed */
1202         if (nid == LNET_NID_ANY)
1203                 return (0);
1204         
1205         return (count == 0 ? -ENOENT : 0);
1206 }
1207
1208 int
1209 kibnal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1210 {
1211         struct libcfs_ioctl_data *data = arg;
1212         int                       rc = -EINVAL;
1213
1214         LASSERT (ni == kibnal_data.kib_ni);
1215
1216         switch(cmd) {
1217         case IOC_LIBCFS_GET_PEER: {
1218                 lnet_nid_t   nid = 0;
1219                 __u32       ip = 0;
1220                 int         port = 0;
1221                 int         share_count = 0;
1222
1223                 rc = kibnal_get_peer_info(data->ioc_count,
1224                                           &nid, &ip, &port, &share_count);
1225                 data->ioc_nid    = nid;
1226                 data->ioc_count  = share_count;
1227                 data->ioc_u32[0] = ip;
1228                 data->ioc_u32[1] = port;
1229                 break;
1230         }
1231         case IOC_LIBCFS_ADD_PEER: {
1232                 rc = kibnal_add_persistent_peer (data->ioc_nid,
1233                                                  data->ioc_u32[0], /* IP */
1234                                                  data->ioc_u32[1]); /* port */
1235                 break;
1236         }
1237         case IOC_LIBCFS_DEL_PEER: {
1238                 rc = kibnal_del_peer (data->ioc_nid);
1239                 break;
1240         }
1241         case IOC_LIBCFS_GET_CONN: {
1242                 kib_conn_t *conn = kibnal_get_conn_by_idx (data->ioc_count);
1243
1244                 if (conn == NULL)
1245                         rc = -ENOENT;
1246                 else {
1247                         rc = 0;
1248                         data->ioc_nid = conn->ibc_peer->ibp_nid;
1249                         kibnal_conn_decref(conn);
1250                 }
1251                 break;
1252         }
1253         case IOC_LIBCFS_CLOSE_CONNECTION: {
1254                 rc = kibnal_close_matching_conns (data->ioc_nid);
1255                 break;
1256         }
1257         case IOC_LIBCFS_REGISTER_MYNID: {
1258                 /* Ignore if this is a noop */
1259                 if (data->ioc_nid == ni->ni_nid) {
1260                         rc = 0;
1261                 } else {
1262                         CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1263                                libcfs_nid2str(data->ioc_nid),
1264                                libcfs_nid2str(ni->ni_nid));
1265                         rc = -EINVAL;
1266                 }
1267                 break;
1268         }
1269         }
1270
1271         return rc;
1272 }
1273
1274 void
1275 kibnal_free_pages (kib_pages_t *p)
1276 {
1277         int     npages = p->ibp_npages;
1278         int     rc;
1279         int     i;
1280         
1281         if (p->ibp_mapped) {
1282                 rc = ib_memory_deregister(p->ibp_handle);
1283                 if (rc != 0)
1284                         CERROR ("Deregister error: %d\n", rc);
1285         }
1286         
1287         for (i = 0; i < npages; i++)
1288                 if (p->ibp_pages[i] != NULL)
1289                         __free_page(p->ibp_pages[i]);
1290         
1291         LIBCFS_FREE (p, offsetof(kib_pages_t, ibp_pages[npages]));
1292 }
1293
1294 int
1295 kibnal_alloc_pages (kib_pages_t **pp, int npages, int access)
1296 {
1297         kib_pages_t                *p;
1298         struct ib_physical_buffer  *phys_pages;
1299         int                         i;
1300         int                         rc;
1301
1302         LIBCFS_ALLOC(p, offsetof(kib_pages_t, ibp_pages[npages]));
1303         if (p == NULL) {
1304                 CERROR ("Can't allocate buffer %d\n", npages);
1305                 return (-ENOMEM);
1306         }
1307
1308         memset (p, 0, offsetof(kib_pages_t, ibp_pages[npages]));
1309         p->ibp_npages = npages;
1310         
1311         for (i = 0; i < npages; i++) {
1312                 p->ibp_pages[i] = alloc_page (GFP_KERNEL);
1313                 if (p->ibp_pages[i] == NULL) {
1314                         CERROR ("Can't allocate page %d of %d\n", i, npages);
1315                         kibnal_free_pages(p);
1316                         return (-ENOMEM);
1317                 }
1318         }
1319
1320         LIBCFS_ALLOC(phys_pages, npages * sizeof(*phys_pages));
1321         if (phys_pages == NULL) {
1322                 CERROR ("Can't allocate physarray for %d pages\n", npages);
1323                 kibnal_free_pages(p);
1324                 return (-ENOMEM);
1325         }
1326
1327         for (i = 0; i < npages; i++) {
1328                 phys_pages[i].size = PAGE_SIZE;
1329                 phys_pages[i].address =
1330                         lnet_page2phys(p->ibp_pages[i]);
1331         }
1332
1333         p->ibp_vaddr = 0;
1334         rc = ib_memory_register_physical(kibnal_data.kib_pd,
1335                                          phys_pages, npages,
1336                                          &p->ibp_vaddr,
1337                                          npages * PAGE_SIZE, 0,
1338                                          access,
1339                                          &p->ibp_handle,
1340                                          &p->ibp_lkey,
1341                                          &p->ibp_rkey);
1342         
1343         LIBCFS_FREE(phys_pages, npages * sizeof(*phys_pages));
1344         
1345         if (rc != 0) {
1346                 CERROR ("Error %d mapping %d pages\n", rc, npages);
1347                 kibnal_free_pages(p);
1348                 return (rc);
1349         }
1350         
1351         p->ibp_mapped = 1;
1352         *pp = p;
1353         return (0);
1354 }
1355
1356 int
1357 kibnal_setup_tx_descs (void)
1358 {
1359         int           ipage = 0;
1360         int           page_offset = 0;
1361         __u64         vaddr;
1362         __u64         vaddr_base;
1363         struct page  *page;
1364         kib_tx_t     *tx;
1365         int           i;
1366         int           rc;
1367
1368         /* pre-mapped messages are not bigger than 1 page */
1369         LASSERT (IBNAL_MSG_SIZE <= PAGE_SIZE);
1370
1371         /* No fancy arithmetic when we do the buffer calculations */
1372         LASSERT (PAGE_SIZE % IBNAL_MSG_SIZE == 0);
1373
1374         rc = kibnal_alloc_pages(&kibnal_data.kib_tx_pages,
1375                                 IBNAL_TX_MSG_PAGES(), 
1376                                 0);            /* local read access only */
1377         if (rc != 0)
1378                 return (rc);
1379
1380         vaddr = vaddr_base = kibnal_data.kib_tx_pages->ibp_vaddr;
1381
1382         for (i = 0; i < IBNAL_TX_MSGS(); i++) {
1383                 page = kibnal_data.kib_tx_pages->ibp_pages[ipage];
1384                 tx = &kibnal_data.kib_tx_descs[i];
1385
1386                 memset (tx, 0, sizeof(*tx));    /* zero flags etc */
1387                 
1388                 tx->tx_msg = (kib_msg_t *)(((char *)page_address(page)) + page_offset);
1389                 tx->tx_vaddr = vaddr;
1390                 tx->tx_mapped = KIB_TX_UNMAPPED;
1391
1392                 CDEBUG(D_NET, "Tx[%d] %p->%p - "LPX64"\n", 
1393                        i, tx, tx->tx_msg, tx->tx_vaddr);
1394
1395                 list_add (&tx->tx_list, &kibnal_data.kib_idle_txs);
1396
1397                 vaddr += IBNAL_MSG_SIZE;
1398                 LASSERT (vaddr <= vaddr_base + IBNAL_TX_MSG_BYTES());
1399
1400                 page_offset += IBNAL_MSG_SIZE;
1401                 LASSERT (page_offset <= PAGE_SIZE);
1402
1403                 if (page_offset == PAGE_SIZE) {
1404                         page_offset = 0;
1405                         ipage++;
1406                         LASSERT (ipage <= IBNAL_TX_MSG_PAGES());
1407                 }
1408         }
1409         
1410         return (0);
1411 }
1412
/*
 * Shut down the LND instance attached to 'ni'.
 *
 * kibnal_data.kib_init records how far initialisation got.  The switch
 * below enters at that stage and falls through the cleanup of every
 * earlier stage, so resources are released in the reverse of the order
 * the stage names imply.  After the switch the tx descriptor array and
 * the peer hash table are freed, kib_init is reset to IBNAL_INIT_NOTHING
 * and the module use count taken in startup is dropped.
 *
 * An unexpected kib_init value is logged and hits LBUG().
 */
1413 void
1414 kibnal_shutdown (lnet_ni_t *ni)
1415 {
1416         int           i;
1417         int           rc;
1418         unsigned long flags;
1419
1420         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1421                atomic_read (&libcfs_kmemory));
1422
1423         LASSERT(ni == kibnal_data.kib_ni);
1424         LASSERT(ni->ni_data == &kibnal_data);
1425
1426         switch (kibnal_data.kib_init) {
             /* NOTE(review): presumably LBUG() does not return; if it did,
              * control would fall into IBNAL_INIT_ALL below - confirm */
1427         default:
1428                 CERROR ("Unexpected state %d\n", kibnal_data.kib_init);
1429                 LBUG();
1430
1431         case IBNAL_INIT_ALL:
1432                 /* Prevent new peers from being created */
1433                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1434                 kibnal_data.kib_nonewpeers = 1;
1435                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1436
1437                 kibnal_stop_ib_listener();
1438
1439                 /* Remove all existing peers from the peer table */
1440                 kibnal_del_peer(LNET_NID_ANY);
1441                 
1442                 /* Wait for pending conn reqs to be handled */
                     /* i counts polls; it starts at 2 and is bumped before
                      * each message, so D_WARNING is used only when i hits
                      * a power of two (4, 8, 16, ...) and D_NET otherwise.
                      * connd_lock is dropped across the 1s pause and
                      * re-taken before the queue is tested again. */
1443                 i = 2;
1444                 spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
1445                 while (!list_empty(&kibnal_data.kib_connd_acceptq)) {
1446                         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, 
1447                                                flags);
1448                         i++;
1449                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n */
1450                                "waiting for conn reqs to clean up\n");
1451                         cfs_pause(cfs_time_seconds(1));
1452                         
1453                         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
1454                 }
1455                 spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
1456
1457                 /* Wait for all peer state to clean up */
1458                 i = 2;
1459                 while (atomic_read(&kibnal_data.kib_npeers) != 0) {
1460                         i++;
1461                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1462                                "waiting for %d peers to close down\n",
1463                                atomic_read(&kibnal_data.kib_npeers));
1464                         cfs_pause(cfs_time_seconds(1));
1465                 }
1466                 /* fall through */
1467
1468         case IBNAL_INIT_CQ:
1469                 rc = ib_cq_destroy (kibnal_data.kib_cq);
1470                 if (rc != 0)
1471                         CERROR ("Destroy CQ error: %d\n", rc);
1472                 /* fall through */
1473
1474         case IBNAL_INIT_TXD:
1475                 kibnal_free_pages (kibnal_data.kib_tx_pages);
1476                 /* fall through */
1477 #if IBNAL_FMR
1478         case IBNAL_INIT_FMR:
1479                 rc = ib_fmr_pool_destroy (kibnal_data.kib_fmr_pool);
1480                 if (rc != 0)
1481                         CERROR ("Destroy FMR pool error: %d\n", rc);
1482                 /* fall through */
1483 #endif
1484         case IBNAL_INIT_PD:
1485                 rc = ib_pd_destroy(kibnal_data.kib_pd);
1486                 if (rc != 0)
1487                         CERROR ("Destroy PD error: %d\n", rc);
1488                 /* fall through */
1489
1490         case IBNAL_INIT_DATA:
1491                 /* Module refcount only gets to zero when all peers
1492                  * have been closed so all lists must be empty */
1493                 LASSERT (atomic_read(&kibnal_data.kib_npeers) == 0);
1494                 LASSERT (kibnal_data.kib_peers != NULL);
1495                 for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
1496                         LASSERT (list_empty (&kibnal_data.kib_peers[i]));
1497                 }
1498                 LASSERT (atomic_read (&kibnal_data.kib_nconns) == 0);
1499                 LASSERT (list_empty (&kibnal_data.kib_sched_rxq));
1500                 LASSERT (list_empty (&kibnal_data.kib_sched_txq));
1501                 LASSERT (list_empty (&kibnal_data.kib_reaper_conns));
1502                 LASSERT (list_empty (&kibnal_data.kib_connd_peers));
1503                 LASSERT (list_empty (&kibnal_data.kib_connd_acceptq));
1504
1505                 /* flag threads to terminate; wake and wait for them to die */
1506                 kibnal_data.kib_shutdown = 1;
1507                 wake_up_all (&kibnal_data.kib_sched_waitq);
1508                 wake_up_all (&kibnal_data.kib_reaper_waitq);
1509                 wake_up_all (&kibnal_data.kib_connd_waitq);
1510
                     /* poll once a second until kib_nthreads drops to 0 */
1511                 i = 2;
1512                 while (atomic_read (&kibnal_data.kib_nthreads) != 0) {
1513                         i++;
1514                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1515                                "Waiting for %d threads to terminate\n",
1516                                atomic_read (&kibnal_data.kib_nthreads));
1517                         cfs_pause(cfs_time_seconds(1));
1518                 }
1519                 /* fall through */
1520                 
1521         case IBNAL_INIT_NOTHING:
1522                 break;
1523         }
1524
             /* these two arrays are freed whenever they exist, whatever
              * stage was reached */
1525         if (kibnal_data.kib_tx_descs != NULL)
1526                 LIBCFS_FREE (kibnal_data.kib_tx_descs,
1527                              IBNAL_TX_MSGS() * sizeof(kib_tx_t));
1528
1529         if (kibnal_data.kib_peers != NULL)
1530                 LIBCFS_FREE (kibnal_data.kib_peers,
1531                              sizeof (struct list_head) * 
1532                              kibnal_data.kib_peer_hash_size);
1533
1534         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1535                atomic_read (&libcfs_kmemory));
1536
             /* allows a later kibnal_startup(), which refuses to run unless
              * kib_init is IBNAL_INIT_NOTHING */
1537         kibnal_data.kib_init = IBNAL_INIT_NOTHING;
1538         PORTAL_MODULE_UNUSE;
1539 }
1540
1541 int
1542 kibnal_get_ipoibidx(void)
1543 {
1544         /* NB single threaded! */
1545         static struct ib_port_properties port_props;
1546
1547         int               ipoibidx = 0;
1548         int               devidx;
1549         int               port;
1550         int               rc;
1551         struct ib_device *device;
1552
1553         for (devidx = 0; devidx <= kibnal_data.kib_hca_idx; devidx++) {
1554                 device = ib_device_get_by_index(devidx);
1555                 
1556                 if (device == NULL) {
1557                         CERROR("Can't get IB device %d\n", devidx);
1558                         return -1;
1559                 }
1560                 
1561                 for (port = 1; port <= 2; port++) {
1562                         if (devidx == kibnal_data.kib_hca_idx &&
1563                             port == kibnal_data.kib_port)
1564                                 return ipoibidx;
1565                         
1566                         rc = ib_port_properties_get(device, port,
1567                                                     &port_props);
1568                         if (rc == 0)
1569                                 ipoibidx++;
1570                 }
1571         }
1572
1573         LBUG();
1574         return -1;
1575 }
1576
1577 int
1578 kibnal_startup (lnet_ni_t *ni)
1579 {
1580         char              ipif_name[32];
1581         __u32             ip;
1582         __u32             netmask;
1583         int               up;
1584         struct timeval    tv;
1585         int               rc;
1586         int               hca;
1587         int               port;
1588         int               i;
1589         int               nob;
1590
1591         LASSERT (ni->ni_lnd == &the_kiblnd);
1592
1593         /* Only 1 instance supported */
1594         if (kibnal_data.kib_init != IBNAL_INIT_NOTHING) {
1595                 CERROR ("Only 1 instance supported\n");
1596                 return -EPERM;
1597         }
1598
1599         if (*kibnal_tunables.kib_credits > *kibnal_tunables.kib_ntx) {
1600                 CERROR ("Can't set credits(%d) > ntx(%d)\n",
1601                         *kibnal_tunables.kib_credits,
1602                         *kibnal_tunables.kib_ntx);
1603                 return -EINVAL;
1604         }
1605
1606         memset (&kibnal_data, 0, sizeof (kibnal_data)); /* zero pointers, flags etc */
1607
1608         ni->ni_maxtxcredits = *kibnal_tunables.kib_credits;
1609         ni->ni_peertxcredits = *kibnal_tunables.kib_peercredits;
1610
1611         CLASSERT (LNET_MAX_INTERFACES > 1);
1612
1613
1614         kibnal_data.kib_hca_idx = 0;            /* default: first HCA */
1615         kibnal_data.kib_port = 0;               /* any port */
1616
1617         if (ni->ni_interfaces[0] != NULL) {
1618                 /* hca.port specified in 'networks=openib(h.p)' */
1619                 if (ni->ni_interfaces[1] != NULL) {
1620                         CERROR("Multiple interfaces not supported\n");
1621                         return -EPERM;
1622                 }
1623                 
1624                 nob = strlen(ni->ni_interfaces[0]);
1625                 i = sscanf(ni->ni_interfaces[0], "%d.%d%n", &hca, &port, &nob);
1626                 if (i >= 2 && nob == strlen(ni->ni_interfaces[0])) {
1627                         kibnal_data.kib_hca_idx = hca;
1628                         kibnal_data.kib_port = port;
1629                 } else {
1630                         nob = strlen(ni->ni_interfaces[0]);
1631                         i = sscanf(ni->ni_interfaces[0], "%d%n", &hca, &nob);
1632
1633                         if (i >= 1 && nob == strlen(ni->ni_interfaces[0])) {
1634                                 kibnal_data.kib_hca_idx = hca;
1635                         } else {
1636                                 CERROR("Can't parse interface '%s'\n",
1637                                        ni->ni_interfaces[0]);
1638                                 return -EINVAL;
1639                         }
1640                 }
1641         }
1642         
1643         kibnal_data.kib_ni = ni;
1644         ni->ni_data = &kibnal_data;
1645         
1646         do_gettimeofday(&tv);
1647         kibnal_data.kib_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1648
1649         PORTAL_MODULE_USE;
1650
1651         rwlock_init(&kibnal_data.kib_global_lock);
1652
1653         kibnal_data.kib_peer_hash_size = IBNAL_PEER_HASH_SIZE;
1654         LIBCFS_ALLOC (kibnal_data.kib_peers,
1655                       sizeof (struct list_head) * kibnal_data.kib_peer_hash_size);
1656         if (kibnal_data.kib_peers == NULL) {
1657                 goto failed;
1658         }
1659         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++)
1660                 INIT_LIST_HEAD(&kibnal_data.kib_peers[i]);
1661
1662         spin_lock_init (&kibnal_data.kib_reaper_lock);
1663         INIT_LIST_HEAD (&kibnal_data.kib_reaper_conns);
1664         init_waitqueue_head (&kibnal_data.kib_reaper_waitq);
1665
1666         spin_lock_init (&kibnal_data.kib_connd_lock);
1667         INIT_LIST_HEAD (&kibnal_data.kib_connd_acceptq);
1668         INIT_LIST_HEAD (&kibnal_data.kib_connd_peers);
1669         init_waitqueue_head (&kibnal_data.kib_connd_waitq);
1670
1671         spin_lock_init (&kibnal_data.kib_sched_lock);
1672         INIT_LIST_HEAD (&kibnal_data.kib_sched_txq);
1673         INIT_LIST_HEAD (&kibnal_data.kib_sched_rxq);
1674         init_waitqueue_head (&kibnal_data.kib_sched_waitq);
1675
1676         spin_lock_init (&kibnal_data.kib_tx_lock);
1677         INIT_LIST_HEAD (&kibnal_data.kib_idle_txs);
1678
1679         LIBCFS_ALLOC (kibnal_data.kib_tx_descs,
1680                       IBNAL_TX_MSGS() * sizeof(kib_tx_t));
1681         if (kibnal_data.kib_tx_descs == NULL) {
1682                 CERROR ("Can't allocate tx descs\n");
1683                 goto failed;
1684         }
1685
1686         /* lists/ptrs/locks initialised */
1687         kibnal_data.kib_init = IBNAL_INIT_DATA;
1688         /*****************************************************/
1689
1690         for (i = 0; i < IBNAL_N_SCHED; i++) {
1691                 rc = kibnal_thread_start (kibnal_scheduler,
1692                                           (void *)((unsigned long)i));
1693                 if (rc != 0) {
1694                         CERROR("Can't spawn openibnal scheduler[%d]: %d\n",
1695                                i, rc);
1696                         goto failed;
1697                 }
1698         }
1699
1700         /* must have at least 2 connds to remain responsive to svcqry while
1701          * connecting */
1702         if (*kibnal_tunables.kib_n_connd < 2)
1703                 *kibnal_tunables.kib_n_connd = 2;
1704
1705
1706         for (i = 0; i < *kibnal_tunables.kib_n_connd; i++) {
1707                 rc = kibnal_thread_start (kibnal_connd,
1708                                           (void *)((unsigned long)i));
1709                 if (rc != 0) {
1710                         CERROR("Can't spawn openibnal connd[%d]: %d\n",
1711                                i, rc);
1712                         goto failed;
1713                 }
1714         }
1715
1716         rc = kibnal_thread_start (kibnal_reaper, NULL);
1717         if (rc != 0) {
1718                 CERROR ("Can't spawn openibnal reaper: %d\n", rc);
1719                 goto failed;
1720         }
1721
1722         kibnal_data.kib_device = ib_device_get_by_index(kibnal_data.kib_hca_idx);
1723         if (kibnal_data.kib_device == NULL) {
1724                 CERROR ("Can't open ib device %d\n",
1725                         kibnal_data.kib_hca_idx);
1726                 goto failed;
1727         }
1728         
1729         rc = ib_device_properties_get(kibnal_data.kib_device,
1730                                       &kibnal_data.kib_device_props);
1731         if (rc != 0) {
1732                 CERROR ("Can't get device props: %d\n", rc);
1733                 goto failed;
1734         }
1735
1736         CDEBUG(D_NET, "Max Initiator: %d Max Responder %d\n", 
1737                kibnal_data.kib_device_props.max_initiator_per_qp,
1738                kibnal_data.kib_device_props.max_responder_per_qp);
1739
1740         if (kibnal_data.kib_port != 0) {
1741                 rc = ib_port_properties_get(kibnal_data.kib_device, 
1742                                             kibnal_data.kib_port,
1743                                             &kibnal_data.kib_port_props);
1744                 if (rc != 0) {
1745                         CERROR("Error %d open port %d on HCA %d\n", rc,
1746                                kibnal_data.kib_port,
1747                                kibnal_data.kib_hca_idx);
1748                         goto failed;
1749                 }
1750         } else {
1751                 for (i = 1; i <= 2; i++) {
1752                         rc = ib_port_properties_get(kibnal_data.kib_device, i,
1753                                                     &kibnal_data.kib_port_props);
1754                         if (rc == 0) {
1755                                 kibnal_data.kib_port = i;
1756                                 break;
1757                         }
1758                 }
1759                 if (kibnal_data.kib_port == 0) {
1760                         CERROR ("Can't find a port\n");
1761                         goto failed;
1762                 }
1763         }
1764
1765         i = kibnal_get_ipoibidx();
1766         if (i < 0)
1767                 goto failed;
1768         
1769         snprintf(ipif_name, sizeof(ipif_name), "%s%d",
1770                  *kibnal_tunables.kib_ipif_basename, i);
1771         if (strlen(ipif_name) == sizeof(ipif_name) - 1) {
1772                 CERROR("IPoIB interface name %s truncated\n", ipif_name);
1773                 return -EINVAL;
1774         }
1775         
1776         rc = libcfs_ipif_query(ipif_name, &up, &ip, &netmask);
1777         if (rc != 0) {
1778                 CERROR("Can't query IPoIB interface %s: %d\n", ipif_name, rc);
1779                 goto failed;
1780         }
1781         
1782         if (!up) {
1783                 CERROR("Can't query IPoIB interface %s: it's down\n", ipif_name);
1784                 goto failed;
1785         }
1786         
1787         ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ip);
1788
1789         rc = ib_pd_create(kibnal_data.kib_device,
1790                           NULL, &kibnal_data.kib_pd);
1791         if (rc != 0) {
1792                 CERROR ("Can't create PD: %d\n", rc);
1793                 goto failed;
1794         }
1795         
1796         /* flag PD initialised */
1797         kibnal_data.kib_init = IBNAL_INIT_PD;
1798         /*****************************************************/
1799 #if IBNAL_FMR
1800         {
1801                 const int pool_size = *kibnal_tunables.kib_ntx;
1802                 struct ib_fmr_pool_param params = {
1803                         .max_pages_per_fmr = LNET_MAX_PAYLOAD/PAGE_SIZE,
1804                         .access            = (IB_ACCESS_LOCAL_WRITE |
1805                                               IB_ACCESS_REMOTE_WRITE |
1806                                               IB_ACCESS_REMOTE_READ),
1807                         .pool_size         = pool_size,
1808                         .dirty_watermark   = (pool_size * 3)/4,
1809                         .flush_function    = NULL,
1810                         .flush_arg         = NULL,
1811                         .cache             = 1,
1812                 };
1813                 rc = ib_fmr_pool_create(kibnal_data.kib_pd, &params,
1814                                         &kibnal_data.kib_fmr_pool);
1815                 if (rc != 0) {
1816                         CERROR ("Can't create FMR pool size %d: %d\n", 
1817                                 pool_size, rc);
1818                         goto failed;
1819                 }
1820         }
1821
1822         /* flag FMR pool initialised */
1823         kibnal_data.kib_init = IBNAL_INIT_FMR;
1824 #endif
1825         /*****************************************************/
1826
1827         rc = kibnal_setup_tx_descs();
1828         if (rc != 0) {
1829                 CERROR ("Can't register tx descs: %d\n", rc);
1830                 goto failed;
1831         }
1832         
1833         /* flag TX descs initialised */
1834         kibnal_data.kib_init = IBNAL_INIT_TXD;
1835         /*****************************************************/
1836         
1837         {
1838                 struct ib_cq_callback callback = {
1839                         .context        = IBNAL_CALLBACK_CTXT,
1840                         .policy         = IB_CQ_PROVIDER_REARM,
1841                         .function       = {
1842                                 .entry  = kibnal_callback,
1843                         },
1844                         .arg            = NULL,
1845                 };
1846                 int  nentries = IBNAL_CQ_ENTRIES();
1847                 
1848                 rc = ib_cq_create (kibnal_data.kib_device, 
1849                                    &nentries, &callback, NULL,
1850                                    &kibnal_data.kib_cq);
1851                 if (rc != 0) {
1852                         CERROR ("Can't create CQ: %d\n", rc);
1853                         goto failed;
1854                 }
1855
1856                 /* I only want solicited events */
1857                 rc = ib_cq_request_notification(kibnal_data.kib_cq, 1);
1858                 LASSERT (rc == 0);
1859         }
1860
1861         /* flag CQ initialised */
1862         kibnal_data.kib_init = IBNAL_INIT_CQ;
1863         /*****************************************************/
1864
1865         rc = kibnal_start_ib_listener();
1866         if (rc != 0)
1867                 goto failed;
1868         
1869         /* flag everything initialised */
1870         kibnal_data.kib_init = IBNAL_INIT_ALL;
1871         /*****************************************************/
1872
1873         return 0;
1874
1875  failed:
1876         kibnal_shutdown(ni);    
1877         return -ENETDOWN;
1878 }
1879
1880 void __exit
1881 kibnal_module_fini (void)
1882 {
1883         lnet_unregister_lnd(&the_kiblnd);
1884         kibnal_tunables_fini();
1885 }
1886
1887 int __init
1888 kibnal_module_init (void)
1889 {
1890         int    rc;
1891
1892         rc = kibnal_tunables_init();
1893         if (rc != 0)
1894                 return rc;
1895         
1896         lnet_register_lnd(&the_kiblnd);
1897
1898         return (0);
1899 }
1900
1901 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
1902 #ifdef USING_TSAPI
1903 MODULE_DESCRIPTION("Kernel Cisco IB LND v1.00");
1904 #else
1905 MODULE_DESCRIPTION("Kernel OpenIB(gen1) LND v1.00");
1906 #endif
1907 MODULE_LICENSE("GPL");
1908
1909 module_init(kibnal_module_init);
1910 module_exit(kibnal_module_fini);