Whamcloud - gitweb
Mass conversion of all copyright messages to Oracle.
[fs/lustre-release.git] / lnet / klnds / openiblnd / openiblnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/openiblnd/openiblnd.c
37  *
38  * Author: Eric Barton <eric@bartonsoftware.com>
39  */
40
41 #include "openiblnd.h"
42
43 lnd_t the_kiblnd = {
44 #ifdef USING_TSAPI
45         .lnd_type       = CIBLND,
46 #else
47         .lnd_type       = OPENIBLND,
48 #endif
49         .lnd_startup    = kibnal_startup,
50         .lnd_shutdown   = kibnal_shutdown,
51         .lnd_ctl        = kibnal_ctl,
52         .lnd_send       = kibnal_send,
53         .lnd_recv       = kibnal_recv,
54         .lnd_eager_recv = kibnal_eager_recv,
55         .lnd_accept     = kibnal_accept,
56 };
57
58 kib_data_t              kibnal_data;
59
60 __u32 
61 kibnal_cksum (void *ptr, int nob)
62 {
63         char  *c  = ptr;
64         __u32  sum = 0;
65
66         while (nob-- > 0)
67                 sum = ((sum << 1) | (sum >> 31)) + *c++;
68
69         /* ensure I don't return 0 (== no checksum) */
70         return (sum == 0) ? 1 : sum;
71 }
72
73 void
74 kibnal_init_msg(kib_msg_t *msg, int type, int body_nob)
75 {
76         msg->ibm_type = type;
77         msg->ibm_nob  = offsetof(kib_msg_t, ibm_u) + body_nob;
78 }
79
80 void
81 kibnal_pack_msg(kib_msg_t *msg, int version, int credits, 
82                 lnet_nid_t dstnid, __u64 dststamp)
83 {
84         /* CAVEAT EMPTOR! all message fields not set here should have been
85          * initialised previously. */
86         msg->ibm_magic    = IBNAL_MSG_MAGIC;
87         msg->ibm_version  = version;
88         /*   ibm_type */
89         msg->ibm_credits  = credits;
90         /*   ibm_nob */
91         msg->ibm_cksum    = 0;
92         msg->ibm_srcnid   = kibnal_data.kib_ni->ni_nid;
93         msg->ibm_srcstamp = kibnal_data.kib_incarnation;
94         msg->ibm_dstnid   = dstnid;
95         msg->ibm_dststamp = dststamp;
96
97         if (*kibnal_tunables.kib_cksum) {
98                 /* NB ibm_cksum zero while computing cksum */
99                 msg->ibm_cksum    = kibnal_cksum(msg, msg->ibm_nob);
100         }
101 }
102
103 int
104 kibnal_unpack_msg(kib_msg_t *msg, int expected_version, int nob)
105 {
106         const int hdr_size = offsetof(kib_msg_t, ibm_u);
107         __u32     msg_cksum;
108         int       msg_version;
109         int       flip;
110         int       msg_nob;
111
112         if (nob < 6) {
113                 CERROR("Short message: %d\n", nob);
114                 return -EPROTO;
115         }
116
117         if (msg->ibm_magic == IBNAL_MSG_MAGIC) {
118                 flip = 0;
119         } else if (msg->ibm_magic == __swab32(IBNAL_MSG_MAGIC)) {
120                 flip = 1;
121         } else {
122                 CERROR("Bad magic: %08x\n", msg->ibm_magic);
123                 return -EPROTO;
124         }
125
126         msg_version = flip ? __swab16(msg->ibm_version) : msg->ibm_version;
127         if ((expected_version == 0) ?
128             (msg_version != IBNAL_MSG_VERSION &&
129              msg_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) :
130             (msg_version != expected_version)) {
131                 CERROR("Bad version: %x\n", msg_version);
132                 return -EPROTO;
133         }
134
135         if (nob < hdr_size) {
136                 CERROR("Short message: %d\n", nob);
137                 return -EPROTO;
138         }
139
140         msg_nob = flip ? __swab32(msg->ibm_nob) : msg->ibm_nob;
141         if (msg_nob > nob) {
142                 CERROR("Short message: got %d, wanted %d\n", nob, msg_nob);
143                 return -EPROTO;
144         }
145
146         /* checksum must be computed with ibm_cksum zero and BEFORE anything
147          * gets flipped */
148         msg_cksum = flip ? __swab32(msg->ibm_cksum) : msg->ibm_cksum;
149         msg->ibm_cksum = 0;
150         if (msg_cksum != 0 &&
151             msg_cksum != kibnal_cksum(msg, msg_nob)) {
152                 CERROR("Bad checksum\n");
153                 return -EPROTO;
154         }
155         msg->ibm_cksum = msg_cksum;
156         
157         if (flip) {
158                 /* leave magic unflipped as a clue to peer endianness */
159                 msg->ibm_version = msg_version;
160                 LASSERT (sizeof(msg->ibm_type) == 1);
161                 LASSERT (sizeof(msg->ibm_credits) == 1);
162                 msg->ibm_nob = msg_nob;
163                 __swab64s(&msg->ibm_srcnid);
164                 __swab64s(&msg->ibm_srcstamp);
165                 __swab64s(&msg->ibm_dstnid);
166                 __swab64s(&msg->ibm_dststamp);
167         }
168         
169         if (msg->ibm_srcnid == LNET_NID_ANY) {
170                 CERROR("Bad src nid: %s\n", libcfs_nid2str(msg->ibm_srcnid));
171                 return -EPROTO;
172         }
173
174         switch (msg->ibm_type) {
175         default:
176                 CERROR("Unknown message type %x\n", msg->ibm_type);
177                 return -EPROTO;
178                 
179         case IBNAL_MSG_SVCQRY:
180         case IBNAL_MSG_NOOP:
181                 break;
182
183         case IBNAL_MSG_SVCRSP:
184                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.svcrsp)) {
185                         CERROR("Short SVCRSP: %d(%d)\n", msg_nob,
186                                (int)(hdr_size + sizeof(msg->ibm_u.svcrsp)));
187                         return -EPROTO;
188                 }
189                 if (flip) {
190                         __swab64s(&msg->ibm_u.svcrsp.ibsr_svc_id);
191                         __swab16s(&msg->ibm_u.svcrsp.ibsr_svc_pkey);
192                 }
193                 break;
194
195         case IBNAL_MSG_CONNREQ:
196         case IBNAL_MSG_CONNACK:
197                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.connparams)) {
198                         CERROR("Short CONNREQ: %d(%d)\n", msg_nob,
199                                (int)(hdr_size + sizeof(msg->ibm_u.connparams)));
200                         return -EPROTO;
201                 }
202                 if (flip)
203                         __swab32s(&msg->ibm_u.connparams.ibcp_queue_depth);
204                 break;
205
206         case IBNAL_MSG_IMMEDIATE:
207                 if (msg_nob < offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0])) {
208                         CERROR("Short IMMEDIATE: %d(%d)\n", msg_nob,
209                                (int)offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0]));
210                         return -EPROTO;
211                 }
212                 break;
213
214         case IBNAL_MSG_PUT_RDMA:
215         case IBNAL_MSG_GET_RDMA:
216                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.rdma)) {
217                         CERROR("Short RDMA req: %d(%d)\n", msg_nob,
218                                (int)(hdr_size + sizeof(msg->ibm_u.rdma)));
219                         return -EPROTO;
220                 }
221                 if (flip) {
222                         __swab32s(&msg->ibm_u.rdma.ibrm_desc.rd_key);
223                         __swab32s(&msg->ibm_u.rdma.ibrm_desc.rd_nob);
224                         __swab64s(&msg->ibm_u.rdma.ibrm_desc.rd_addr);
225                 }
226                 break;
227
228         case IBNAL_MSG_PUT_DONE:
229         case IBNAL_MSG_GET_DONE:
230                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.completion)) {
231                         CERROR("Short RDMA completion: %d(%d)\n", msg_nob,
232                                (int)(hdr_size + sizeof(msg->ibm_u.completion)));
233                         return -EPROTO;
234                 }
235                 if (flip)
236                         __swab32s(&msg->ibm_u.completion.ibcm_status);
237                 break;
238         }
239         return 0;
240 }
241
242 int
243 kibnal_make_svcqry (kib_conn_t *conn) 
244 {
245         kib_peer_t    *peer = conn->ibc_peer;
246         int            version = IBNAL_MSG_VERSION;
247         int            msg_version;
248         kib_msg_t     *msg;
249         struct socket *sock;
250         int            rc;
251         int            nob;
252
253         LASSERT (conn->ibc_connreq != NULL);
254         msg = &conn->ibc_connreq->cr_msg;
255
256  again:
257         kibnal_init_msg(msg, IBNAL_MSG_SVCQRY, 0);
258         kibnal_pack_msg(msg, version, 0, peer->ibp_nid, 0);
259
260         rc = lnet_connect(&sock, peer->ibp_nid,
261                           0, peer->ibp_ip, peer->ibp_port);
262         if (rc != 0)
263                 return -ECONNABORTED;
264         
265         rc = libcfs_sock_write(sock, msg, msg->ibm_nob,
266                                lnet_acceptor_timeout());
267         if (rc != 0) {
268                 CERROR("Error %d sending svcqry to %s at %u.%u.%u.%u/%d\n", 
269                        rc, libcfs_nid2str(peer->ibp_nid), 
270                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
271                 goto out;
272         }
273
274         /* The first 6 bytes are invariably MAGIC + proto version */
275         rc = libcfs_sock_read(sock, msg, 6, *kibnal_tunables.kib_timeout);
276         if (rc != 0) {
277                 CERROR("Error %d receiving svcrsp from %s at %u.%u.%u.%u/%d\n", 
278                        rc, libcfs_nid2str(peer->ibp_nid), 
279                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
280                 goto out;
281         }
282
283         if (msg->ibm_magic != IBNAL_MSG_MAGIC &&
284             msg->ibm_magic != __swab32(IBNAL_MSG_MAGIC)) {
285                 CERROR("Bad magic: %08x from %s at %u.%u.%u.%u/%d\n",
286                        msg->ibm_magic, libcfs_nid2str(peer->ibp_nid),
287                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
288                 rc = -EPROTO;
289                 goto out;
290         }
291
292         msg_version = (msg->ibm_magic == IBNAL_MSG_MAGIC) ? 
293                       msg->ibm_version : __swab16(msg->ibm_version);
294         if (msg_version != version) {
295                 if (version == IBNAL_MSG_VERSION) {
296                         /* retry with previous version */
297                         libcfs_sock_release(sock);
298                         version = IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD;
299                         goto again;
300                 }
301                 
302                 CERROR("Bad version %x from %s at %u.%u.%u.%u/%d\n",
303                        msg_version, libcfs_nid2str(peer->ibp_nid),
304                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
305                 rc = -EPROTO;
306                 goto out;
307         }
308
309         /* Read in the rest of the message now we know the expected format */
310         nob = offsetof(kib_msg_t, ibm_u) + sizeof(kib_svcrsp_t);
311         rc = libcfs_sock_read(sock, ((char *)msg) + 6, nob - 6,
312                               *kibnal_tunables.kib_timeout);
313         if (rc != 0) {
314                 CERROR("Error %d receiving svcrsp from %s at %u.%u.%u.%u/%d\n", 
315                        rc, libcfs_nid2str(peer->ibp_nid), 
316                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
317                 goto out;
318         }
319
320         rc = kibnal_unpack_msg(msg, version, nob);
321         if (rc != 0) {
322                 CERROR("Error %d unpacking svcrsp from %s at %u.%u.%u.%u/%d\n", 
323                        rc, libcfs_nid2str(peer->ibp_nid), 
324                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
325                 goto out;
326         }
327                        
328         if (msg->ibm_type != IBNAL_MSG_SVCRSP) {
329                 CERROR("Unexpected response type %d from %s at %u.%u.%u.%u/%d\n", 
330                        msg->ibm_type, libcfs_nid2str(peer->ibp_nid), 
331                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
332                 rc = -EPROTO;
333                 goto out;
334         }
335         
336         if (kibnal_data.kib_ni->ni_nid != msg->ibm_dstnid ||
337             msg->ibm_dststamp != kibnal_data.kib_incarnation) {
338                 CERROR("Unexpected dst NID/stamp %s/"LPX64" from "
339                        "%s at %u.%u.%u.%u/%d\n", 
340                        libcfs_nid2str(msg->ibm_dstnid), msg->ibm_dststamp,
341                        libcfs_nid2str(peer->ibp_nid), HIPQUAD(peer->ibp_ip), 
342                        peer->ibp_port);
343                 rc = -EPROTO;
344                 goto out;
345         }
346
347         if (peer->ibp_nid != msg->ibm_srcnid) {
348                 CERROR("Unexpected src NID %s from %s at %u.%u.%u.%u/%d\n", 
349                        libcfs_nid2str(msg->ibm_srcnid),
350                        libcfs_nid2str(peer->ibp_nid), 
351                        HIPQUAD(peer->ibp_ip), peer->ibp_port);
352                 rc = -EPROTO;
353                 goto out;
354         }
355
356         conn->ibc_incarnation = msg->ibm_srcstamp;
357         conn->ibc_connreq->cr_svcrsp = msg->ibm_u.svcrsp;
358         conn->ibc_version = version;
359         
360  out:
361         libcfs_sock_release(sock);
362         return rc;
363 }
364
365 void
366 kibnal_handle_svcqry (struct socket *sock)
367 {
368         __u32                peer_ip;
369         unsigned int         peer_port;
370         kib_msg_t           *msg;
371         __u64                srcnid;
372         __u64                srcstamp;
373         int                  version;
374         int                  reject = 0;
375         int                  rc;
376
377         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
378         if (rc != 0) {
379                 CERROR("Can't get peer's IP: %d\n", rc);
380                 return;
381         }
382
383         LIBCFS_ALLOC(msg, sizeof(*msg));
384         if (msg == NULL) {
385                 CERROR("Can't allocate msgs for %u.%u.%u.%u/%d\n",
386                        HIPQUAD(peer_ip), peer_port);
387                 return;
388         }
389         
390         rc = libcfs_sock_read(sock, &msg->ibm_magic, sizeof(msg->ibm_magic),
391                               lnet_acceptor_timeout());
392         if (rc != 0) {
393                 CERROR("Error %d receiving svcqry(1) from %u.%u.%u.%u/%d\n",
394                        rc, HIPQUAD(peer_ip), peer_port);
395                 goto out;
396         }
397
398         if (msg->ibm_magic != IBNAL_MSG_MAGIC &&
399             msg->ibm_magic != __swab32(IBNAL_MSG_MAGIC)) {
400                 /* Unexpected magic! */
401                 if (msg->ibm_magic == LNET_PROTO_MAGIC ||
402                     msg->ibm_magic == __swab32(LNET_PROTO_MAGIC)) {
403                         /* future protocol version compatibility!  When LNET
404                          * unifies protocols over all LNDs, the first thing
405                          * sent will be a version query.  I send back a reply
406                          * in my current protocol to tell her I'm "old" */
407                         kibnal_init_msg(msg, 0, 0);
408                         kibnal_pack_msg(msg, IBNAL_MSG_VERSION, 0, 
409                                         LNET_NID_ANY, 0);
410                         reject = 1;
411                         goto reply;
412                 }
413
414                 CERROR ("Bad magic(1) %#08x (%#08x expected) from "
415                         "%u.%u.%u.%u/%d\n", msg->ibm_magic,
416                         IBNAL_MSG_MAGIC, HIPQUAD(peer_ip), peer_port);
417                 goto out;
418         }
419
420         /* Now check version */
421
422         rc = libcfs_sock_read(sock, &msg->ibm_version, sizeof(msg->ibm_version),
423                               lnet_acceptor_timeout());
424         if (rc != 0) {
425                 CERROR("Error %d receiving svcqry(2) from %u.%u.%u.%u/%d\n",
426                        rc, HIPQUAD(peer_ip), peer_port);
427                 goto out;
428         }
429
430         version = (msg->ibm_magic == IBNAL_MSG_MAGIC) ?
431                   msg->ibm_version : __swab16(msg->ibm_version);
432         /* Peer is a different protocol version: reply in my current protocol
433          * to tell her I'm "old" */
434         if (version != IBNAL_MSG_VERSION &&
435             version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
436                 kibnal_init_msg(msg, 0, 0);
437                 kibnal_pack_msg(msg, IBNAL_MSG_VERSION, 0, LNET_NID_ANY, 0);
438                 reject = 1;
439                 goto reply;
440         }
441         
442         /* Now read in all the rest */
443         rc = libcfs_sock_read(sock, &msg->ibm_type,
444                               offsetof(kib_msg_t, ibm_u) -
445                               offsetof(kib_msg_t, ibm_type),
446                               lnet_acceptor_timeout());
447         if (rc != 0) {
448                 CERROR("Error %d receiving svcqry(3) from %u.%u.%u.%u/%d\n",
449                        rc, HIPQUAD(peer_ip), peer_port);
450                 goto out;
451         }
452         
453         rc = kibnal_unpack_msg(msg, version, offsetof(kib_msg_t, ibm_u));
454         if (rc != 0) {
455                 CERROR("Error %d unpacking svcqry from %u.%u.%u.%u/%d\n",
456                        rc, HIPQUAD(peer_ip), peer_port);
457                 goto out;
458         }
459         
460         if (msg->ibm_type != IBNAL_MSG_SVCQRY) {
461                 CERROR("Unexpected message %d from %u.%u.%u.%u/%d\n",
462                        msg->ibm_type, HIPQUAD(peer_ip), peer_port);
463                 goto out;
464         }
465         
466         if (kibnal_data.kib_ni->ni_nid != msg->ibm_dstnid) {
467                 CERROR("Unexpected dstnid %s: expected %s from %u.%u.%u.%u/%d\n",
468                        libcfs_nid2str(msg->ibm_dstnid),
469                        libcfs_nid2str(kibnal_data.kib_ni->ni_nid),
470                        HIPQUAD(peer_ip), peer_port);
471                 goto out;
472         }
473
474         srcnid = msg->ibm_srcnid;
475         srcstamp = msg->ibm_srcstamp;
476         
477         kibnal_init_msg(msg, IBNAL_MSG_SVCRSP, sizeof(msg->ibm_u.svcrsp));
478
479         msg->ibm_u.svcrsp.ibsr_svc_id = kibnal_data.kib_svc_id;
480         memcpy(msg->ibm_u.svcrsp.ibsr_svc_gid, kibnal_data.kib_svc_gid,
481                sizeof(kibnal_data.kib_svc_gid));
482         msg->ibm_u.svcrsp.ibsr_svc_pkey = kibnal_data.kib_svc_pkey;
483
484         kibnal_pack_msg(msg, version, 0, srcnid, srcstamp);
485
486  reply:
487         rc = libcfs_sock_write (sock, msg, msg->ibm_nob,
488                                 lnet_acceptor_timeout());
489         if (!reject && rc != 0) {
490                 /* Only complain if we're not rejecting */
491                 CERROR("Error %d replying to svcqry from %u.%u.%u.%u/%d\n",
492                        rc, HIPQUAD(peer_ip), peer_port);
493         }
494         
495  out:
496         LIBCFS_FREE(msg, sizeof(*msg));
497 }
498
499 void
500 kibnal_free_acceptsock (kib_acceptsock_t *as)
501 {
502         libcfs_sock_release(as->ibas_sock);
503         LIBCFS_FREE(as, sizeof(*as));
504 }
505
506 int
507 kibnal_accept(lnet_ni_t *ni, struct socket *sock)
508 {
509         kib_acceptsock_t  *as;
510         unsigned long      flags;
511
512         LIBCFS_ALLOC(as, sizeof(*as));
513         if (as == NULL) {
514                 CERROR("Out of Memory\n");
515                 return -ENOMEM;
516         }
517
518         as->ibas_sock = sock;
519                 
520         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
521                 
522         list_add_tail(&as->ibas_list, &kibnal_data.kib_connd_acceptq);
523         wake_up(&kibnal_data.kib_connd_waitq);
524
525         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
526         return 0;
527 }
528
529 int
530 kibnal_start_ib_listener (void) 
531 {
532         int    rc;
533
534         LASSERT (kibnal_data.kib_listen_handle == NULL);
535
536         kibnal_data.kib_svc_id = ib_cm_service_assign();
537         CDEBUG(D_NET, "svc id "LPX64"\n", kibnal_data.kib_svc_id);
538
539         rc = ib_cached_gid_get(kibnal_data.kib_device,
540                                kibnal_data.kib_port, 0,
541                                kibnal_data.kib_svc_gid);
542         if (rc != 0) {
543                 CERROR("Can't get port %d GID: %d\n",
544                        kibnal_data.kib_port, rc);
545                 return rc;
546         }
547         
548         rc = ib_cached_pkey_get(kibnal_data.kib_device,
549                                 kibnal_data.kib_port, 0,
550                                 &kibnal_data.kib_svc_pkey);
551         if (rc != 0) {
552                 CERROR ("Can't get port %d PKEY: %d\n",
553                         kibnal_data.kib_port, rc);
554                 return rc;
555         }
556
557         rc = ib_cm_listen(kibnal_data.kib_svc_id,
558                           TS_IB_CM_SERVICE_EXACT_MASK,
559                           kibnal_passive_conn_callback, NULL,
560                           &kibnal_data.kib_listen_handle);
561         if (rc != 0) {
562                 kibnal_data.kib_listen_handle = NULL;
563                 CERROR ("Can't create IB listener: %d\n", rc);
564                 return rc;
565         }
566         
567         LASSERT (kibnal_data.kib_listen_handle != NULL);
568         return 0;
569 }
570
571 void
572 kibnal_stop_ib_listener (void) 
573 {
574         int    rc;
575         
576         LASSERT (kibnal_data.kib_listen_handle != NULL);
577
578         rc = ib_cm_listen_stop (kibnal_data.kib_listen_handle);
579         if (rc != 0)
580                 CERROR("Error stopping IB listener: %d\n", rc);
581                 
582         kibnal_data.kib_listen_handle = NULL;
583 }
584
585 int
586 kibnal_create_peer (kib_peer_t **peerp, lnet_nid_t nid)
587 {
588         kib_peer_t     *peer;
589         unsigned long   flags;
590         int             rc;
591
592         LASSERT (nid != LNET_NID_ANY);
593
594         LIBCFS_ALLOC(peer, sizeof (*peer));
595         if (peer == NULL) {
596                 CERROR("Cannot allocate peer\n");
597                 return -ENOMEM;
598         }
599
600         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
601
602         peer->ibp_nid = nid;
603         atomic_set (&peer->ibp_refcount, 1);    /* 1 ref for caller */
604
605         INIT_LIST_HEAD (&peer->ibp_list);       /* not in the peer table yet */
606         INIT_LIST_HEAD (&peer->ibp_conns);
607         INIT_LIST_HEAD (&peer->ibp_tx_queue);
608         INIT_LIST_HEAD (&peer->ibp_connd_list); /* not queued for connecting */
609
610         peer->ibp_error = 0;
611         peer->ibp_last_alive = cfs_time_current();
612         peer->ibp_reconnect_interval = 0;       /* OK to connect at any time */
613
614         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
615
616         if (atomic_read(&kibnal_data.kib_npeers) >=
617             *kibnal_tunables.kib_concurrent_peers) {
618                 rc = -EOVERFLOW;        /* !! but at least it distinguishes */
619         } else if (kibnal_data.kib_nonewpeers) {
620                 rc = -ESHUTDOWN;        /* shutdown has started */
621         } else {
622                 rc = 0;
623                 /* npeers only grows with kib_global_lock held */
624                 atomic_inc(&kibnal_data.kib_npeers);
625         }
626         
627         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
628
629         if (rc != 0) {
630                 CERROR("Can't create peer: %s\n", 
631                        (rc == -ESHUTDOWN) ? "shutting down" : 
632                        "too many peers");
633                 LIBCFS_FREE(peer, sizeof(*peer));
634         } else {
635                 *peerp = peer;
636         }
637         
638         return rc;
639 }
640
641 void
642 kibnal_destroy_peer (kib_peer_t *peer)
643 {
644         CDEBUG (D_NET, "peer %s %p deleted\n", 
645                 libcfs_nid2str(peer->ibp_nid), peer);
646
647         LASSERT (atomic_read (&peer->ibp_refcount) == 0);
648         LASSERT (peer->ibp_persistence == 0);
649         LASSERT (!kibnal_peer_active(peer));
650         LASSERT (peer->ibp_connecting == 0);
651         LASSERT (peer->ibp_accepting == 0);
652         LASSERT (list_empty (&peer->ibp_connd_list));
653         LASSERT (list_empty (&peer->ibp_conns));
654         LASSERT (list_empty (&peer->ibp_tx_queue));
655
656         LIBCFS_FREE (peer, sizeof (*peer));
657
658         /* NB a peer's connections keep a reference on their peer until
659          * they are destroyed, so we can be assured that _all_ state to do
660          * with this peer has been cleaned up when its refcount drops to
661          * zero. */
662         atomic_dec(&kibnal_data.kib_npeers);
663 }
664
665 kib_peer_t *
666 kibnal_find_peer_locked (lnet_nid_t nid)
667 {
668         struct list_head *peer_list = kibnal_nid2peerlist (nid);
669         struct list_head *tmp;
670         kib_peer_t       *peer;
671
672         list_for_each (tmp, peer_list) {
673
674                 peer = list_entry (tmp, kib_peer_t, ibp_list);
675
676                 LASSERT (peer->ibp_persistence != 0 || /* persistent peer */
677                          peer->ibp_connecting != 0 || /* creating conns */
678                          peer->ibp_accepting != 0 ||
679                          !list_empty (&peer->ibp_conns));  /* active conn */
680
681                 if (peer->ibp_nid != nid)
682                         continue;
683
684                 return (peer);
685         }
686         return (NULL);
687 }
688
689 kib_peer_t *
690 kibnal_get_peer (lnet_nid_t nid)
691 {
692         kib_peer_t     *peer;
693         unsigned long   flags;
694
695         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
696         peer = kibnal_find_peer_locked (nid);
697         if (peer != NULL)                       /* +1 ref for caller? */
698                 kibnal_peer_addref(peer);
699         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
700
701         return (peer);
702 }
703
704 void
705 kibnal_unlink_peer_locked (kib_peer_t *peer)
706 {
707         LASSERT (peer->ibp_persistence == 0);
708         LASSERT (list_empty(&peer->ibp_conns));
709
710         LASSERT (kibnal_peer_active(peer));
711         list_del_init (&peer->ibp_list);
712         /* lose peerlist's ref */
713         kibnal_peer_decref(peer);
714 }
715
716 int
717 kibnal_get_peer_info (int index, lnet_nid_t *nidp, __u32 *ipp, int *portp,
718                       int *persistencep)
719 {
720         kib_peer_t        *peer;
721         struct list_head  *ptmp;
722         unsigned long      flags;
723         int                i;
724
725         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
726
727         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
728
729                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
730                         
731                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
732                         LASSERT (peer->ibp_persistence != 0 ||
733                                  peer->ibp_connecting != 0 ||
734                                  peer->ibp_accepting != 0 ||
735                                  !list_empty (&peer->ibp_conns));
736
737                         if (index-- > 0)
738                                 continue;
739
740                         *nidp = peer->ibp_nid;
741                         *ipp = peer->ibp_ip;
742                         *portp = peer->ibp_port;
743                         *persistencep = peer->ibp_persistence;
744                         
745                         read_unlock_irqrestore(&kibnal_data.kib_global_lock,
746                                                flags);
747                         return (0);
748                 }
749         }
750
751         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
752         return (-ENOENT);
753 }
754
755 int
756 kibnal_add_persistent_peer (lnet_nid_t nid, __u32 ip, int port)
757 {
758         unsigned long      flags;
759         kib_peer_t        *peer;
760         kib_peer_t        *peer2;
761         int                rc;
762         
763         if (nid == LNET_NID_ANY)
764                 return (-EINVAL);
765
766         rc = kibnal_create_peer (&peer, nid);
767         if (rc != 0)
768                 return rc;
769
770         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
771
772         /* I'm always called with a reference on kibnal_data.kib_ni
773          * so shutdown can't have started */
774         LASSERT (kibnal_data.kib_nonewpeers == 0);
775
776         peer2 = kibnal_find_peer_locked (nid);
777         if (peer2 != NULL) {
778                 kibnal_peer_decref(peer);
779                 peer = peer2;
780         } else {
781                 /* peer table takes existing ref on peer */
782                 list_add_tail (&peer->ibp_list,
783                                kibnal_nid2peerlist (nid));
784         }
785
786         peer->ibp_ip = ip;
787         peer->ibp_port = port;
788         peer->ibp_persistence++;
789         
790         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
791         return (0);
792 }
793
794 void
795 kibnal_del_peer_locked (kib_peer_t *peer)
796 {
797         struct list_head *ctmp;
798         struct list_head *cnxt;
799         kib_conn_t       *conn;
800
801         peer->ibp_persistence = 0;
802
803         if (list_empty(&peer->ibp_conns)) {
804                 kibnal_unlink_peer_locked(peer);
805         } else {
806                 list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
807                         conn = list_entry(ctmp, kib_conn_t, ibc_list);
808
809                         kibnal_close_conn_locked (conn, 0);
810                 }
811                 /* NB peer is no longer persistent; closing its last conn
812                  * unlinked it. */
813         }
814         /* NB peer now unlinked; might even be freed if the peer table had the
815          * last ref on it. */
816 }
817
818 int
819 kibnal_del_peer (lnet_nid_t nid)
820 {
821         unsigned long      flags;
822         CFS_LIST_HEAD     (zombies);
823         struct list_head  *ptmp;
824         struct list_head  *pnxt;
825         kib_peer_t        *peer;
826         int                lo;
827         int                hi;
828         int                i;
829         int                rc = -ENOENT;
830
831         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
832
833         if (nid != LNET_NID_ANY)
834                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
835         else {
836                 lo = 0;
837                 hi = kibnal_data.kib_peer_hash_size - 1;
838         }
839
840         for (i = lo; i <= hi; i++) {
841                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
842                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
843                         LASSERT (peer->ibp_persistence != 0 ||
844                                  peer->ibp_connecting != 0 ||
845                                  peer->ibp_accepting != 0 ||
846                                  !list_empty (&peer->ibp_conns));
847
848                         if (!(nid == LNET_NID_ANY || peer->ibp_nid == nid))
849                                 continue;
850
851                         if (!list_empty(&peer->ibp_tx_queue)) {
852                                 LASSERT (list_empty(&peer->ibp_conns));
853
854                                 list_splice_init(&peer->ibp_tx_queue, &zombies);
855                         }
856
857                         kibnal_del_peer_locked (peer);
858                         rc = 0;         /* matched something */
859                 }
860         }
861
862         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
863
864         kibnal_txlist_done(&zombies, -EIO);
865
866         return (rc);
867 }
868
869 kib_conn_t *
870 kibnal_get_conn_by_idx (int index)
871 {
872         kib_peer_t        *peer;
873         struct list_head  *ptmp;
874         kib_conn_t        *conn;
875         struct list_head  *ctmp;
876         unsigned long      flags;
877         int                i;
878
879         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
880
881         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
882                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
883
884                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
885                         LASSERT (peer->ibp_persistence > 0 ||
886                                  peer->ibp_connecting != 0 ||
887                                  peer->ibp_accepting != 0 ||
888                                  !list_empty (&peer->ibp_conns));
889
890                         list_for_each (ctmp, &peer->ibp_conns) {
891                                 if (index-- > 0)
892                                         continue;
893
894                                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
895                                 kibnal_conn_addref(conn);
896                                 read_unlock_irqrestore(&kibnal_data.kib_global_lock,
897                                                        flags);
898                                 return (conn);
899                         }
900                 }
901         }
902
903         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
904         return (NULL);
905 }
906
907 kib_conn_t *
908 kibnal_create_conn (void)
909 {
910         kib_conn_t  *conn;
911         int          i;
912         __u64        vaddr = 0;
913         __u64        vaddr_base;
914         int          page_offset;
915         int          ipage;
916         int          rc;
917         union {
918                 struct ib_qp_create_param  qp_create;
919                 struct ib_qp_attribute     qp_attr;
920         } params;
921         
922         LIBCFS_ALLOC (conn, sizeof (*conn));
923         if (conn == NULL) {
924                 CERROR ("Can't allocate connection\n");
925                 return (NULL);
926         }
927
928         /* zero flags, NULL pointers etc... */
929         memset (conn, 0, sizeof (*conn));
930
931         INIT_LIST_HEAD (&conn->ibc_tx_queue_nocred);
932         INIT_LIST_HEAD (&conn->ibc_tx_queue);
933         INIT_LIST_HEAD (&conn->ibc_tx_queue_rsrvd);
934         INIT_LIST_HEAD (&conn->ibc_active_txs);
935         spin_lock_init (&conn->ibc_lock);
936         
937         atomic_inc (&kibnal_data.kib_nconns);
938         /* well not really, but I call destroy() on failure, which decrements */
939
940         LIBCFS_ALLOC (conn->ibc_rxs, IBNAL_RX_MSGS * sizeof (kib_rx_t));
941         if (conn->ibc_rxs == NULL)
942                 goto failed;
943         memset (conn->ibc_rxs, 0, IBNAL_RX_MSGS * sizeof(kib_rx_t));
944
945         rc = kibnal_alloc_pages(&conn->ibc_rx_pages,
946                                 IBNAL_RX_MSG_PAGES,
947                                 IB_ACCESS_LOCAL_WRITE);
948         if (rc != 0)
949                 goto failed;
950
951         vaddr_base = vaddr = conn->ibc_rx_pages->ibp_vaddr;
952
953         for (i = ipage = page_offset = 0; i < IBNAL_RX_MSGS; i++) {
954                 struct page *page = conn->ibc_rx_pages->ibp_pages[ipage];
955                 kib_rx_t   *rx = &conn->ibc_rxs[i];
956
957                 rx->rx_conn = conn;
958                 rx->rx_vaddr = vaddr;
959                 rx->rx_msg = (kib_msg_t *)(((char *)page_address(page)) + page_offset);
960                 
961                 vaddr += IBNAL_MSG_SIZE;
962                 LASSERT (vaddr <= vaddr_base + IBNAL_RX_MSG_BYTES);
963                 
964                 page_offset += IBNAL_MSG_SIZE;
965                 LASSERT (page_offset <= PAGE_SIZE);
966
967                 if (page_offset == PAGE_SIZE) {
968                         page_offset = 0;
969                         ipage++;
970                         LASSERT (ipage <= IBNAL_RX_MSG_PAGES);
971                 }
972         }
973
974         /* We can post up to IBNAL_RX_MSGS, which may also include an
975          * additional RDMA work item */
976
977         params.qp_create = (struct ib_qp_create_param) {
978                 .limit = {
979                         .max_outstanding_send_request    = 2 * IBNAL_RX_MSGS,
980                         .max_outstanding_receive_request = IBNAL_RX_MSGS,
981                         .max_send_gather_element         = 1,
982                         .max_receive_scatter_element     = 1,
983                 },
984                 .pd              = kibnal_data.kib_pd,
985                 .send_queue      = kibnal_data.kib_cq,
986                 .receive_queue   = kibnal_data.kib_cq,
987                 .send_policy     = IB_WQ_SIGNAL_SELECTABLE,
988                 .receive_policy  = IB_WQ_SIGNAL_SELECTABLE,
989                 .rd_domain       = 0,
990                 .transport       = IB_TRANSPORT_RC,
991                 .device_specific = NULL,
992         };
993         
994         rc = ib_qp_create (&params.qp_create, &conn->ibc_qp, &conn->ibc_qpn);
995         if (rc != 0) {
996                 CERROR ("Failed to create queue pair: %d\n", rc);
997                 goto failed;
998         }
999         
1000         /* Mark QP created */
1001         conn->ibc_state = IBNAL_CONN_INIT_QP;
1002
1003         params.qp_attr = (struct ib_qp_attribute) {
1004                 .state             = IB_QP_STATE_INIT,
1005                 .port              = kibnal_data.kib_port,
1006                 .enable_rdma_read  = 1,
1007                 .enable_rdma_write = 1,
1008                 .valid_fields      = (IB_QP_ATTRIBUTE_STATE |
1009                                       IB_QP_ATTRIBUTE_PORT |
1010                                       IB_QP_ATTRIBUTE_PKEY_INDEX |
1011                                       IB_QP_ATTRIBUTE_RDMA_ATOMIC_ENABLE),
1012         };
1013         rc = ib_qp_modify(conn->ibc_qp, &params.qp_attr);
1014         if (rc != 0) {
1015                 CERROR ("Failed to modify queue pair: %d\n", rc);
1016                 goto failed;
1017         }
1018
1019         /* 1 ref for caller */
1020         atomic_set (&conn->ibc_refcount, 1);
1021         return (conn);
1022         
1023  failed:
1024         kibnal_destroy_conn (conn);
1025         return (NULL);
1026 }
1027
1028 void
1029 kibnal_destroy_conn (kib_conn_t *conn)
1030 {
1031         int    rc;
1032         
1033         CDEBUG (D_NET, "connection %p\n", conn);
1034
1035         LASSERT (atomic_read (&conn->ibc_refcount) == 0);
1036         LASSERT (list_empty(&conn->ibc_tx_queue));
1037         LASSERT (list_empty(&conn->ibc_tx_queue_rsrvd));
1038         LASSERT (list_empty(&conn->ibc_tx_queue_nocred));
1039         LASSERT (list_empty(&conn->ibc_active_txs));
1040         LASSERT (conn->ibc_nsends_posted == 0);
1041         LASSERT (conn->ibc_connreq == NULL);
1042
1043         switch (conn->ibc_state) {
1044         case IBNAL_CONN_ZOMBIE:
1045                 /* called after connection sequence initiated */
1046
1047         case IBNAL_CONN_INIT_QP:
1048                 rc = ib_qp_destroy(conn->ibc_qp);
1049                 if (rc != 0)
1050                         CERROR("Can't destroy QP: %d\n", rc);
1051                 /* fall through */
1052                 
1053         case IBNAL_CONN_INIT_NOTHING:
1054                 break;
1055
1056         default:
1057                 LASSERT (0);
1058         }
1059
1060         if (conn->ibc_rx_pages != NULL) 
1061                 kibnal_free_pages(conn->ibc_rx_pages);
1062         
1063         if (conn->ibc_rxs != NULL)
1064                 LIBCFS_FREE(conn->ibc_rxs, 
1065                             IBNAL_RX_MSGS * sizeof(kib_rx_t));
1066
1067         if (conn->ibc_peer != NULL)
1068                 kibnal_peer_decref(conn->ibc_peer);
1069
1070         LIBCFS_FREE(conn, sizeof (*conn));
1071
1072         atomic_dec(&kibnal_data.kib_nconns);
1073         
1074         if (atomic_read (&kibnal_data.kib_nconns) == 0 &&
1075             kibnal_data.kib_shutdown) {
1076                 /* I just nuked the last connection on shutdown; wake up
1077                  * everyone so they can exit. */
1078                 wake_up_all(&kibnal_data.kib_sched_waitq);
1079                 wake_up_all(&kibnal_data.kib_reaper_waitq);
1080         }
1081 }
1082
1083 int
1084 kibnal_close_peer_conns_locked (kib_peer_t *peer, int why)
1085 {
1086         kib_conn_t         *conn;
1087         struct list_head   *ctmp;
1088         struct list_head   *cnxt;
1089         int                 count = 0;
1090
1091         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1092                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1093
1094                 count++;
1095                 kibnal_close_conn_locked (conn, why);
1096         }
1097
1098         return (count);
1099 }
1100
1101 int
1102 kibnal_close_stale_conns_locked (kib_peer_t *peer, __u64 incarnation)
1103 {
1104         kib_conn_t         *conn;
1105         struct list_head   *ctmp;
1106         struct list_head   *cnxt;
1107         int                 count = 0;
1108
1109         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1110                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1111
1112                 if (conn->ibc_incarnation == incarnation)
1113                         continue;
1114
1115                 CDEBUG(D_NET, "Closing stale conn %p nid: %s"
1116                        " incarnation:"LPX64"("LPX64")\n", conn,
1117                        libcfs_nid2str(peer->ibp_nid), 
1118                        conn->ibc_incarnation, incarnation);
1119                 
1120                 count++;
1121                 kibnal_close_conn_locked (conn, -ESTALE);
1122         }
1123
1124         return (count);
1125 }
1126
1127 int
1128 kibnal_close_matching_conns (lnet_nid_t nid)
1129 {
1130         unsigned long       flags;
1131         kib_peer_t         *peer;
1132         struct list_head   *ptmp;
1133         struct list_head   *pnxt;
1134         int                 lo;
1135         int                 hi;
1136         int                 i;
1137         int                 count = 0;
1138
1139         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1140
1141         if (nid != LNET_NID_ANY)
1142                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
1143         else {
1144                 lo = 0;
1145                 hi = kibnal_data.kib_peer_hash_size - 1;
1146         }
1147
1148         for (i = lo; i <= hi; i++) {
1149                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
1150
1151                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
1152                         LASSERT (peer->ibp_persistence != 0 ||
1153                                  peer->ibp_connecting != 0 ||
1154                                  peer->ibp_accepting != 0 ||
1155                                  !list_empty (&peer->ibp_conns));
1156
1157                         if (!(nid == LNET_NID_ANY || nid == peer->ibp_nid))
1158                                 continue;
1159
1160                         count += kibnal_close_peer_conns_locked (peer, 0);
1161                 }
1162         }
1163
1164         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1165
1166         /* wildcards always succeed */
1167         if (nid == LNET_NID_ANY)
1168                 return (0);
1169         
1170         return (count == 0 ? -ENOENT : 0);
1171 }
1172
1173 int
1174 kibnal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1175 {
1176         struct libcfs_ioctl_data *data = arg;
1177         int                       rc = -EINVAL;
1178
1179         LASSERT (ni == kibnal_data.kib_ni);
1180
1181         switch(cmd) {
1182         case IOC_LIBCFS_GET_PEER: {
1183                 lnet_nid_t   nid = 0;
1184                 __u32       ip = 0;
1185                 int         port = 0;
1186                 int         share_count = 0;
1187
1188                 rc = kibnal_get_peer_info(data->ioc_count,
1189                                           &nid, &ip, &port, &share_count);
1190                 data->ioc_nid    = nid;
1191                 data->ioc_count  = share_count;
1192                 data->ioc_u32[0] = ip;
1193                 data->ioc_u32[1] = port;
1194                 break;
1195         }
1196         case IOC_LIBCFS_ADD_PEER: {
1197                 rc = kibnal_add_persistent_peer (data->ioc_nid,
1198                                                  data->ioc_u32[0], /* IP */
1199                                                  data->ioc_u32[1]); /* port */
1200                 break;
1201         }
1202         case IOC_LIBCFS_DEL_PEER: {
1203                 rc = kibnal_del_peer (data->ioc_nid);
1204                 break;
1205         }
1206         case IOC_LIBCFS_GET_CONN: {
1207                 kib_conn_t *conn = kibnal_get_conn_by_idx (data->ioc_count);
1208
1209                 if (conn == NULL)
1210                         rc = -ENOENT;
1211                 else {
1212                         rc = 0;
1213                         data->ioc_nid = conn->ibc_peer->ibp_nid;
1214                         kibnal_conn_decref(conn);
1215                 }
1216                 break;
1217         }
1218         case IOC_LIBCFS_CLOSE_CONNECTION: {
1219                 rc = kibnal_close_matching_conns (data->ioc_nid);
1220                 break;
1221         }
1222         case IOC_LIBCFS_REGISTER_MYNID: {
1223                 /* Ignore if this is a noop */
1224                 if (data->ioc_nid == ni->ni_nid) {
1225                         rc = 0;
1226                 } else {
1227                         CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1228                                libcfs_nid2str(data->ioc_nid),
1229                                libcfs_nid2str(ni->ni_nid));
1230                         rc = -EINVAL;
1231                 }
1232                 break;
1233         }
1234         }
1235
1236         return rc;
1237 }
1238
1239 void
1240 kibnal_free_pages (kib_pages_t *p)
1241 {
1242         int     npages = p->ibp_npages;
1243         int     rc;
1244         int     i;
1245         
1246         if (p->ibp_mapped) {
1247                 rc = ib_memory_deregister(p->ibp_handle);
1248                 if (rc != 0)
1249                         CERROR ("Deregister error: %d\n", rc);
1250         }
1251         
1252         for (i = 0; i < npages; i++)
1253                 if (p->ibp_pages[i] != NULL)
1254                         __free_page(p->ibp_pages[i]);
1255         
1256         LIBCFS_FREE (p, offsetof(kib_pages_t, ibp_pages[npages]));
1257 }
1258
1259 int
1260 kibnal_alloc_pages (kib_pages_t **pp, int npages, int access)
1261 {
1262         kib_pages_t                *p;
1263         struct ib_physical_buffer  *phys_pages;
1264         int                         i;
1265         int                         rc;
1266
1267         LIBCFS_ALLOC(p, offsetof(kib_pages_t, ibp_pages[npages]));
1268         if (p == NULL) {
1269                 CERROR ("Can't allocate buffer %d\n", npages);
1270                 return (-ENOMEM);
1271         }
1272
1273         memset (p, 0, offsetof(kib_pages_t, ibp_pages[npages]));
1274         p->ibp_npages = npages;
1275         
1276         for (i = 0; i < npages; i++) {
1277                 p->ibp_pages[i] = alloc_page (GFP_KERNEL);
1278                 if (p->ibp_pages[i] == NULL) {
1279                         CERROR ("Can't allocate page %d of %d\n", i, npages);
1280                         kibnal_free_pages(p);
1281                         return (-ENOMEM);
1282                 }
1283         }
1284
1285         LIBCFS_ALLOC(phys_pages, npages * sizeof(*phys_pages));
1286         if (phys_pages == NULL) {
1287                 CERROR ("Can't allocate physarray for %d pages\n", npages);
1288                 kibnal_free_pages(p);
1289                 return (-ENOMEM);
1290         }
1291
1292         for (i = 0; i < npages; i++) {
1293                 phys_pages[i].size = PAGE_SIZE;
1294                 phys_pages[i].address =
1295                         lnet_page2phys(p->ibp_pages[i]);
1296         }
1297
1298         p->ibp_vaddr = 0;
1299         rc = ib_memory_register_physical(kibnal_data.kib_pd,
1300                                          phys_pages, npages,
1301                                          &p->ibp_vaddr,
1302                                          npages * PAGE_SIZE, 0,
1303                                          access,
1304                                          &p->ibp_handle,
1305                                          &p->ibp_lkey,
1306                                          &p->ibp_rkey);
1307         
1308         LIBCFS_FREE(phys_pages, npages * sizeof(*phys_pages));
1309         
1310         if (rc != 0) {
1311                 CERROR ("Error %d mapping %d pages\n", rc, npages);
1312                 kibnal_free_pages(p);
1313                 return (rc);
1314         }
1315         
1316         p->ibp_mapped = 1;
1317         *pp = p;
1318         return (0);
1319 }
1320
1321 int
1322 kibnal_setup_tx_descs (void)
1323 {
1324         int           ipage = 0;
1325         int           page_offset = 0;
1326         __u64         vaddr;
1327         __u64         vaddr_base;
1328         struct page  *page;
1329         kib_tx_t     *tx;
1330         int           i;
1331         int           rc;
1332
1333         /* pre-mapped messages are not bigger than 1 page */
1334         LASSERT (IBNAL_MSG_SIZE <= PAGE_SIZE);
1335
1336         /* No fancy arithmetic when we do the buffer calculations */
1337         LASSERT (PAGE_SIZE % IBNAL_MSG_SIZE == 0);
1338
1339         rc = kibnal_alloc_pages(&kibnal_data.kib_tx_pages,
1340                                 IBNAL_TX_MSG_PAGES(), 
1341                                 0);            /* local read access only */
1342         if (rc != 0)
1343                 return (rc);
1344
1345         vaddr = vaddr_base = kibnal_data.kib_tx_pages->ibp_vaddr;
1346
1347         for (i = 0; i < IBNAL_TX_MSGS(); i++) {
1348                 page = kibnal_data.kib_tx_pages->ibp_pages[ipage];
1349                 tx = &kibnal_data.kib_tx_descs[i];
1350
1351                 memset (tx, 0, sizeof(*tx));    /* zero flags etc */
1352                 
1353                 tx->tx_msg = (kib_msg_t *)(((char *)page_address(page)) + page_offset);
1354                 tx->tx_vaddr = vaddr;
1355                 tx->tx_mapped = KIB_TX_UNMAPPED;
1356
1357                 CDEBUG(D_NET, "Tx[%d] %p->%p - "LPX64"\n", 
1358                        i, tx, tx->tx_msg, tx->tx_vaddr);
1359
1360                 list_add (&tx->tx_list, &kibnal_data.kib_idle_txs);
1361
1362                 vaddr += IBNAL_MSG_SIZE;
1363                 LASSERT (vaddr <= vaddr_base + IBNAL_TX_MSG_BYTES());
1364
1365                 page_offset += IBNAL_MSG_SIZE;
1366                 LASSERT (page_offset <= PAGE_SIZE);
1367
1368                 if (page_offset == PAGE_SIZE) {
1369                         page_offset = 0;
1370                         ipage++;
1371                         LASSERT (ipage <= IBNAL_TX_MSG_PAGES());
1372                 }
1373         }
1374         
1375         return (0);
1376 }
1377
1378 void
1379 kibnal_shutdown (lnet_ni_t *ni)
1380 {
1381         int           i;
1382         int           rc;
1383         unsigned long flags;
1384
1385         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1386                atomic_read (&libcfs_kmemory));
1387
1388         LASSERT(ni == kibnal_data.kib_ni);
1389         LASSERT(ni->ni_data == &kibnal_data);
1390
1391         switch (kibnal_data.kib_init) {
1392         default:
1393                 CERROR ("Unexpected state %d\n", kibnal_data.kib_init);
1394                 LBUG();
1395
1396         case IBNAL_INIT_ALL:
1397                 /* Prevent new peers from being created */
1398                 write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1399                 kibnal_data.kib_nonewpeers = 1;
1400                 write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1401
1402                 kibnal_stop_ib_listener();
1403
1404                 /* Remove all existing peers from the peer table */
1405                 kibnal_del_peer(LNET_NID_ANY);
1406                 
1407                 /* Wait for pending conn reqs to be handled */
1408                 i = 2;
1409                 spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
1410                 while (!list_empty(&kibnal_data.kib_connd_acceptq)) {
1411                         spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, 
1412                                                flags);
1413                         i++;
1414                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n */
1415                                "waiting for conn reqs to clean up\n");
1416                         cfs_pause(cfs_time_seconds(1));
1417                         
1418                         spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
1419                 }
1420                 spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
1421
1422                 /* Wait for all peer state to clean up */
1423                 i = 2;
1424                 while (atomic_read(&kibnal_data.kib_npeers) != 0) {
1425                         i++;
1426                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1427                                "waiting for %d peers to close down\n",
1428                                atomic_read(&kibnal_data.kib_npeers));
1429                         cfs_pause(cfs_time_seconds(1));
1430                 }
1431                 /* fall through */
1432
1433         case IBNAL_INIT_CQ:
1434                 rc = ib_cq_destroy (kibnal_data.kib_cq);
1435                 if (rc != 0)
1436                         CERROR ("Destroy CQ error: %d\n", rc);
1437                 /* fall through */
1438
1439         case IBNAL_INIT_TXD:
1440                 kibnal_free_pages (kibnal_data.kib_tx_pages);
1441                 /* fall through */
1442 #if IBNAL_FMR
1443         case IBNAL_INIT_FMR:
1444                 rc = ib_fmr_pool_destroy (kibnal_data.kib_fmr_pool);
1445                 if (rc != 0)
1446                         CERROR ("Destroy FMR pool error: %d\n", rc);
1447                 /* fall through */
1448 #endif
1449         case IBNAL_INIT_PD:
1450                 rc = ib_pd_destroy(kibnal_data.kib_pd);
1451                 if (rc != 0)
1452                         CERROR ("Destroy PD error: %d\n", rc);
1453                 /* fall through */
1454
1455         case IBNAL_INIT_DATA:
1456                 /* Module refcount only gets to zero when all peers
1457                  * have been closed so all lists must be empty */
1458                 LASSERT (atomic_read(&kibnal_data.kib_npeers) == 0);
1459                 LASSERT (kibnal_data.kib_peers != NULL);
1460                 for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
1461                         LASSERT (list_empty (&kibnal_data.kib_peers[i]));
1462                 }
1463                 LASSERT (atomic_read (&kibnal_data.kib_nconns) == 0);
1464                 LASSERT (list_empty (&kibnal_data.kib_sched_rxq));
1465                 LASSERT (list_empty (&kibnal_data.kib_sched_txq));
1466                 LASSERT (list_empty (&kibnal_data.kib_reaper_conns));
1467                 LASSERT (list_empty (&kibnal_data.kib_connd_peers));
1468                 LASSERT (list_empty (&kibnal_data.kib_connd_acceptq));
1469
1470                 /* flag threads to terminate; wake and wait for them to die */
1471                 kibnal_data.kib_shutdown = 1;
1472                 wake_up_all (&kibnal_data.kib_sched_waitq);
1473                 wake_up_all (&kibnal_data.kib_reaper_waitq);
1474                 wake_up_all (&kibnal_data.kib_connd_waitq);
1475
1476                 i = 2;
1477                 while (atomic_read (&kibnal_data.kib_nthreads) != 0) {
1478                         i++;
1479                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1480                                "Waiting for %d threads to terminate\n",
1481                                atomic_read (&kibnal_data.kib_nthreads));
1482                         cfs_pause(cfs_time_seconds(1));
1483                 }
1484                 /* fall through */
1485                 
1486         case IBNAL_INIT_NOTHING:
1487                 break;
1488         }
1489
1490         if (kibnal_data.kib_tx_descs != NULL)
1491                 LIBCFS_FREE (kibnal_data.kib_tx_descs,
1492                              IBNAL_TX_MSGS() * sizeof(kib_tx_t));
1493
1494         if (kibnal_data.kib_peers != NULL)
1495                 LIBCFS_FREE (kibnal_data.kib_peers,
1496                              sizeof (struct list_head) * 
1497                              kibnal_data.kib_peer_hash_size);
1498
1499         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1500                atomic_read (&libcfs_kmemory));
1501
1502         kibnal_data.kib_init = IBNAL_INIT_NOTHING;
1503         PORTAL_MODULE_UNUSE;
1504 }
1505
1506 int
1507 kibnal_get_ipoibidx(void)
1508 {
1509         /* NB single threaded! */
1510         static struct ib_port_properties port_props;
1511
1512         int               ipoibidx = 0;
1513         int               devidx;
1514         int               port;
1515         int               rc;
1516         struct ib_device *device;
1517
1518         for (devidx = 0; devidx <= kibnal_data.kib_hca_idx; devidx++) {
1519                 device = ib_device_get_by_index(devidx);
1520                 
1521                 if (device == NULL) {
1522                         CERROR("Can't get IB device %d\n", devidx);
1523                         return -1;
1524                 }
1525                 
1526                 for (port = 1; port <= 2; port++) {
1527                         if (devidx == kibnal_data.kib_hca_idx &&
1528                             port == kibnal_data.kib_port)
1529                                 return ipoibidx;
1530                         
1531                         rc = ib_port_properties_get(device, port,
1532                                                     &port_props);
1533                         if (rc == 0)
1534                                 ipoibidx++;
1535                 }
1536         }
1537
1538         LBUG();
1539         return -1;
1540 }
1541
1542 int
1543 kibnal_startup (lnet_ni_t *ni)
1544 {
1545         char              ipif_name[32];
1546         __u32             ip;
1547         __u32             netmask;
1548         int               up;
1549         struct timeval    tv;
1550         int               rc;
1551         int               hca;
1552         int               port;
1553         int               i;
1554         int               nob;
1555
1556         LASSERT (ni->ni_lnd == &the_kiblnd);
1557
1558         /* Only 1 instance supported */
1559         if (kibnal_data.kib_init != IBNAL_INIT_NOTHING) {
1560                 CERROR ("Only 1 instance supported\n");
1561                 return -EPERM;
1562         }
1563
1564         if (*kibnal_tunables.kib_credits > *kibnal_tunables.kib_ntx) {
1565                 CERROR ("Can't set credits(%d) > ntx(%d)\n",
1566                         *kibnal_tunables.kib_credits,
1567                         *kibnal_tunables.kib_ntx);
1568                 return -EINVAL;
1569         }
1570
1571         memset (&kibnal_data, 0, sizeof (kibnal_data)); /* zero pointers, flags etc */
1572
1573         ni->ni_maxtxcredits = *kibnal_tunables.kib_credits;
1574         ni->ni_peertxcredits = *kibnal_tunables.kib_peercredits;
1575
1576         CLASSERT (LNET_MAX_INTERFACES > 1);
1577
1578
1579         kibnal_data.kib_hca_idx = 0;            /* default: first HCA */
1580         kibnal_data.kib_port = 0;               /* any port */
1581
1582         if (ni->ni_interfaces[0] != NULL) {
1583                 /* hca.port specified in 'networks=openib(h.p)' */
1584                 if (ni->ni_interfaces[1] != NULL) {
1585                         CERROR("Multiple interfaces not supported\n");
1586                         return -EPERM;
1587                 }
1588                 
1589                 nob = strlen(ni->ni_interfaces[0]);
1590                 i = sscanf(ni->ni_interfaces[0], "%d.%d%n", &hca, &port, &nob);
1591                 if (i >= 2 && nob == strlen(ni->ni_interfaces[0])) {
1592                         kibnal_data.kib_hca_idx = hca;
1593                         kibnal_data.kib_port = port;
1594                 } else {
1595                         nob = strlen(ni->ni_interfaces[0]);
1596                         i = sscanf(ni->ni_interfaces[0], "%d%n", &hca, &nob);
1597
1598                         if (i >= 1 && nob == strlen(ni->ni_interfaces[0])) {
1599                                 kibnal_data.kib_hca_idx = hca;
1600                         } else {
1601                                 CERROR("Can't parse interface '%s'\n",
1602                                        ni->ni_interfaces[0]);
1603                                 return -EINVAL;
1604                         }
1605                 }
1606         }
1607         
1608         kibnal_data.kib_ni = ni;
1609         ni->ni_data = &kibnal_data;
1610         
1611         do_gettimeofday(&tv);
1612         kibnal_data.kib_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1613
1614         PORTAL_MODULE_USE;
1615
1616         rwlock_init(&kibnal_data.kib_global_lock);
1617
1618         kibnal_data.kib_peer_hash_size = IBNAL_PEER_HASH_SIZE;
1619         LIBCFS_ALLOC (kibnal_data.kib_peers,
1620                       sizeof (struct list_head) * kibnal_data.kib_peer_hash_size);
1621         if (kibnal_data.kib_peers == NULL) {
1622                 goto failed;
1623         }
1624         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++)
1625                 INIT_LIST_HEAD(&kibnal_data.kib_peers[i]);
1626
1627         spin_lock_init (&kibnal_data.kib_reaper_lock);
1628         INIT_LIST_HEAD (&kibnal_data.kib_reaper_conns);
1629         init_waitqueue_head (&kibnal_data.kib_reaper_waitq);
1630
1631         spin_lock_init (&kibnal_data.kib_connd_lock);
1632         INIT_LIST_HEAD (&kibnal_data.kib_connd_acceptq);
1633         INIT_LIST_HEAD (&kibnal_data.kib_connd_peers);
1634         init_waitqueue_head (&kibnal_data.kib_connd_waitq);
1635
1636         spin_lock_init (&kibnal_data.kib_sched_lock);
1637         INIT_LIST_HEAD (&kibnal_data.kib_sched_txq);
1638         INIT_LIST_HEAD (&kibnal_data.kib_sched_rxq);
1639         init_waitqueue_head (&kibnal_data.kib_sched_waitq);
1640
1641         spin_lock_init (&kibnal_data.kib_tx_lock);
1642         INIT_LIST_HEAD (&kibnal_data.kib_idle_txs);
1643
1644         LIBCFS_ALLOC (kibnal_data.kib_tx_descs,
1645                       IBNAL_TX_MSGS() * sizeof(kib_tx_t));
1646         if (kibnal_data.kib_tx_descs == NULL) {
1647                 CERROR ("Can't allocate tx descs\n");
1648                 goto failed;
1649         }
1650
1651         /* lists/ptrs/locks initialised */
1652         kibnal_data.kib_init = IBNAL_INIT_DATA;
1653         /*****************************************************/
1654
1655         for (i = 0; i < IBNAL_N_SCHED; i++) {
1656                 rc = kibnal_thread_start (kibnal_scheduler,
1657                                           (void *)((unsigned long)i));
1658                 if (rc != 0) {
1659                         CERROR("Can't spawn openibnal scheduler[%d]: %d\n",
1660                                i, rc);
1661                         goto failed;
1662                 }
1663         }
1664
1665         /* must have at least 2 connds to remain responsive to svcqry while
1666          * connecting */
1667         if (*kibnal_tunables.kib_n_connd < 2)
1668                 *kibnal_tunables.kib_n_connd = 2;
1669
1670
1671         for (i = 0; i < *kibnal_tunables.kib_n_connd; i++) {
1672                 rc = kibnal_thread_start (kibnal_connd,
1673                                           (void *)((unsigned long)i));
1674                 if (rc != 0) {
1675                         CERROR("Can't spawn openibnal connd[%d]: %d\n",
1676                                i, rc);
1677                         goto failed;
1678                 }
1679         }
1680
1681         rc = kibnal_thread_start (kibnal_reaper, NULL);
1682         if (rc != 0) {
1683                 CERROR ("Can't spawn openibnal reaper: %d\n", rc);
1684                 goto failed;
1685         }
1686
1687         kibnal_data.kib_device = ib_device_get_by_index(kibnal_data.kib_hca_idx);
1688         if (kibnal_data.kib_device == NULL) {
1689                 CERROR ("Can't open ib device %d\n",
1690                         kibnal_data.kib_hca_idx);
1691                 goto failed;
1692         }
1693         
1694         rc = ib_device_properties_get(kibnal_data.kib_device,
1695                                       &kibnal_data.kib_device_props);
1696         if (rc != 0) {
1697                 CERROR ("Can't get device props: %d\n", rc);
1698                 goto failed;
1699         }
1700
1701         CDEBUG(D_NET, "Max Initiator: %d Max Responder %d\n", 
1702                kibnal_data.kib_device_props.max_initiator_per_qp,
1703                kibnal_data.kib_device_props.max_responder_per_qp);
1704
1705         if (kibnal_data.kib_port != 0) {
1706                 rc = ib_port_properties_get(kibnal_data.kib_device, 
1707                                             kibnal_data.kib_port,
1708                                             &kibnal_data.kib_port_props);
1709                 if (rc != 0) {
1710                         CERROR("Error %d open port %d on HCA %d\n", rc,
1711                                kibnal_data.kib_port,
1712                                kibnal_data.kib_hca_idx);
1713                         goto failed;
1714                 }
1715         } else {
1716                 for (i = 1; i <= 2; i++) {
1717                         rc = ib_port_properties_get(kibnal_data.kib_device, i,
1718                                                     &kibnal_data.kib_port_props);
1719                         if (rc == 0) {
1720                                 kibnal_data.kib_port = i;
1721                                 break;
1722                         }
1723                 }
1724                 if (kibnal_data.kib_port == 0) {
1725                         CERROR ("Can't find a port\n");
1726                         goto failed;
1727                 }
1728         }
1729
1730         i = kibnal_get_ipoibidx();
1731         if (i < 0)
1732                 goto failed;
1733         
1734         snprintf(ipif_name, sizeof(ipif_name), "%s%d",
1735                  *kibnal_tunables.kib_ipif_basename, i);
1736         if (strlen(ipif_name) == sizeof(ipif_name) - 1) {
1737                 CERROR("IPoIB interface name %s truncated\n", ipif_name);
1738                 return -EINVAL;
1739         }
1740         
1741         rc = libcfs_ipif_query(ipif_name, &up, &ip, &netmask);
1742         if (rc != 0) {
1743                 CERROR("Can't query IPoIB interface %s: %d\n", ipif_name, rc);
1744                 goto failed;
1745         }
1746         
1747         if (!up) {
1748                 CERROR("Can't query IPoIB interface %s: it's down\n", ipif_name);
1749                 goto failed;
1750         }
1751         
1752         ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ip);
1753
1754         rc = ib_pd_create(kibnal_data.kib_device,
1755                           NULL, &kibnal_data.kib_pd);
1756         if (rc != 0) {
1757                 CERROR ("Can't create PD: %d\n", rc);
1758                 goto failed;
1759         }
1760         
1761         /* flag PD initialised */
1762         kibnal_data.kib_init = IBNAL_INIT_PD;
1763         /*****************************************************/
1764 #if IBNAL_FMR
1765         {
1766                 const int pool_size = *kibnal_tunables.kib_ntx;
1767                 struct ib_fmr_pool_param params = {
1768                         .max_pages_per_fmr = LNET_MAX_PAYLOAD/PAGE_SIZE,
1769                         .access            = (IB_ACCESS_LOCAL_WRITE |
1770                                               IB_ACCESS_REMOTE_WRITE |
1771                                               IB_ACCESS_REMOTE_READ),
1772                         .pool_size         = pool_size,
1773                         .dirty_watermark   = (pool_size * 3)/4,
1774                         .flush_function    = NULL,
1775                         .flush_arg         = NULL,
1776                         .cache             = 1,
1777                 };
1778                 rc = ib_fmr_pool_create(kibnal_data.kib_pd, &params,
1779                                         &kibnal_data.kib_fmr_pool);
1780                 if (rc != 0) {
1781                         CERROR ("Can't create FMR pool size %d: %d\n", 
1782                                 pool_size, rc);
1783                         goto failed;
1784                 }
1785         }
1786
1787         /* flag FMR pool initialised */
1788         kibnal_data.kib_init = IBNAL_INIT_FMR;
1789 #endif
1790         /*****************************************************/
1791
1792         rc = kibnal_setup_tx_descs();
1793         if (rc != 0) {
1794                 CERROR ("Can't register tx descs: %d\n", rc);
1795                 goto failed;
1796         }
1797         
1798         /* flag TX descs initialised */
1799         kibnal_data.kib_init = IBNAL_INIT_TXD;
1800         /*****************************************************/
1801         
1802         {
1803                 struct ib_cq_callback callback = {
1804                         .context        = IBNAL_CALLBACK_CTXT,
1805                         .policy         = IB_CQ_PROVIDER_REARM,
1806                         .function       = {
1807                                 .entry  = kibnal_callback,
1808                         },
1809                         .arg            = NULL,
1810                 };
1811                 int  nentries = IBNAL_CQ_ENTRIES();
1812                 
1813                 rc = ib_cq_create (kibnal_data.kib_device, 
1814                                    &nentries, &callback, NULL,
1815                                    &kibnal_data.kib_cq);
1816                 if (rc != 0) {
1817                         CERROR ("Can't create CQ: %d\n", rc);
1818                         goto failed;
1819                 }
1820
1821                 /* I only want solicited events */
1822                 rc = ib_cq_request_notification(kibnal_data.kib_cq, 1);
1823                 LASSERT (rc == 0);
1824         }
1825
1826         /* flag CQ initialised */
1827         kibnal_data.kib_init = IBNAL_INIT_CQ;
1828         /*****************************************************/
1829
1830         rc = kibnal_start_ib_listener();
1831         if (rc != 0)
1832                 goto failed;
1833         
1834         /* flag everything initialised */
1835         kibnal_data.kib_init = IBNAL_INIT_ALL;
1836         /*****************************************************/
1837
1838         return 0;
1839
1840  failed:
1841         kibnal_shutdown(ni);    
1842         return -ENETDOWN;
1843 }
1844
1845 void __exit
1846 kibnal_module_fini (void)
1847 {
1848         lnet_unregister_lnd(&the_kiblnd);
1849         kibnal_tunables_fini();
1850 }
1851
1852 int __init
1853 kibnal_module_init (void)
1854 {
1855         int    rc;
1856
1857         rc = kibnal_tunables_init();
1858         if (rc != 0)
1859                 return rc;
1860         
1861         lnet_register_lnd(&the_kiblnd);
1862
1863         return (0);
1864 }
1865
1866 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
1867 #ifdef USING_TSAPI
1868 MODULE_DESCRIPTION("Kernel Cisco IB LND v1.00");
1869 #else
1870 MODULE_DESCRIPTION("Kernel OpenIB(gen1) LND v1.00");
1871 #endif
1872 MODULE_LICENSE("GPL");
1873
1874 module_init(kibnal_module_init);
1875 module_exit(kibnal_module_fini);