/* Source: Whamcloud gitweb, fs/lustre-release.git — lnet/klnds/openiblnd/openiblnd.c
 * (commit subject: "#ifdef for extra proc_dointvec parameter under linux 2.6.8 in ranal and ...")
 */
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include "openibnal.h"
25
/* Module-global NAL state: the portals NAL/NI handles plus this module's
 * shared runtime data and sysctl-visible tunables (types declared in
 * openibnal.h). */
nal_t                   kibnal_api;
ptl_handle_ni_t         kibnal_ni;
kib_data_t              kibnal_data;
kib_tunables_t          kibnal_tunables;
30
/* Root ctl_name for this module's sysctl tree (see kibnal_top_ctl_table) */
#define IBNAL_SYSCTL             202

/* Child ctl_name ids, 1-based, one per entry in kibnal_ctl_table */
enum {
        IBNAL_SYSCTL_TIMEOUT=1,
        IBNAL_SYSCTL_LISTENER_TIMEOUT,
        IBNAL_SYSCTL_BACKLOG,
        IBNAL_SYSCTL_PORT
};
39
/* sysctl leaf entries; fields are positional:
 * {ctl_name, procname, data, maxlen, mode, child, proc_handler}.
 * 'timeout' and 'listener_timeout' are plain integers handled by
 * proc_dointvec; 'backlog' and 'port' go through kibnal_listener_procint,
 * which restarts the IP listener when the value changes. */
static ctl_table kibnal_ctl_table[] = {
        {IBNAL_SYSCTL_TIMEOUT, "timeout", 
         &kibnal_tunables.kib_io_timeout, sizeof (int),
         0644, NULL, &proc_dointvec},
        {IBNAL_SYSCTL_LISTENER_TIMEOUT, "listener_timeout", 
         &kibnal_tunables.kib_listener_timeout, sizeof(int),
         0644, NULL, &proc_dointvec},
        {IBNAL_SYSCTL_BACKLOG, "backlog",
         &kibnal_tunables.kib_backlog, sizeof(int),
         0644, NULL, kibnal_listener_procint},
        {IBNAL_SYSCTL_PORT, "port",
         &kibnal_tunables.kib_port, sizeof(int),
         0644, NULL, kibnal_listener_procint},
        { 0 }                           /* terminator */
};
55
/* Root of the sysctl tree: directory "openibnal" (mode 0555) whose children
 * are the tunables in kibnal_ctl_table above. */
static ctl_table kibnal_top_ctl_table[] = {
        {IBNAL_SYSCTL, "openibnal", NULL, 0, 0555, kibnal_ctl_table},
        { 0 }                           /* terminator */
};
60
61 __u32 
62 kibnal_cksum (void *ptr, int nob)
63 {
64         char  *c  = ptr;
65         __u32  sum = 0;
66
67         while (nob-- > 0)
68                 sum = ((sum << 1) | (sum >> 31)) + *c++;
69
70         /* ensure I don't return 0 (== no checksum) */
71         return (sum == 0) ? 1 : sum;
72 }
73
74 void
75 kibnal_init_msg(kib_msg_t *msg, int type, int body_nob)
76 {
77         msg->ibm_type = type;
78         msg->ibm_nob  = offsetof(kib_msg_t, ibm_u) + body_nob;
79 }
80
/* Fill in the wire-header fields common to every outgoing message and
 * (optionally) checksum the whole message.  ibm_type and ibm_nob must
 * already have been set by kibnal_init_msg(). */
void
kibnal_pack_msg(kib_msg_t *msg, int credits, ptl_nid_t dstnid, __u64 dststamp)
{
        /* CAVEAT EMPTOR! all message fields not set here should have been
         * initialised previously. */
        msg->ibm_magic    = IBNAL_MSG_MAGIC;
        msg->ibm_version  = IBNAL_MSG_VERSION;
        /*   ibm_type */
        msg->ibm_credits  = credits;
        /*   ibm_nob */
        msg->ibm_cksum    = 0;          /* must be 0 while cksum is computed */
        msg->ibm_srcnid   = kibnal_lib.libnal_ni.ni_pid.nid;
        msg->ibm_srcstamp = kibnal_data.kib_incarnation;
        msg->ibm_dstnid   = dstnid;
        msg->ibm_dststamp = dststamp;
#if IBNAL_CKSUM
        /* NB ibm_cksum zero while computing cksum */
        msg->ibm_cksum    = kibnal_cksum(msg, msg->ibm_nob);
#endif
}
101
/* Validate an incoming wire message of 'nob' bytes and convert it to host
 * byte order if the peer's endianness differs (detected from the magic).
 * Returns 0 on success, -EPROTO for any short/corrupt/unknown message. */
int
kibnal_unpack_msg(kib_msg_t *msg, int nob)
{
        const int hdr_size = offsetof(kib_msg_t, ibm_u);
        __u32     msg_cksum;
        int       flip;
        int       msg_nob;

        /* 6 bytes is just enough to read the 32-bit magic and 16-bit
         * version checked below */
        if (nob < 6) {
                CERROR("Short message: %d\n", nob);
                return -EPROTO;
        }

        /* the magic is sent in the peer's native order, so a byte-swapped
         * match tells us the peer's endianness differs from ours */
        if (msg->ibm_magic == IBNAL_MSG_MAGIC) {
                flip = 0;
        } else if (msg->ibm_magic == __swab32(IBNAL_MSG_MAGIC)) {
                flip = 1;
        } else {
                CERROR("Bad magic: %08x\n", msg->ibm_magic);
                return -EPROTO;
        }

        if (msg->ibm_version != 
            (flip ? __swab16(IBNAL_MSG_VERSION) : IBNAL_MSG_VERSION)) {
                CERROR("Bad version: %d\n", msg->ibm_version);
                return -EPROTO;
        }

        if (nob < hdr_size) {
                CERROR("Short message: %d\n", nob);
                return -EPROTO;
        }

        msg_nob = flip ? __swab32(msg->ibm_nob) : msg->ibm_nob;
        if (msg_nob > nob) {
                CERROR("Short message: got %d, wanted %d\n", nob, msg_nob);
                return -EPROTO;
        }

        /* checksum must be computed with ibm_cksum zero and BEFORE anything
         * gets flipped */
        msg_cksum = flip ? __swab32(msg->ibm_cksum) : msg->ibm_cksum;
        msg->ibm_cksum = 0;
        /* cksum 0 on the wire means the sender didn't checksum */
        if (msg_cksum != 0 &&
            msg_cksum != kibnal_cksum(msg, msg_nob)) {
                CERROR("Bad checksum\n");
                return -EPROTO;
        }
        msg->ibm_cksum = msg_cksum;
        
        if (flip) {
                /* leave magic unflipped as a clue to peer endianness */
                __swab16s(&msg->ibm_version);
                /* single-byte fields need no swab, but make sure */
                LASSERT (sizeof(msg->ibm_type) == 1);
                LASSERT (sizeof(msg->ibm_credits) == 1);
                msg->ibm_nob = msg_nob;
                __swab64s(&msg->ibm_srcnid);
                __swab64s(&msg->ibm_srcstamp);
                __swab64s(&msg->ibm_dstnid);
                __swab64s(&msg->ibm_dststamp);
        }
        
        if (msg->ibm_srcnid == PTL_NID_ANY) {
                CERROR("Bad src nid: "LPX64"\n", msg->ibm_srcnid);
                return -EPROTO;
        }

        /* per-type payload length check and byte-swapping */
        switch (msg->ibm_type) {
        default:
                CERROR("Unknown message type %x\n", msg->ibm_type);
                return -EPROTO;
                
        case IBNAL_MSG_SVCQRY:
        case IBNAL_MSG_NOOP:
                break;                  /* header only, nothing more to check */

        case IBNAL_MSG_SVCRSP:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.svcrsp)) {
                        CERROR("Short SVCRSP: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.svcrsp)));
                        return -EPROTO;
                }
                if (flip) {
                        __swab64s(&msg->ibm_u.svcrsp.ibsr_svc_id);
                        __swab16s(&msg->ibm_u.svcrsp.ibsr_svc_pkey);
                }
                break;

        case IBNAL_MSG_CONNREQ:
        case IBNAL_MSG_CONNACK:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.connparams)) {
                        CERROR("Short CONNREQ: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.connparams)));
                        return -EPROTO;
                }
                if (flip)
                        __swab32s(&msg->ibm_u.connparams.ibcp_queue_depth);
                break;

        case IBNAL_MSG_IMMEDIATE:
                /* payload is variable-length: only the fixed prefix up to
                 * ibim_payload[0] must be present */
                if (msg_nob < offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0])) {
                        CERROR("Short IMMEDIATE: %d(%d)\n", msg_nob,
                               (int)offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0]));
                        return -EPROTO;
                }
                break;

        case IBNAL_MSG_PUT_RDMA:
        case IBNAL_MSG_GET_RDMA:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.rdma)) {
                        CERROR("Short RDMA req: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.rdma)));
                        return -EPROTO;
                }
                if (flip) {
                        __swab32s(&msg->ibm_u.rdma.ibrm_desc.rd_key);
                        __swab32s(&msg->ibm_u.rdma.ibrm_desc.rd_nob);
                        __swab64s(&msg->ibm_u.rdma.ibrm_desc.rd_addr);
                }
                break;

        case IBNAL_MSG_PUT_DONE:
        case IBNAL_MSG_GET_DONE:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.completion)) {
                        CERROR("Short RDMA completion: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.completion)));
                        return -EPROTO;
                }
                if (flip)
                        __swab32s(&msg->ibm_u.completion.ibcm_status);
                break;
        }
        return 0;
}
236
237 int
238 kibnal_sock_write (struct socket *sock, void *buffer, int nob)
239 {
240         int           rc;
241         mm_segment_t  oldmm = get_fs();
242         struct iovec  iov = {
243                 .iov_base = buffer,
244                 .iov_len  = nob
245         };
246         struct msghdr msg = {
247                 .msg_name       = NULL,
248                 .msg_namelen    = 0,
249                 .msg_iov        = &iov,
250                 .msg_iovlen     = 1,
251                 .msg_control    = NULL,
252                 .msg_controllen = 0,
253                 .msg_flags      = MSG_DONTWAIT
254         };
255
256         /* We've set up the socket's send buffer to be large enough for
257          * everything we send, so a single non-blocking send should
258          * complete without error. */
259
260         set_fs(KERNEL_DS);
261         rc = sock_sendmsg(sock, &msg, iov.iov_len);
262         set_fs(oldmm);
263
264         if (rc == nob)
265                 return 0;
266
267         if (rc >= 0)
268                 return -EAGAIN;
269
270         return rc;
271 }
272
/* Receive exactly 'nob' bytes into 'buffer' from 'sock', giving up after
 * 'timeout' seconds in total.  Returns 0 on success, -ETIMEDOUT if time
 * runs out, -ECONNABORTED if the peer closes, or a negative errno. */
int
kibnal_sock_read (struct socket *sock, void *buffer, int nob, int timeout)
{
        int            rc;
        mm_segment_t   oldmm = get_fs();
        long           ticks = timeout * HZ;    /* remaining time in jiffies */
        unsigned long  then;
        struct timeval tv;

        LASSERT (nob > 0);
        LASSERT (ticks > 0);

        for (;;) {
                struct iovec  iov = {
                        .iov_base = buffer,
                        .iov_len  = nob
                };
                struct msghdr msg = {
                        .msg_name       = NULL,
                        .msg_namelen    = 0,
                        .msg_iov        = &iov,
                        .msg_iovlen     = 1,
                        .msg_control    = NULL,
                        .msg_controllen = 0,
                        .msg_flags      = 0
                };

                /* Set receive timeout to remaining time */
                tv = (struct timeval) {
                        .tv_sec = ticks / HZ,
                        .tv_usec = ((ticks % HZ) * 1000000) / HZ
                };
                set_fs(KERNEL_DS);
                rc = sock_setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO,
                                     (char *)&tv, sizeof(tv));
                set_fs(oldmm);
                if (rc != 0) {
                        CERROR("Can't set socket recv timeout %d: %d\n",
                               timeout, rc);
                        return rc;
                }

                set_fs(KERNEL_DS);
                then = jiffies;
                rc = sock_recvmsg(sock, &msg, iov.iov_len, 0);
                /* charge the time spent in recvmsg against the budget */
                ticks -= jiffies - then;
                set_fs(oldmm);

                if (rc < 0)
                        return rc;

                if (rc == 0)            /* EOF: peer closed the connection */
                        return -ECONNABORTED;

                /* advance past the bytes received so far */
                buffer = ((char *)buffer) + rc;
                nob -= rc;

                if (nob == 0)
                        return 0;

                if (ticks <= 0)
                        return -ETIMEDOUT;
        }
}
337
/* Create a TCP socket configured for the svcqry protocol: send buffer big
 * enough that kibnal_sock_write()'s single non-blocking send can't stall,
 * and SO_REUSEADDR set for quick rebinding.  On success *sockp is set and
 * 0 returned; on failure the socket is released and a negative errno
 * returned. */
int
kibnal_create_sock(struct socket **sockp)
{
        struct socket       *sock;
        int                  rc;
        int                  option;
        mm_segment_t         oldmm = get_fs();

        rc = sock_create(PF_INET, SOCK_STREAM, 0, &sock);
        if (rc != 0) {
                CERROR("Can't create socket: %d\n", rc);
                return rc;
        }

        /* Ensure sends will not block */
        option = 2 * sizeof(kib_msg_t);
        set_fs(KERNEL_DS);
        rc = sock_setsockopt(sock, SOL_SOCKET, SO_SNDBUF,
                             (char *)&option, sizeof(option));
        set_fs(oldmm);
        if (rc != 0) {
                CERROR("Can't set send buffer %d: %d\n", option, rc);
                goto failed;
        }

        option = 1;
        set_fs(KERNEL_DS);
        rc = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
                             (char *)&option, sizeof(option));
        set_fs(oldmm);
        if (rc != 0) {
                CERROR("Can't set SO_REUSEADDR: %d\n", rc);
                goto failed;
        }

        *sockp = sock;
        return 0;

 failed:
        sock_release(sock);
        return rc;
}
380
/* Sleep uninterruptibly for 'ticks' jiffies. */
void
kibnal_pause(int ticks)
{
        set_current_state(TASK_UNINTERRUPTIBLE);
        schedule_timeout(ticks);
}
387
/* Connect a TCP socket to 'peer', binding the local end to a privileged
 * port (1023 down to 512) so the peer can trust the query (see the
 * peer_port check in kibnal_handle_svcqry).  Walks down the port range on
 * EADDRINUSE/EADDRNOTAVAIL.  On success *sockp is set and 0 returned;
 * -EHOSTUNREACH means every reserved port was busy. */
int
kibnal_connect_sock(kib_peer_t *peer, struct socket **sockp)
{
        struct sockaddr_in  locaddr;
        struct sockaddr_in  srvaddr;
        struct socket      *sock;
        unsigned int        port;
        int                 rc;

        for (port = 1023; port >= 512; port--) {

                /* local end: any interface, this reserved port */
                memset(&locaddr, 0, sizeof(locaddr)); 
                locaddr.sin_family      = AF_INET; 
                locaddr.sin_port        = htons(port);
                locaddr.sin_addr.s_addr = htonl(INADDR_ANY);

                /* remote end: the peer's svcqry listener */
                memset (&srvaddr, 0, sizeof (srvaddr));
                srvaddr.sin_family      = AF_INET;
                srvaddr.sin_port        = htons (peer->ibp_port);
                srvaddr.sin_addr.s_addr = htonl (peer->ibp_ip);

                rc = kibnal_create_sock(&sock);
                if (rc != 0)
                        return rc;

                rc = sock->ops->bind(sock,
                                     (struct sockaddr *)&locaddr, sizeof(locaddr));
                if (rc != 0) {
                        sock_release(sock);
                        
                        if (rc == -EADDRINUSE) {
                                /* this port is taken locally; try the next */
                                CDEBUG(D_NET, "Port %d already in use\n", port);
                                continue;
                        }

                        CERROR("Can't bind to reserved port %d: %d\n", port, rc);
                        return rc;
                }

                rc = sock->ops->connect(sock,
                                        (struct sockaddr *)&srvaddr, sizeof(srvaddr),
                                        0);
                if (rc == 0) {
                        *sockp = sock;
                        return 0;
                }
                
                sock_release(sock);

                if (rc != -EADDRNOTAVAIL) {
                        CERROR("Can't connect port %d to %u.%u.%u.%u/%d: %d\n",
                               port, HIPQUAD(peer->ibp_ip), peer->ibp_port, rc);
                        return rc;
                }
                
                /* EADDRNOTAVAIL: local port unusable for this peer; retry */
                CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n", 
                       port, HIPQUAD(peer->ibp_ip), peer->ibp_port);
        }

        /* all ports busy */
        return -EHOSTUNREACH;
}
450
/* Query 'conn's peer over TCP for its IB service parameters: send an
 * SVCQRY, read and validate the SVCRSP, and stash the peer's incarnation
 * and service response in the connection request state.  Returns 0 on
 * success or a negative errno/-EPROTO. */
int
kibnal_make_svcqry (kib_conn_t *conn) 
{
        kib_peer_t    *peer = conn->ibc_peer;
        kib_msg_t     *msg;
        struct socket *sock;
        int            rc;
        int            nob;

        LASSERT (conn->ibc_connreq != NULL);
        /* reuse the connreq's message buffer for both query and response */
        msg = &conn->ibc_connreq->cr_msg;

        kibnal_init_msg(msg, IBNAL_MSG_SVCQRY, 0);
        kibnal_pack_msg(msg, 0, peer->ibp_nid, 0);

        rc = kibnal_connect_sock(peer, &sock);
        if (rc != 0)
                return rc;
        
        rc = kibnal_sock_write(sock, msg, msg->ibm_nob);
        if (rc != 0) {
                CERROR("Error %d sending svcqry to "
                       LPX64"@%u.%u.%u.%u/%d\n", rc, 
                       peer->ibp_nid, HIPQUAD(peer->ibp_ip), peer->ibp_port);
                goto out;
        }

        /* expect a header plus an svcrsp body */
        nob = offsetof(kib_msg_t, ibm_u) + sizeof(msg->ibm_u.svcrsp);
        rc = kibnal_sock_read(sock, msg, nob, kibnal_tunables.kib_io_timeout);
        if (rc != 0) {
                CERROR("Error %d receiving svcrsp from "
                       LPX64"@%u.%u.%u.%u/%d\n", rc, 
                       peer->ibp_nid, HIPQUAD(peer->ibp_ip), peer->ibp_port);
                goto out;
        }

        rc = kibnal_unpack_msg(msg, nob);
        if (rc != 0) {
                CERROR("Error %d unpacking svcrsp from "
                       LPX64"@%u.%u.%u.%u/%d\n", rc,
                       peer->ibp_nid, HIPQUAD(peer->ibp_ip), peer->ibp_port);
                goto out;
        }
                       
        if (msg->ibm_type != IBNAL_MSG_SVCRSP) {
                CERROR("Unexpected response type %d from "
                       LPX64"@%u.%u.%u.%u/%d\n", msg->ibm_type, 
                       peer->ibp_nid, HIPQUAD(peer->ibp_ip), peer->ibp_port);
                rc = -EPROTO;
                goto out;
        }
        
        /* the response must have been addressed to this NID/incarnation */
        if (msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid ||
            msg->ibm_dststamp != kibnal_data.kib_incarnation) {
                CERROR("Unexpected dst NID/stamp "LPX64"/"LPX64" from "
                       LPX64"@%u.%u.%u.%u/%d\n", 
                       msg->ibm_dstnid, msg->ibm_dststamp,
                       peer->ibp_nid, HIPQUAD(peer->ibp_ip), peer->ibp_port);
                rc = -EPROTO;
                goto out;
        }

        /* ...and have come from the peer we asked */
        if (msg->ibm_srcnid != peer->ibp_nid) {
                CERROR("Unexpected src NID "LPX64" from "
                       LPX64"@%u.%u.%u.%u/%d\n", msg->ibm_srcnid,
                       peer->ibp_nid, HIPQUAD(peer->ibp_ip), peer->ibp_port);
                rc = -EPROTO;
                goto out;
        }

        conn->ibc_incarnation = msg->ibm_srcstamp;
        conn->ibc_connreq->cr_svcrsp = msg->ibm_u.svcrsp;
 out:
        sock_release(sock);
        return rc;
}
527
528 void
529 kibnal_handle_svcqry (struct socket *sock)
530 {
531         struct sockaddr_in   addr;
532         __u32                peer_ip;
533         unsigned int         peer_port;
534         kib_msg_t           *msg;
535         __u64                srcnid;
536         __u64                srcstamp;
537         int                  len;
538         int                  rc;
539
540         len = sizeof(addr);
541         rc = sock->ops->getname(sock, (struct sockaddr *)&addr, &len, 2);
542         if (rc != 0) {
543                 CERROR("Can't get peer's IP: %d\n", rc);
544                 return;
545         }
546
547         peer_ip = ntohl(addr.sin_addr.s_addr);
548         peer_port = ntohs(addr.sin_port);
549
550         if (peer_port >= 1024) {
551                 CERROR("Refusing unprivileged connection from %u.%u.%u.%u/%d\n",
552                        HIPQUAD(peer_ip), peer_port);
553                 return;
554         }
555
556         PORTAL_ALLOC(msg, sizeof(*msg));
557         if (msg == NULL) {
558                 CERROR("Can't allocate msgs for %u.%u.%u.%u/%d\n",
559                        HIPQUAD(peer_ip), peer_port);
560                 goto out;
561         }
562         
563         rc = kibnal_sock_read(sock, msg, offsetof(kib_msg_t, ibm_u),
564                               kibnal_tunables.kib_listener_timeout);
565         if (rc != 0) {
566                 CERROR("Error %d receiving svcqry from %u.%u.%u.%u/%d\n",
567                        rc, HIPQUAD(peer_ip), peer_port);
568                 goto out;
569         }
570         
571         rc = kibnal_unpack_msg(msg, offsetof(kib_msg_t, ibm_u));
572         if (rc != 0) {
573                 CERROR("Error %d unpacking svcqry from %u.%u.%u.%u/%d\n",
574                        rc, HIPQUAD(peer_ip), peer_port);
575                 goto out;
576         }
577         
578         if (msg->ibm_type != IBNAL_MSG_SVCQRY) {
579                 CERROR("Unexpected message %d from %u.%u.%u.%u/%d\n",
580                        msg->ibm_type, HIPQUAD(peer_ip), peer_port);
581                 goto out;
582         }
583         
584         if (msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid) {
585                 CERROR("Unexpected dstnid "LPX64"(expected "LPX64" "
586                        "from %u.%u.%u.%u/%d\n", msg->ibm_dstnid,
587                        kibnal_lib.libnal_ni.ni_pid.nid,
588                        HIPQUAD(peer_ip), peer_port);
589                 goto out;
590         }
591
592         srcnid = msg->ibm_srcnid;
593         srcstamp = msg->ibm_srcstamp;
594         
595         kibnal_init_msg(msg, IBNAL_MSG_SVCRSP, sizeof(msg->ibm_u.svcrsp));
596
597         msg->ibm_u.svcrsp.ibsr_svc_id = kibnal_data.kib_svc_id;
598         memcpy(msg->ibm_u.svcrsp.ibsr_svc_gid, kibnal_data.kib_svc_gid,
599                sizeof(kibnal_data.kib_svc_gid));
600         msg->ibm_u.svcrsp.ibsr_svc_pkey = kibnal_data.kib_svc_pkey;
601
602         kibnal_pack_msg(msg, 0, srcnid, srcstamp);
603         
604         rc = kibnal_sock_write (sock, msg, msg->ibm_nob);
605         if (rc != 0) {
606                 CERROR("Error %d replying to svcqry from %u.%u.%u.%u/%d\n",
607                        rc, HIPQUAD(peer_ip), peer_port);
608                 goto out;
609         }
610         
611  out:
612         PORTAL_FREE(msg, sizeof(*msg));
613 }
614
/* Release an accepted-socket descriptor: drop its socket and free the
 * wrapper (allocated in kibnal_ip_listener). */
void
kibnal_free_acceptsock (kib_acceptsock_t *as)
{
        sock_release(as->ibas_sock);
        PORTAL_FREE(as, sizeof(*as));
}
621
/* Listener thread: accepts TCP svcqry connections on the configured port
 * and queues them on kib_connd_acceptq for the connection daemon.  Signals
 * the parent via kib_listener_signal once startup succeeds or fails, and
 * again on shutdown, leaving the status in kib_listener_shutdown. */
int
kibnal_ip_listener(void *arg)
{
        struct sockaddr_in addr;
        wait_queue_t       wait;
        struct socket     *sock;
        kib_acceptsock_t  *as;
        int                port;
        char               name[16];
        int                rc;
        unsigned long      flags;

        /* Parent thread holds kib_nid_mutex, and is, or is about to
         * block on kib_listener_signal */

        port = kibnal_tunables.kib_port;
        snprintf(name, sizeof(name), "kibnal_lstn%03d", port);
        kportal_daemonize(name);
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        rc = kibnal_create_sock(&sock);
        if (rc != 0)
                goto out_0;

        memset(&addr, 0, sizeof(addr));
        addr.sin_family      = AF_INET;
        addr.sin_port        = htons(port);
        addr.sin_addr.s_addr = INADDR_ANY;

        rc = sock->ops->bind(sock, (struct sockaddr *)&addr, sizeof(addr));
        if (rc != 0) {
                CERROR("Can't bind to port %d\n", port);
                goto out_1;
        }

        rc = sock->ops->listen(sock, kibnal_tunables.kib_backlog);
        if (rc != 0) {
                CERROR("Can't set listen backlog %d: %d\n", 
                       kibnal_tunables.kib_backlog, rc);
                goto out_1;
        }

        LASSERT (kibnal_data.kib_listener_sock == NULL);
        kibnal_data.kib_listener_sock = sock;

        /* unblock waiting parent */
        LASSERT (kibnal_data.kib_listener_shutdown == 0);
        up(&kibnal_data.kib_listener_signal);

        /* Wake me any time something happens on my socket */
        add_wait_queue(sock->sk->sk_sleep, &wait);
        as = NULL;

        while (kibnal_data.kib_listener_shutdown == 0) {

                /* (re)allocate the accept wrapper if the last one was
                 * handed off to connd */
                if (as == NULL) {
                        PORTAL_ALLOC(as, sizeof(*as));
                        if (as == NULL) {
                                CERROR("Out of Memory: pausing...\n");
                                kibnal_pause(HZ);
                                continue;
                        }
                        as->ibas_sock = NULL;
                }

                if (as->ibas_sock == NULL) {
                        as->ibas_sock = sock_alloc();
                        if (as->ibas_sock == NULL) {
                                CERROR("Can't allocate socket: pausing...\n");
                                kibnal_pause(HZ);
                                continue;
                        }
                        /* XXX this should add a ref to sock->ops->owner, if
                         * TCP could be a module */
                        as->ibas_sock->type = sock->type;
                        as->ibas_sock->ops = sock->ops;
                }
                
                /* set state BEFORE accept so a wakeup between the failed
                 * accept and schedule() isn't lost */
                set_current_state(TASK_INTERRUPTIBLE);

                rc = sock->ops->accept(sock, as->ibas_sock, O_NONBLOCK);

                /* Sleep for socket activity? */
                if (rc == -EAGAIN &&
                    kibnal_data.kib_listener_shutdown == 0)
                        schedule();

                set_current_state(TASK_RUNNING);

                if (rc == 0) {
                        /* hand the accepted socket to the connection
                         * daemon */
                        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);
                        
                        list_add_tail(&as->ibas_list, 
                                      &kibnal_data.kib_connd_acceptq);

                        spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
                        wake_up(&kibnal_data.kib_connd_waitq);

                        as = NULL;      /* connd owns it now */
                        continue;
                }
                
                if (rc != -EAGAIN) {
                        CERROR("Accept failed: %d, pausing...\n", rc);
                        kibnal_pause(HZ);
                }
        }

        /* free any wrapper left over from the last loop iteration */
        if (as != NULL) {
                if (as->ibas_sock != NULL)
                        sock_release(as->ibas_sock);
                PORTAL_FREE(as, sizeof(*as));
        }

        rc = 0;                         /* normal shutdown */
        remove_wait_queue(sock->sk->sk_sleep, &wait);
 out_1:
        sock_release(sock);
        kibnal_data.kib_listener_sock = NULL;
 out_0:
        /* set completion status and unblock thread waiting for me 
         * (parent on startup failure, executioner on normal shutdown) */
        kibnal_data.kib_listener_shutdown = rc;
        up(&kibnal_data.kib_listener_signal);

        return 0;
}
751
/* Spawn the IP listener thread and block until it reports startup success
 * or failure (status left in kib_listener_shutdown).  Must be called with
 * kib_nid_mutex held and no listener running.  Returns 0 on success. */
int
kibnal_start_ip_listener (void)
{
        long           pid;
        int            rc;

        CDEBUG(D_NET, "Starting listener\n");

        /* Called holding kib_nid_mutex: listener stopped */
        LASSERT (kibnal_data.kib_listener_sock == NULL);

        kibnal_data.kib_listener_shutdown = 0;
        pid = kernel_thread(kibnal_ip_listener, NULL, 0);
        if (pid < 0) {
                CERROR("Can't spawn listener: %ld\n", pid);
                return (int)pid;
        }

        /* Block until listener has started up. */
        down(&kibnal_data.kib_listener_signal);

        /* listener leaves its startup status in kib_listener_shutdown */
        rc = kibnal_data.kib_listener_shutdown;
        LASSERT ((rc != 0) == (kibnal_data.kib_listener_sock == NULL));

        CDEBUG((rc == 0) ? D_WARNING : D_ERROR, 
               "Listener %s: pid:%ld port:%d backlog:%d\n", 
               (rc == 0) ? "started OK" : "startup failed",
               pid, kibnal_tunables.kib_port, kibnal_tunables.kib_backlog);

        return rc;
}
783
/* Stop the IP listener thread and wait for it to exit.  If 'clear_acceptq'
 * is set, also close every accepted-but-unhandled socket still queued for
 * connd.  Must be called with kib_nid_mutex held and the listener running. */
void
kibnal_stop_ip_listener(int clear_acceptq)
{
        struct list_head  zombie_accepts;
        kib_acceptsock_t *as;
        unsigned long     flags;

        CDEBUG(D_NET, "Stopping listener\n");

        /* Called holding kib_nid_mutex: listener running */
        LASSERT (kibnal_data.kib_listener_sock != NULL);

        /* flag shutdown and kick the listener out of schedule() */
        kibnal_data.kib_listener_shutdown = 1;
        wake_up_all(kibnal_data.kib_listener_sock->sk->sk_sleep);

        /* Block until listener has torn down. */
        down(&kibnal_data.kib_listener_signal);

        LASSERT (kibnal_data.kib_listener_sock == NULL);
        CWARN("Listener stopped\n");

        if (!clear_acceptq)
                return;

        /* Close any unhandled accepts */
        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);

        /* steal the whole acceptq onto a private list */
        list_add(&zombie_accepts, &kibnal_data.kib_connd_acceptq);
        list_del_init(&kibnal_data.kib_connd_acceptq);

        spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);
        
        while (!list_empty(&zombie_accepts)) {
                as = list_entry(zombie_accepts.next,
                                kib_acceptsock_t, ibas_list);
                list_del(&as->ibas_list);
                kibnal_free_acceptsock(as);
        }
}
823
/* sysctl handler for the 'port' and 'backlog' tunables: runs the normal
 * proc_dointvec, then restarts the IP listener when a write changed the
 * value (or the listener wasn't running).  On restart failure the old
 * value is restored and the listener restarted with it.  The signature
 * grew a 'ppos' argument in linux 2.6.8, hence the #if. */
#if LINUX_VERSION_CODE < KERNEL_VERSION(2,6,8)
int 
kibnal_listener_procint(ctl_table *table, int write, struct file *filp,
                        void *buffer, size_t *lenp)
#else
int 
kibnal_listener_procint(ctl_table *table, int write, struct file *filp,
                        void *buffer, size_t *lenp, loff_t *ppos)
#endif
{
        int   *tunable = (int *)table->data;
        int    old_val;
        int    rc;

        /* No race with nal initialisation since the nal is setup all the time
         * it's loaded.  When that changes, change this! */
        LASSERT (kibnal_data.kib_init == IBNAL_INIT_ALL);

        down(&kibnal_data.kib_nid_mutex);

        /* only the two listener tunables are routed through this handler */
        LASSERT (tunable == &kibnal_tunables.kib_port ||
                 tunable == &kibnal_tunables.kib_backlog);
        old_val = *tunable;

#if LINUX_VERSION_CODE < KERNEL_VERSION(2,6,8)
        rc = proc_dointvec(table, write, filp, buffer, lenp);
#else
        rc = proc_dointvec(table, write, filp, buffer, lenp, ppos);
#endif
        if (write &&
            (*tunable != old_val ||
             kibnal_data.kib_listener_sock == NULL)) {

                if (kibnal_data.kib_listener_sock != NULL)
                        kibnal_stop_ip_listener(0);

                rc = kibnal_start_ip_listener();
                if (rc != 0) {
                        CERROR("Unable to restart listener with new tunable:"
                               " reverting to old value\n");
                        *tunable = old_val;
                        kibnal_start_ip_listener();
                }
        }

        up(&kibnal_data.kib_nid_mutex);

        LASSERT (kibnal_data.kib_init == IBNAL_INIT_ALL);
        return rc;
}
874
875 int
876 kibnal_start_ib_listener (void) 
877 {
878         int    rc;
879
880         LASSERT (kibnal_data.kib_listen_handle == NULL);
881
882         kibnal_data.kib_svc_id = ib_cm_service_assign();
883         CDEBUG(D_NET, "svc id "LPX64"\n", kibnal_data.kib_svc_id);
884
885         rc = ib_cached_gid_get(kibnal_data.kib_device,
886                                kibnal_data.kib_port, 0,
887                                kibnal_data.kib_svc_gid);
888         if (rc != 0) {
889                 CERROR("Can't get port %d GID: %d\n",
890                        kibnal_data.kib_port, rc);
891                 return rc;
892         }
893         
894         rc = ib_cached_pkey_get(kibnal_data.kib_device,
895                                 kibnal_data.kib_port, 0,
896                                 &kibnal_data.kib_svc_pkey);
897         if (rc != 0) {
898                 CERROR ("Can't get port %d PKEY: %d\n",
899                         kibnal_data.kib_port, rc);
900                 return rc;
901         }
902
903         rc = ib_cm_listen(kibnal_data.kib_svc_id,
904                           TS_IB_CM_SERVICE_EXACT_MASK,
905                           kibnal_passive_conn_callback, NULL,
906                           &kibnal_data.kib_listen_handle);
907         if (rc != 0) {
908                 kibnal_data.kib_listen_handle = NULL;
909                 CERROR ("Can't create IB listener: %d\n", rc);
910                 return rc;
911         }
912         
913         LASSERT (kibnal_data.kib_listen_handle != NULL);
914         return 0;
915 }
916
917 void
918 kibnal_stop_ib_listener (void) 
919 {
920         int    rc;
921         
922         LASSERT (kibnal_data.kib_listen_handle != NULL);
923
924         rc = ib_cm_listen_stop (kibnal_data.kib_listen_handle);
925         if (rc != 0)
926                 CERROR("Error stopping IB listener: %d\n", rc);
927                 
928         kibnal_data.kib_listen_handle = NULL;
929 }
930
int
kibnal_set_mynid (ptl_nid_t nid)
{
        /* Install a new NID: stop both listeners, bump the incarnation,
         * purge all peers/connections, then restart the listeners if the
         * new NID is valid.  Serialised by kib_nid_mutex. */
        lib_ni_t         *ni = &kibnal_lib.libnal_ni;
        int               rc;

        CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
               nid, ni->ni_pid.nid);

        down (&kibnal_data.kib_nid_mutex);

        if (nid == kibnal_data.kib_nid) {
                /* no change of NID */
                up (&kibnal_data.kib_nid_mutex);
                return (0);
        }

        CDEBUG(D_NET, "NID "LPX64"("LPX64")\n",
               kibnal_data.kib_nid, nid);

        /* stop accepting connections on the old identity */
        if (kibnal_data.kib_listener_sock != NULL)
                kibnal_stop_ip_listener(1);
        
        if (kibnal_data.kib_listen_handle != NULL)
                kibnal_stop_ib_listener();

        ni->ni_pid.nid = nid;
        kibnal_data.kib_incarnation++;
        mb();           /* new NID/incarnation visible before the purge */
        /* Delete all existing peers and their connections after new
         * NID/incarnation set to ensure no old connections in our brave new
         * world. */
        kibnal_del_peer (PTL_NID_ANY, 0);

        if (ni->ni_pid.nid != PTL_NID_ANY) {
                /* got a new NID to install */
                rc = kibnal_start_ib_listener();
                if (rc != 0) {
                        CERROR("Can't start IB listener: %d\n", rc);
                        goto failed_0;
                }
        
                rc = kibnal_start_ip_listener();
                if (rc != 0) {
                        CERROR("Can't start IP listener: %d\n", rc);
                        goto failed_1;
                }
        }
        
        up(&kibnal_data.kib_nid_mutex);
        return 0;

 failed_1:
        kibnal_stop_ib_listener();
 failed_0:
        /* fall back to "no NID": invalidate the identity again and
         * re-purge anything created meanwhile */
        ni->ni_pid.nid = PTL_NID_ANY;
        kibnal_data.kib_incarnation++;
        mb();
        kibnal_del_peer (PTL_NID_ANY, 0);
        up(&kibnal_data.kib_nid_mutex);
        return rc;
}
993
994 kib_peer_t *
995 kibnal_create_peer (ptl_nid_t nid)
996 {
997         kib_peer_t *peer;
998
999         LASSERT (nid != PTL_NID_ANY);
1000
1001         PORTAL_ALLOC (peer, sizeof (*peer));
1002         if (peer == NULL)
1003                 return (NULL);
1004
1005         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
1006
1007         peer->ibp_nid = nid;
1008         atomic_set (&peer->ibp_refcount, 1);    /* 1 ref for caller */
1009
1010         INIT_LIST_HEAD (&peer->ibp_list);       /* not in the peer table yet */
1011         INIT_LIST_HEAD (&peer->ibp_conns);
1012         INIT_LIST_HEAD (&peer->ibp_tx_queue);
1013         INIT_LIST_HEAD (&peer->ibp_connd_list); /* not queued for connecting */
1014
1015         peer->ibp_reconnect_time = jiffies;
1016         peer->ibp_reconnect_interval = IBNAL_MIN_RECONNECT_INTERVAL;
1017
1018         atomic_inc (&kibnal_data.kib_npeers);
1019         CDEBUG(D_NET, "peer %p "LPX64"\n", peer, nid);
1020
1021         return (peer);
1022 }
1023
void
kibnal_destroy_peer (kib_peer_t *peer)
{
        /* Final teardown when the last reference is dropped: the peer must
         * already be unlinked, idle and connectionless. */
        CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ibp_nid, peer);

        LASSERT (atomic_read (&peer->ibp_refcount) == 0);
        LASSERT (peer->ibp_persistence == 0);
        LASSERT (!kibnal_peer_active(peer));
        LASSERT (peer->ibp_connecting == 0);
        LASSERT (list_empty (&peer->ibp_connd_list));
        LASSERT (list_empty (&peer->ibp_conns));
        LASSERT (list_empty (&peer->ibp_tx_queue));

        PORTAL_FREE (peer, sizeof (*peer));

        /* NB a peer's connections keep a reference on their peer until
         * they are destroyed, so we can be assured that _all_ state to do
         * with this peer has been cleaned up when its refcount drops to
         * zero. */
        atomic_dec (&kibnal_data.kib_npeers);
}
1045
1046 void
1047 kibnal_put_peer (kib_peer_t *peer)
1048 {
1049         CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
1050                 peer, peer->ibp_nid,
1051                 atomic_read (&peer->ibp_refcount));
1052
1053         LASSERT (atomic_read (&peer->ibp_refcount) > 0);
1054         if (!atomic_dec_and_test (&peer->ibp_refcount))
1055                 return;
1056
1057         kibnal_destroy_peer (peer);
1058 }
1059
1060 kib_peer_t *
1061 kibnal_find_peer_locked (ptl_nid_t nid)
1062 {
1063         struct list_head *peer_list = kibnal_nid2peerlist (nid);
1064         struct list_head *tmp;
1065         kib_peer_t       *peer;
1066
1067         list_for_each (tmp, peer_list) {
1068
1069                 peer = list_entry (tmp, kib_peer_t, ibp_list);
1070
1071                 LASSERT (peer->ibp_persistence != 0 || /* persistent peer */
1072                          peer->ibp_connecting != 0 || /* creating conns */
1073                          !list_empty (&peer->ibp_conns));  /* active conn */
1074
1075                 if (peer->ibp_nid != nid)
1076                         continue;
1077
1078                 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
1079                        peer, nid, atomic_read (&peer->ibp_refcount));
1080                 return (peer);
1081         }
1082         return (NULL);
1083 }
1084
1085 kib_peer_t *
1086 kibnal_get_peer (ptl_nid_t nid)
1087 {
1088         kib_peer_t     *peer;
1089         unsigned long   flags;
1090
1091         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1092         peer = kibnal_find_peer_locked (nid);
1093         if (peer != NULL)                       /* +1 ref for caller? */
1094                 atomic_inc (&peer->ibp_refcount);
1095         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1096
1097         return (peer);
1098 }
1099
void
kibnal_unlink_peer_locked (kib_peer_t *peer)
{
        /* Remove a non-persistent, connectionless peer from the peer table
         * and drop the table's reference.  Global lock held for writing. */
        LASSERT (peer->ibp_persistence == 0);
        LASSERT (list_empty(&peer->ibp_conns));

        LASSERT (kibnal_peer_active(peer));
        list_del_init (&peer->ibp_list);
        /* lose peerlist's ref */
        kibnal_put_peer (peer);
}
1111
1112 int
1113 kibnal_get_peer_info (int index, ptl_nid_t *nidp, __u32 *ipp, int *portp,
1114                       int *persistencep)
1115 {
1116         kib_peer_t        *peer;
1117         struct list_head  *ptmp;
1118         unsigned long      flags;
1119         int                i;
1120
1121         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1122
1123         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
1124
1125                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
1126                         
1127                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
1128                         LASSERT (peer->ibp_persistence != 0 ||
1129                                  peer->ibp_connecting != 0 ||
1130                                  !list_empty (&peer->ibp_conns));
1131
1132                         if (index-- > 0)
1133                                 continue;
1134
1135                         *nidp = peer->ibp_nid;
1136                         *ipp = peer->ibp_ip;
1137                         *portp = peer->ibp_port;
1138                         *persistencep = peer->ibp_persistence;
1139                         
1140                         read_unlock_irqrestore(&kibnal_data.kib_global_lock,
1141                                                flags);
1142                         return (0);
1143                 }
1144         }
1145
1146         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1147         return (-ENOENT);
1148 }
1149
1150 int
1151 kibnal_add_persistent_peer (ptl_nid_t nid, __u32 ip, int port)
1152 {
1153         unsigned long      flags;
1154         kib_peer_t        *peer;
1155         kib_peer_t        *peer2;
1156         
1157         if (nid == PTL_NID_ANY)
1158                 return (-EINVAL);
1159
1160         peer = kibnal_create_peer (nid);
1161         if (peer == NULL)
1162                 return (-ENOMEM);
1163
1164         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1165
1166         peer2 = kibnal_find_peer_locked (nid);
1167         if (peer2 != NULL) {
1168                 kibnal_put_peer (peer);
1169                 peer = peer2;
1170         } else {
1171                 /* peer table takes existing ref on peer */
1172                 list_add_tail (&peer->ibp_list,
1173                                kibnal_nid2peerlist (nid));
1174         }
1175
1176         peer->ibp_ip = ip;
1177         peer->ibp_port = port;
1178         peer->ibp_persistence++;
1179         
1180         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1181         return (0);
1182 }
1183
void
kibnal_del_peer_locked (kib_peer_t *peer, int single_share)
{
        /* Drop persistence (one share, or all of it) and, if none remains,
         * remove the peer: unlink it directly when it has no connections,
         * otherwise close them all (closing the last conn unlinks the
         * peer).  Global lock held for writing. */
        struct list_head *ctmp;
        struct list_head *cnxt;
        kib_conn_t       *conn;

        if (!single_share)
                peer->ibp_persistence = 0;
        else if (peer->ibp_persistence > 0)
                peer->ibp_persistence--;

        if (peer->ibp_persistence != 0)
                return;                 /* still persistent: keep it */

        if (list_empty(&peer->ibp_conns)) {
                kibnal_unlink_peer_locked(peer);
        } else {
                /* _safe: closing a conn may unlink it from ibp_conns */
                list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
                        conn = list_entry(ctmp, kib_conn_t, ibc_list);

                        kibnal_close_conn_locked (conn, 0);
                }
                /* NB peer is no longer persistent; closing its last conn
                 * unlinked it. */
        }
        /* NB peer now unlinked; might even be freed if the peer table had the
         * last ref on it. */
}
1213
1214 int
1215 kibnal_del_peer (ptl_nid_t nid, int single_share)
1216 {
1217         unsigned long      flags;
1218         struct list_head  *ptmp;
1219         struct list_head  *pnxt;
1220         kib_peer_t        *peer;
1221         int                lo;
1222         int                hi;
1223         int                i;
1224         int                rc = -ENOENT;
1225
1226         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1227
1228         if (nid != PTL_NID_ANY)
1229                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
1230         else {
1231                 lo = 0;
1232                 hi = kibnal_data.kib_peer_hash_size - 1;
1233         }
1234
1235         for (i = lo; i <= hi; i++) {
1236                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
1237                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
1238                         LASSERT (peer->ibp_persistence != 0 ||
1239                                  peer->ibp_connecting != 0 ||
1240                                  !list_empty (&peer->ibp_conns));
1241
1242                         if (!(nid == PTL_NID_ANY || peer->ibp_nid == nid))
1243                                 continue;
1244
1245                         kibnal_del_peer_locked (peer, single_share);
1246                         rc = 0;         /* matched something */
1247
1248                         if (single_share)
1249                                 goto out;
1250                 }
1251         }
1252  out:
1253         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1254
1255         return (rc);
1256 }
1257
1258 kib_conn_t *
1259 kibnal_get_conn_by_idx (int index)
1260 {
1261         kib_peer_t        *peer;
1262         struct list_head  *ptmp;
1263         kib_conn_t        *conn;
1264         struct list_head  *ctmp;
1265         unsigned long      flags;
1266         int                i;
1267
1268         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1269
1270         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
1271                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
1272
1273                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
1274                         LASSERT (peer->ibp_persistence > 0 ||
1275                                  peer->ibp_connecting != 0 ||
1276                                  !list_empty (&peer->ibp_conns));
1277
1278                         list_for_each (ctmp, &peer->ibp_conns) {
1279                                 if (index-- > 0)
1280                                         continue;
1281
1282                                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1283                                 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1284                                        conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1285                                        atomic_read (&conn->ibc_refcount));
1286                                 atomic_inc (&conn->ibc_refcount);
1287                                 read_unlock_irqrestore(&kibnal_data.kib_global_lock,
1288                                                        flags);
1289                                 return (conn);
1290                         }
1291                 }
1292         }
1293
1294         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1295         return (NULL);
1296 }
1297
kib_conn_t *
kibnal_create_conn (void)
{
        /* Allocate a connection: rx descriptors, pre-mapped rx message
         * pages and a queue pair transitioned to the INIT state.  Returns
         * the conn with 1 ref for the caller, or NULL on failure. */
        kib_conn_t  *conn;
        int          i;
        __u64        vaddr = 0;
        __u64        vaddr_base;
        int          page_offset;
        int          ipage;
        int          rc;
        union {
                struct ib_qp_create_param  qp_create;
                struct ib_qp_attribute     qp_attr;
        } params;
        
        PORTAL_ALLOC (conn, sizeof (*conn));
        if (conn == NULL) {
                CERROR ("Can't allocate connection\n");
                return (NULL);
        }

        /* zero flags, NULL pointers etc... */
        memset (conn, 0, sizeof (*conn));

        INIT_LIST_HEAD (&conn->ibc_tx_queue);
        INIT_LIST_HEAD (&conn->ibc_active_txs);
        spin_lock_init (&conn->ibc_lock);
        
        atomic_inc (&kibnal_data.kib_nconns);
        /* well not really, but I call destroy() on failure, which decrements */

        PORTAL_ALLOC (conn->ibc_rxs, IBNAL_RX_MSGS * sizeof (kib_rx_t));
        if (conn->ibc_rxs == NULL)
                goto failed;
        memset (conn->ibc_rxs, 0, IBNAL_RX_MSGS * sizeof(kib_rx_t));

        /* map the rx message buffers with local write access */
        rc = kibnal_alloc_pages(&conn->ibc_rx_pages,
                                IBNAL_RX_MSG_PAGES,
                                IB_ACCESS_LOCAL_WRITE);
        if (rc != 0)
                goto failed;

        vaddr_base = vaddr = conn->ibc_rx_pages->ibp_vaddr;

        /* carve the mapped pages into IBNAL_MSG_SIZE rx buffers */
        for (i = ipage = page_offset = 0; i < IBNAL_RX_MSGS; i++) {
                struct page *page = conn->ibc_rx_pages->ibp_pages[ipage];
                kib_rx_t   *rx = &conn->ibc_rxs[i];

                rx->rx_conn = conn;
                rx->rx_vaddr = vaddr;   /* mapped address of this buffer */
                rx->rx_msg = (kib_msg_t *)(((char *)page_address(page)) + page_offset);
                
                vaddr += IBNAL_MSG_SIZE;
                LASSERT (vaddr <= vaddr_base + IBNAL_RX_MSG_BYTES);
                
                page_offset += IBNAL_MSG_SIZE;
                LASSERT (page_offset <= PAGE_SIZE);

                if (page_offset == PAGE_SIZE) {
                        page_offset = 0;
                        ipage++;
                        LASSERT (ipage <= IBNAL_RX_MSG_PAGES);
                }
        }

        params.qp_create = (struct ib_qp_create_param) {
                .limit = {
                        /* Sends have an optional RDMA */
                        .max_outstanding_send_request    = 2 * IBNAL_MSG_QUEUE_SIZE,
                        .max_outstanding_receive_request = IBNAL_MSG_QUEUE_SIZE,
                        .max_send_gather_element         = 1,
                        .max_receive_scatter_element     = 1,
                },
                .pd              = kibnal_data.kib_pd,
                .send_queue      = kibnal_data.kib_cq,
                .receive_queue   = kibnal_data.kib_cq,
                .send_policy     = IB_WQ_SIGNAL_SELECTABLE,
                .receive_policy  = IB_WQ_SIGNAL_SELECTABLE,
                .rd_domain       = 0,
                .transport       = IB_TRANSPORT_RC,
                .device_specific = NULL,
        };
        
        rc = ib_qp_create (&params.qp_create, &conn->ibc_qp, &conn->ibc_qpn);
        if (rc != 0) {
                CERROR ("Failed to create queue pair: %d\n", rc);
                goto failed;
        }
        
        /* Mark QP created (so destroy_conn knows to tear it down) */
        conn->ibc_state = IBNAL_CONN_INIT_QP;

        /* move the QP to INIT with RDMA read+write enabled */
        params.qp_attr = (struct ib_qp_attribute) {
                .state             = IB_QP_STATE_INIT,
                .port              = kibnal_data.kib_port,
                .enable_rdma_read  = 1,
                .enable_rdma_write = 1,
                .valid_fields      = (IB_QP_ATTRIBUTE_STATE |
                                      IB_QP_ATTRIBUTE_PORT |
                                      IB_QP_ATTRIBUTE_PKEY_INDEX |
                                      IB_QP_ATTRIBUTE_RDMA_ATOMIC_ENABLE),
        };
        rc = ib_qp_modify(conn->ibc_qp, &params.qp_attr);
        if (rc != 0) {
                CERROR ("Failed to modify queue pair: %d\n", rc);
                goto failed;
        }

        /* 1 ref for caller */
        atomic_set (&conn->ibc_refcount, 1);
        return (conn);
        
 failed:
        /* destroy_conn copes with a partially-initialised conn */
        kibnal_destroy_conn (conn);
        return (NULL);
}
1414
void
kibnal_destroy_conn (kib_conn_t *conn)
{
        /* Free a connection in any stage of initialisation (also the error
         * path of kibnal_create_conn).  The conn must be idle: no refs, no
         * queued or active transmits, no pending connection request. */
        int    rc;
        
        CDEBUG (D_NET, "connection %p\n", conn);

        LASSERT (atomic_read (&conn->ibc_refcount) == 0);
        LASSERT (list_empty(&conn->ibc_tx_queue));
        LASSERT (list_empty(&conn->ibc_active_txs));
        LASSERT (conn->ibc_nsends_posted == 0);
        LASSERT (conn->ibc_connreq == NULL);

        /* tear down whatever ibc_state says was initialised */
        switch (conn->ibc_state) {
        case IBNAL_CONN_ZOMBIE:
                /* called after connection sequence initiated */
                /* fall through */

        case IBNAL_CONN_INIT_QP:
                rc = ib_qp_destroy(conn->ibc_qp);
                if (rc != 0)
                        CERROR("Can't destroy QP: %d\n", rc);
                /* fall through */
                
        case IBNAL_CONN_INIT_NOTHING:
                break;

        default:
                LASSERT (0);
        }

        if (conn->ibc_rx_pages != NULL) 
                kibnal_free_pages(conn->ibc_rx_pages);
        
        if (conn->ibc_rxs != NULL)
                PORTAL_FREE(conn->ibc_rxs, 
                            IBNAL_RX_MSGS * sizeof(kib_rx_t));

        if (conn->ibc_peer != NULL)
                kibnal_put_peer(conn->ibc_peer);

        PORTAL_FREE(conn, sizeof (*conn));

        atomic_dec(&kibnal_data.kib_nconns);
        
        if (atomic_read (&kibnal_data.kib_nconns) == 0 &&
            kibnal_data.kib_shutdown) {
                /* I just nuked the last connection on shutdown; wake up
                 * everyone so they can exit. */
                wake_up_all(&kibnal_data.kib_sched_waitq);
                wake_up_all(&kibnal_data.kib_reaper_waitq);
        }
}
1467
1468 void
1469 kibnal_put_conn (kib_conn_t *conn)
1470 {
1471         unsigned long flags;
1472
1473         CDEBUG (D_NET, "putting conn[%p] state %d -> "LPX64" (%d)\n",
1474                 conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1475                 atomic_read (&conn->ibc_refcount));
1476
1477         LASSERT (atomic_read (&conn->ibc_refcount) > 0);
1478         if (!atomic_dec_and_test (&conn->ibc_refcount))
1479                 return;
1480
1481         /* last ref only goes on zombies */
1482         LASSERT (conn->ibc_state == IBNAL_CONN_ZOMBIE);
1483
1484         spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags);
1485
1486         list_add (&conn->ibc_list, &kibnal_data.kib_reaper_conns);
1487         wake_up (&kibnal_data.kib_reaper_waitq);
1488
1489         spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags);
1490 }
1491
1492 int
1493 kibnal_close_peer_conns_locked (kib_peer_t *peer, int why)
1494 {
1495         kib_conn_t         *conn;
1496         struct list_head   *ctmp;
1497         struct list_head   *cnxt;
1498         int                 count = 0;
1499
1500         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1501                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1502
1503                 count++;
1504                 kibnal_close_conn_locked (conn, why);
1505         }
1506
1507         return (count);
1508 }
1509
1510 int
1511 kibnal_close_stale_conns_locked (kib_peer_t *peer, __u64 incarnation)
1512 {
1513         kib_conn_t         *conn;
1514         struct list_head   *ctmp;
1515         struct list_head   *cnxt;
1516         int                 count = 0;
1517
1518         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1519                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1520
1521                 if (conn->ibc_incarnation == incarnation)
1522                         continue;
1523
1524                 CDEBUG(D_NET, "Closing stale conn %p nid:"LPX64
1525                        " incarnation:"LPX64"("LPX64")\n", conn,
1526                        peer->ibp_nid, conn->ibc_incarnation, incarnation);
1527                 
1528                 count++;
1529                 kibnal_close_conn_locked (conn, -ESTALE);
1530         }
1531
1532         return (count);
1533 }
1534
1535 int
1536 kibnal_close_matching_conns (ptl_nid_t nid)
1537 {
1538         unsigned long       flags;
1539         kib_peer_t         *peer;
1540         struct list_head   *ptmp;
1541         struct list_head   *pnxt;
1542         int                 lo;
1543         int                 hi;
1544         int                 i;
1545         int                 count = 0;
1546
1547         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1548
1549         if (nid != PTL_NID_ANY)
1550                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
1551         else {
1552                 lo = 0;
1553                 hi = kibnal_data.kib_peer_hash_size - 1;
1554         }
1555
1556         for (i = lo; i <= hi; i++) {
1557                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
1558
1559                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
1560                         LASSERT (peer->ibp_persistence != 0 ||
1561                                  peer->ibp_connecting != 0 ||
1562                                  !list_empty (&peer->ibp_conns));
1563
1564                         if (!(nid == PTL_NID_ANY || nid == peer->ibp_nid))
1565                                 continue;
1566
1567                         count += kibnal_close_peer_conns_locked (peer, 0);
1568                 }
1569         }
1570
1571         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1572
1573         /* wildcards always succeed */
1574         if (nid == PTL_NID_ANY)
1575                 return (0);
1576         
1577         return (count == 0 ? -ENOENT : 0);
1578 }
1579
int
kibnal_cmd(struct portals_cfg *pcfg, void * private)
{
        /* Dispatch a portals configuration command (peer/connection
         * enumeration and management, NID registration).  Returns 0 on
         * success or a negative errno; unknown commands yield -EINVAL. */
        int rc = -EINVAL;

        LASSERT (pcfg != NULL);

        switch(pcfg->pcfg_command) {
        case NAL_CMD_GET_PEER: {
                /* report the pcfg_count'th peer's attributes */
                ptl_nid_t   nid = 0;
                __u32       ip = 0;
                int         port = 0;
                int         share_count = 0;

                rc = kibnal_get_peer_info(pcfg->pcfg_count,
                                          &nid, &ip, &port, &share_count);
                pcfg->pcfg_nid   = nid;
                pcfg->pcfg_size  = 0;
                pcfg->pcfg_id    = ip;
                pcfg->pcfg_misc  = port;
                pcfg->pcfg_count = 0;
                pcfg->pcfg_wait  = share_count;
                break;
        }
        case NAL_CMD_ADD_PEER: {
                rc = kibnal_add_persistent_peer (pcfg->pcfg_nid,
                                                 pcfg->pcfg_id, /* IP */
                                                 pcfg->pcfg_misc); /* port */
                break;
        }
        case NAL_CMD_DEL_PEER: {
                rc = kibnal_del_peer (pcfg->pcfg_nid, 
                                       /* flags == single_share */
                                       pcfg->pcfg_flags != 0);
                break;
        }
        case NAL_CMD_GET_CONN: {
                /* report the pcfg_count'th connection */
                kib_conn_t *conn = kibnal_get_conn_by_idx (pcfg->pcfg_count);

                if (conn == NULL)
                        rc = -ENOENT;
                else {
                        rc = 0;
                        pcfg->pcfg_nid   = conn->ibc_peer->ibp_nid;
                        pcfg->pcfg_id    = 0;
                        pcfg->pcfg_misc  = 0;
                        pcfg->pcfg_flags = 0;
                        /* drop the ref get_conn_by_idx took */
                        kibnal_put_conn (conn);
                }
                break;
        }
        case NAL_CMD_CLOSE_CONNECTION: {
                rc = kibnal_close_matching_conns (pcfg->pcfg_nid);
                break;
        }
        case NAL_CMD_REGISTER_MYNID: {
                if (pcfg->pcfg_nid == PTL_NID_ANY)
                        rc = -EINVAL;
                else
                        rc = kibnal_set_mynid (pcfg->pcfg_nid);
                break;
        }
        }

        return rc;
}
1646
1647 void
1648 kibnal_free_pages (kib_pages_t *p)
1649 {
1650         int     npages = p->ibp_npages;
1651         int     rc;
1652         int     i;
1653         
1654         if (p->ibp_mapped) {
1655                 rc = ib_memory_deregister(p->ibp_handle);
1656                 if (rc != 0)
1657                         CERROR ("Deregister error: %d\n", rc);
1658         }
1659         
1660         for (i = 0; i < npages; i++)
1661                 if (p->ibp_pages[i] != NULL)
1662                         __free_page(p->ibp_pages[i]);
1663         
1664         PORTAL_FREE (p, offsetof(kib_pages_t, ibp_pages[npages]));
1665 }
1666
int
kibnal_alloc_pages (kib_pages_t **pp, int npages, int access)
{
        /* Allocate 'npages' pages and register them as one physical memory
         * region with the given access rights.  On success sets *pp and
         * returns 0; on failure everything allocated so far is released
         * and a negative errno returned. */
        kib_pages_t                *p;
        struct ib_physical_buffer  *phys_pages;
        int                         i;
        int                         rc;

        PORTAL_ALLOC(p, offsetof(kib_pages_t, ibp_pages[npages]));
        if (p == NULL) {
                CERROR ("Can't allocate buffer %d\n", npages);
                return (-ENOMEM);
        }

        memset (p, 0, offsetof(kib_pages_t, ibp_pages[npages]));
        p->ibp_npages = npages;
        
        for (i = 0; i < npages; i++) {
                p->ibp_pages[i] = alloc_page (GFP_KERNEL);
                if (p->ibp_pages[i] == NULL) {
                        CERROR ("Can't allocate page %d of %d\n", i, npages);
                        /* also frees the pages allocated so far */
                        kibnal_free_pages(p);
                        return (-ENOMEM);
                }
        }

        /* build the physical buffer list describing the pages */
        PORTAL_ALLOC(phys_pages, npages * sizeof(*phys_pages));
        if (phys_pages == NULL) {
                CERROR ("Can't allocate physarray for %d pages\n", npages);
                kibnal_free_pages(p);
                return (-ENOMEM);
        }

        for (i = 0; i < npages; i++) {
                phys_pages[i].size = PAGE_SIZE;
                phys_pages[i].address =
                        kibnal_page2phys(p->ibp_pages[i]);
        }

        /* register; ibp_vaddr starts at 0 and receives the mapped address */
        p->ibp_vaddr = 0;
        rc = ib_memory_register_physical(kibnal_data.kib_pd,
                                         phys_pages, npages,
                                         &p->ibp_vaddr,
                                         npages * PAGE_SIZE, 0,
                                         access,
                                         &p->ibp_handle,
                                         &p->ibp_lkey,
                                         &p->ibp_rkey);
        
        PORTAL_FREE(phys_pages, npages * sizeof(*phys_pages));
        
        if (rc != 0) {
                CERROR ("Error %d mapping %d pages\n", rc, npages);
                kibnal_free_pages(p);
                return (rc);
        }
        
        /* mapped: kibnal_free_pages() must deregister before freeing */
        p->ibp_mapped = 1;
        *pp = p;
        return (0);
}
1728
/*
 * Create and initialise the pool of pre-mapped TX message descriptors.
 *
 * Allocates IBNAL_TX_MSG_PAGES pages via kibnal_alloc_pages() (registered
 * with the IB device for local read access only), carves them into
 * IBNAL_MSG_SIZE slots and points one kib_tx_t at each slot.  Descriptors
 * with index >= IBNAL_NTX are flagged "nblk" and parked on the
 * non-blocking idle list; the rest go on the normal idle list.
 *
 * Returns 0 on success, or the non-zero error from kibnal_alloc_pages().
 */
1729 int
1730 kibnal_setup_tx_descs (void)
1731 {
1732         int           ipage = 0;
1733         int           page_offset = 0;
1734         __u64         vaddr;
1735         __u64         vaddr_base;
1736         struct page  *page;
1737         kib_tx_t     *tx;
1738         int           i;
1739         int           rc;
1740
1741         /* pre-mapped messages are not bigger than 1 page */
1742         LASSERT (IBNAL_MSG_SIZE <= PAGE_SIZE);
1743
1744         /* No fancy arithmetic when we do the buffer calculations */
1745         LASSERT (PAGE_SIZE % IBNAL_MSG_SIZE == 0);
1746
1747         rc = kibnal_alloc_pages(&kibnal_data.kib_tx_pages,
1748                                 IBNAL_TX_MSG_PAGES, 
1749                                 0);            /* local read access only */
1750         if (rc != 0)
1751                 return (rc);
1752
             /* vaddr_base is the IB virtual address the page set was
              * registered at; each descriptor's tx_vaddr is an offset
              * into that single registration. */
1753         vaddr = vaddr_base = kibnal_data.kib_tx_pages->ibp_vaddr;
1754
1755         for (i = 0; i < IBNAL_TX_MSGS; i++) {
1756                 page = kibnal_data.kib_tx_pages->ibp_pages[ipage];
1757                 tx = &kibnal_data.kib_tx_descs[i];
1758
1759                 memset (tx, 0, sizeof(*tx));    /* zero flags etc */
1760                 
                     /* tx_msg is the CPU-addressable view of the slot;
                      * tx_vaddr is the matching IB-mapped address */
1761                 tx->tx_msg = (kib_msg_t *)(((char *)page_address(page)) + page_offset);
1762                 tx->tx_vaddr = vaddr;
                     /* first IBNAL_NTX descriptors may block on allocation;
                      * the remainder serve non-blocking requests */
1763                 tx->tx_isnblk = (i >= IBNAL_NTX);
1764                 tx->tx_mapped = KIB_TX_UNMAPPED;
1765
1766                 CDEBUG(D_NET, "Tx[%d] %p->%p - "LPX64"\n", 
1767                        i, tx, tx->tx_msg, tx->tx_vaddr);
1768
1769                 if (tx->tx_isnblk)
1770                         list_add (&tx->tx_list, 
1771                                   &kibnal_data.kib_idle_nblk_txs);
1772                 else
1773                         list_add (&tx->tx_list, 
1774                                   &kibnal_data.kib_idle_txs);
1775
1776                 vaddr += IBNAL_MSG_SIZE;
1777                 LASSERT (vaddr <= vaddr_base + IBNAL_TX_MSG_BYTES);
1778
1779                 page_offset += IBNAL_MSG_SIZE;
1780                 LASSERT (page_offset <= PAGE_SIZE);
1781
                     /* step to the next page when this one is full; '<='
                      * because ipage legitimately reaches the page count
                      * after the final slot is placed */
1782                 if (page_offset == PAGE_SIZE) {
1783                         page_offset = 0;
1784                         ipage++;
1785                         LASSERT (ipage <= IBNAL_TX_MSG_PAGES);
1786                 }
1787         }
1788         
1789         return (0);
1790 }
1791
/*
 * Tear down the NAL.  The switch on kib_init falls through from the
 * most-initialised state downwards, so whatever startup managed to set
 * up (and only that) gets undone, in reverse order of initialisation.
 * Also used as the error-unwind path by kibnal_api_startup().
 */
1792 void
1793 kibnal_api_shutdown (nal_t *nal)
1794 {
1795         int   i;
1796         int   rc;
1797
1798         if (nal->nal_refct != 0) {
                     /* NAL still has other users: just drop the module
                      * reference taken in kibnal_api_startup() */
1800                 PORTAL_MODULE_UNUSE;
1801                 return;
1802         }
1803
1804         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1805                atomic_read (&portal_kmemory));
1806
1807         LASSERT(nal == &kibnal_api);
1808
1809         switch (kibnal_data.kib_init) {
1810         default:
1811                 CERROR ("Unexpected state %d\n", kibnal_data.kib_init);
1812                 LBUG();
1813
1814         case IBNAL_INIT_ALL:
1815                 /* stop calls to nal_cmd */
1816                 libcfs_nal_cmd_unregister(OPENIBNAL);
1817                 /* No new peers */
1818
1819                 /* resetting my NID unadvertises me, removes my
1820                  * listener and nukes all current peers */
1821                 kibnal_set_mynid (PTL_NID_ANY);
1822
1823                 /* Wait for all peer state to clean up */
1824                 i = 2;
1825                 while (atomic_read (&kibnal_data.kib_npeers) != 0) {
1826                         i++;
1827                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1828                                "waiting for %d peers to close down\n",
1829                                atomic_read (&kibnal_data.kib_npeers));
1830                         set_current_state (TASK_INTERRUPTIBLE);
1831                         schedule_timeout (HZ);
1832                 }
1833                 /* fall through */
1834
1835         case IBNAL_INIT_CQ:
1836                 rc = ib_cq_destroy (kibnal_data.kib_cq);
1837                 if (rc != 0)
1838                         CERROR ("Destroy CQ error: %d\n", rc);
1839                 /* fall through */
1840
1841         case IBNAL_INIT_TXD:
1842                 kibnal_free_pages (kibnal_data.kib_tx_pages);
1843                 /* fall through */
1844 #if IBNAL_FMR
1845         case IBNAL_INIT_FMR:
1846                 rc = ib_fmr_pool_destroy (kibnal_data.kib_fmr_pool);
1847                 if (rc != 0)
1848                         CERROR ("Destroy FMR pool error: %d\n", rc);
1849                 /* fall through */
1850 #endif
1851         case IBNAL_INIT_PD:
1852                 rc = ib_pd_destroy(kibnal_data.kib_pd);
1853                 if (rc != 0)
1854                         CERROR ("Destroy PD error: %d\n", rc);
1855                 /* fall through */
1856
1857         case IBNAL_INIT_LIB:
1858                 lib_fini(&kibnal_lib);
1859                 /* fall through */
1860
1861         case IBNAL_INIT_DATA:
1862                 /* Module refcount only gets to zero when all peers
1863                  * have been closed so all lists must be empty */
1864                 LASSERT (atomic_read (&kibnal_data.kib_npeers) == 0);
1865                 LASSERT (kibnal_data.kib_peers != NULL);
1866                 for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
1867                         LASSERT (list_empty (&kibnal_data.kib_peers[i]));
1868                 }
1869                 LASSERT (atomic_read (&kibnal_data.kib_nconns) == 0);
1870                 LASSERT (list_empty (&kibnal_data.kib_sched_rxq));
1871                 LASSERT (list_empty (&kibnal_data.kib_sched_txq));
1872                 LASSERT (list_empty (&kibnal_data.kib_reaper_conns));
1873                 LASSERT (list_empty (&kibnal_data.kib_connd_peers));
1874                 LASSERT (list_empty (&kibnal_data.kib_connd_acceptq));
1875
1876                 /* flag threads to terminate; wake and wait for them to die */
1877                 kibnal_data.kib_shutdown = 1;
1878                 wake_up_all (&kibnal_data.kib_sched_waitq);
1879                 wake_up_all (&kibnal_data.kib_reaper_waitq);
1880                 wake_up_all (&kibnal_data.kib_connd_waitq);
1881
1882                 i = 2;
1883                 while (atomic_read (&kibnal_data.kib_nthreads) != 0) {
1884                         i++;
1885                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1886                                "Waiting for %d threads to terminate\n",
1887                                atomic_read (&kibnal_data.kib_nthreads));
1888                         set_current_state (TASK_INTERRUPTIBLE);
1889                         schedule_timeout (HZ);
1890                 }
1891                 /* fall through */
1892                 
1893         case IBNAL_INIT_NOTHING:
1894                 break;
1895         }
1896
             /* these allocations have no kib_init checkpoint of their own,
              * so free them unconditionally when non-NULL */
1897         if (kibnal_data.kib_tx_descs != NULL)
1898                 PORTAL_FREE (kibnal_data.kib_tx_descs,
1899                              IBNAL_TX_MSGS * sizeof(kib_tx_t));
1900
1901         if (kibnal_data.kib_peers != NULL)
1902                 PORTAL_FREE (kibnal_data.kib_peers,
1903                              sizeof (struct list_head) * 
1904                              kibnal_data.kib_peer_hash_size);
1905
1906         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1907                atomic_read (&portal_kmemory));
1908         printk(KERN_INFO "Lustre: OpenIB NAL unloaded (final mem %d)\n",
1909                atomic_read(&portal_kmemory));
1910
1911         kibnal_data.kib_init = IBNAL_INIT_NOTHING;
1912 }
1913
/*
 * Bring the NAL up: initialise bookkeeping, the Portals lib interface,
 * service threads, the IB device/PD/(FMR pool)/TX descriptors/CQ, and
 * finally the nal_cmd interface.  After each stage succeeds, kib_init is
 * advanced to the matching checkpoint so that on any failure
 * kibnal_api_shutdown() can unwind exactly what was set up.
 *
 * Returns PTL_OK, or PTL_FAIL after unwinding via kibnal_api_shutdown().
 */
1914 int
1915 kibnal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
1916                      ptl_ni_limits_t *requested_limits,
1917                      ptl_ni_limits_t *actual_limits)
1918 {
1919         struct timeval    tv;
1920         ptl_process_id_t  process_id;
1921         int               pkmem = atomic_read(&portal_kmemory);
1922         int               rc;
1923         int               i;
1924
1925         LASSERT (nal == &kibnal_api);
1926
1927         if (nal->nal_refct != 0) {
                     /* NAL already initialised by an earlier caller: report
                      * its limits and pin the module for this extra user */
1928                 if (actual_limits != NULL)
1929                         *actual_limits = kibnal_lib.libnal_ni.ni_actual_limits;
1930                 /* This module got the first ref */
1931                 PORTAL_MODULE_USE;
1932                 return (PTL_OK);
1933         }
1934
1935         LASSERT (kibnal_data.kib_init == IBNAL_INIT_NOTHING);
1936
1937         memset (&kibnal_data, 0, sizeof (kibnal_data)); /* zero pointers, flags etc */
1938
             /* microsecond wallclock stamp distinguishes this incarnation
              * from previous loads when peers reconnect */
1939         do_gettimeofday(&tv);
1940         kibnal_data.kib_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1941
1942         init_MUTEX (&kibnal_data.kib_nid_mutex);
1943         init_MUTEX_LOCKED (&kibnal_data.kib_listener_signal);
1944
1945         rwlock_init(&kibnal_data.kib_global_lock);
1946
1947         kibnal_data.kib_peer_hash_size = IBNAL_PEER_HASH_SIZE;
1948         PORTAL_ALLOC (kibnal_data.kib_peers,
1949                       sizeof (struct list_head) * kibnal_data.kib_peer_hash_size);
1950         if (kibnal_data.kib_peers == NULL) {
1951                 goto failed;
1952         }
1953         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++)
1954                 INIT_LIST_HEAD(&kibnal_data.kib_peers[i]);
1955
1956         spin_lock_init (&kibnal_data.kib_reaper_lock);
1957         INIT_LIST_HEAD (&kibnal_data.kib_reaper_conns);
1958         init_waitqueue_head (&kibnal_data.kib_reaper_waitq);
1959
1960         spin_lock_init (&kibnal_data.kib_connd_lock);
1961         INIT_LIST_HEAD (&kibnal_data.kib_connd_acceptq);
1962         INIT_LIST_HEAD (&kibnal_data.kib_connd_peers);
1963         init_waitqueue_head (&kibnal_data.kib_connd_waitq);
1964
1965         spin_lock_init (&kibnal_data.kib_sched_lock);
1966         INIT_LIST_HEAD (&kibnal_data.kib_sched_txq);
1967         INIT_LIST_HEAD (&kibnal_data.kib_sched_rxq);
1968         init_waitqueue_head (&kibnal_data.kib_sched_waitq);
1969
1970         spin_lock_init (&kibnal_data.kib_tx_lock);
1971         INIT_LIST_HEAD (&kibnal_data.kib_idle_txs);
1972         INIT_LIST_HEAD (&kibnal_data.kib_idle_nblk_txs);
1973         init_waitqueue_head(&kibnal_data.kib_idle_tx_waitq);
1974
1975         PORTAL_ALLOC (kibnal_data.kib_tx_descs,
1976                       IBNAL_TX_MSGS * sizeof(kib_tx_t));
1977         if (kibnal_data.kib_tx_descs == NULL) {
1978                 CERROR ("Can't allocate tx descs\n");
1979                 goto failed;
1980         }
1981
1982         /* lists/ptrs/locks initialised */
1983         kibnal_data.kib_init = IBNAL_INIT_DATA;
1984         /*****************************************************/
1985
1986
             /* NID is discovered later via kibnal_set_mynid() */
1987         process_id.pid = requested_pid;
1988         process_id.nid = PTL_NID_ANY;           /* don't know my NID yet */
1989         
1990         rc = lib_init(&kibnal_lib, nal, process_id,
1991                       requested_limits, actual_limits);
1992         if (rc != PTL_OK) {
1993                 CERROR("lib_init failed: error %d\n", rc);
1994                 goto failed;
1995         }
1996
1997         /* lib interface initialised */
1998         kibnal_data.kib_init = IBNAL_INIT_LIB;
1999         /*****************************************************/
2000
             /* spawn service threads: schedulers, connection daemons and
              * the reaper.  They count themselves in kib_nthreads and are
              * stopped by the kib_shutdown flag on teardown. */
2001         for (i = 0; i < IBNAL_N_SCHED; i++) {
2002                 rc = kibnal_thread_start (kibnal_scheduler,
2003                                           (void *)((unsigned long)i));
2004                 if (rc != 0) {
2005                         CERROR("Can't spawn openibnal scheduler[%d]: %d\n",
2006                                i, rc);
2007                         goto failed;
2008                 }
2009         }
2010
2011         for (i = 0; i < IBNAL_N_CONND; i++) {
2012                 rc = kibnal_thread_start (kibnal_connd,
2013                                           (void *)((unsigned long)i));
2014                 if (rc != 0) {
2015                         CERROR("Can't spawn openibnal connd[%d]: %d\n",
2016                                i, rc);
2017                         goto failed;
2018                 }
2019         }
2020
2021         rc = kibnal_thread_start (kibnal_reaper, NULL);
2022         if (rc != 0) {
2023                 CERROR ("Can't spawn openibnal reaper: %d\n", rc);
2024                 goto failed;
2025         }
2026
             /* only the first IB device is supported */
2027         kibnal_data.kib_device = ib_device_get_by_index(0);
2028         if (kibnal_data.kib_device == NULL) {
2029                 CERROR ("Can't open ib device 0\n");
2030                 goto failed;
2031         }
2032         
2033         rc = ib_device_properties_get(kibnal_data.kib_device,
2034                                       &kibnal_data.kib_device_props);
2035         if (rc != 0) {
2036                 CERROR ("Can't get device props: %d\n", rc);
2037                 goto failed;
2038         }
2039
2040         CDEBUG(D_NET, "Max Initiator: %d Max Responder %d\n", 
2041                kibnal_data.kib_device_props.max_initiator_per_qp,
2042                kibnal_data.kib_device_props.max_responder_per_qp);
2043
             /* use the first of ports 1/2 that responds to a properties
              * query */
2044         kibnal_data.kib_port = 0;
2045         for (i = 1; i <= 2; i++) {
2046                 rc = ib_port_properties_get(kibnal_data.kib_device, i,
2047                                             &kibnal_data.kib_port_props);
2048                 if (rc == 0) {
2049                         kibnal_data.kib_port = i;
2050                         break;
2051                 }
2052         }
2053         if (kibnal_data.kib_port == 0) {
2054                 CERROR ("Can't find a port\n");
2055                 goto failed;
2056         }
2057
2058         rc = ib_pd_create(kibnal_data.kib_device,
2059                           NULL, &kibnal_data.kib_pd);
2060         if (rc != 0) {
2061                 CERROR ("Can't create PD: %d\n", rc);
2062                 goto failed;
2063         }
2064         
2065         /* flag PD initialised */
2066         kibnal_data.kib_init = IBNAL_INIT_PD;
2067         /*****************************************************/
2068 #if IBNAL_FMR
2069         {
2070                 const int pool_size = IBNAL_NTX + IBNAL_NTX_NBLK;
2071                 struct ib_fmr_pool_param params = {
2072                         .max_pages_per_fmr = PTL_MTU/PAGE_SIZE,
2073                         .access            = (IB_ACCESS_LOCAL_WRITE |
2074                                               IB_ACCESS_REMOTE_WRITE |
2075                                               IB_ACCESS_REMOTE_READ),
2076                         .pool_size         = pool_size,
2077                         .dirty_watermark   = (pool_size * 3)/4,
2078                         .flush_function    = NULL,
2079                         .flush_arg         = NULL,
2080                         .cache             = 1,
2081                 };
2082                 rc = ib_fmr_pool_create(kibnal_data.kib_pd, &params,
2083                                         &kibnal_data.kib_fmr_pool);
2084                 if (rc != 0) {
2085                         CERROR ("Can't create FMR pool size %d: %d\n", 
2086                                 pool_size, rc);
2087                         goto failed;
2088                 }
2089         }
2090
2091         /* flag FMR pool initialised */
2092         kibnal_data.kib_init = IBNAL_INIT_FMR;
2093 #endif
2094         /*****************************************************/
2095
2096         rc = kibnal_setup_tx_descs();
2097         if (rc != 0) {
2098                 CERROR ("Can't register tx descs: %d\n", rc);
2099                 goto failed;
2100         }
2101         
2102         /* flag TX descs initialised */
2103         kibnal_data.kib_init = IBNAL_INIT_TXD;
2104         /*****************************************************/
2105         
2106         {
2107                 struct ib_cq_callback callback = {
2108                         .context        = IBNAL_CALLBACK_CTXT,
2109                         .policy         = IB_CQ_PROVIDER_REARM,
2110                         .function       = {
2111                                 .entry  = kibnal_callback,
2112                         },
2113                         .arg            = NULL,
2114                 };
2115                 int  nentries = IBNAL_CQ_ENTRIES;
2116                 
2117                 rc = ib_cq_create (kibnal_data.kib_device, 
2118                                    &nentries, &callback, NULL,
2119                                    &kibnal_data.kib_cq);
2120                 if (rc != 0) {
2121                         CERROR ("Can't create CQ: %d\n", rc);
2122                         goto failed;
2123                 }
2124
2125                 /* I only want solicited events */
2126                 rc = ib_cq_request_notification(kibnal_data.kib_cq, 1);
2127                 LASSERT (rc == 0);
2128         }
2129         
2130         /* flag CQ initialised */
2131         kibnal_data.kib_init = IBNAL_INIT_CQ;
2132         /*****************************************************/
2133         
2134         rc = libcfs_nal_cmd_register(OPENIBNAL, &kibnal_cmd, NULL);
2135         if (rc != 0) {
2136                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
2137                 goto failed;
2138         }
2139
2140         /* flag everything initialised */
2141         kibnal_data.kib_init = IBNAL_INIT_ALL;
2142         /*****************************************************/
2143
2144         printk(KERN_INFO "Lustre: OpenIB NAL loaded "
2145                "(initial mem %d)\n", pkmem);
2146
2147         return (PTL_OK);
2148
      /* unwind whatever was initialised: shutdown keys off kib_init */
2149  failed:
2150         kibnal_api_shutdown (&kibnal_api);    
2151         return (PTL_FAIL);
2152 }
2153
/*
 * Module unload hook: undo kibnal_module_init() in reverse order —
 * sysctl table, then the network interface, then the NAL registration.
 */
2154 void __exit
2155 kibnal_module_fini (void)
2156 {
2157         if (kibnal_tunables.kib_sysctl != NULL)
2158                 unregister_sysctl_table (kibnal_tunables.kib_sysctl);
2159         PtlNIFini(kibnal_ni);
2160
2161         ptl_unregister_nal(OPENIBNAL);
2162 }
2163
/*
 * Module load hook: wire up the nal_t entry points, set tunable
 * defaults, register the NAL, bring the interface up at load time (for
 * pure gateways) and publish the sysctl tunables.  Each failure path
 * unwinds exactly the steps that already succeeded, in reverse order.
 *
 * Returns 0 on success or a negative errno.
 */
2164 int __init
2165 kibnal_module_init (void)
2166 {
2167         int    rc;
2168
2169         /* the following must be sizeof(int) for proc_dointvec() */
2170         LASSERT (sizeof(kibnal_tunables.kib_io_timeout) == sizeof(int));
2171         LASSERT (sizeof(kibnal_tunables.kib_listener_timeout) == sizeof(int));
2172         LASSERT (sizeof(kibnal_tunables.kib_backlog) == sizeof(int));
2173         LASSERT (sizeof(kibnal_tunables.kib_port) == sizeof(int));
2174
2175         kibnal_api.nal_ni_init = kibnal_api_startup;
2176         kibnal_api.nal_ni_fini = kibnal_api_shutdown;
2177
2178         /* Initialise dynamic tunables to defaults once only */
2179         kibnal_tunables.kib_io_timeout = IBNAL_IO_TIMEOUT;
2180         kibnal_tunables.kib_listener_timeout = IBNAL_LISTENER_TIMEOUT;
2181         kibnal_tunables.kib_backlog = IBNAL_BACKLOG;
2182         kibnal_tunables.kib_port = IBNAL_PORT;
2183
2184         rc = ptl_register_nal(OPENIBNAL, &kibnal_api);
2185         if (rc != PTL_OK) {
2186                 CERROR("Can't register IBNAL: %d\n", rc);
                     /* map the PTL return onto an errno for the module
                      * loader; the precise code is not significant */
2187                 return (-ENOMEM);               /* or something... */
2188         }
2189
2190         /* Pure gateways want the NAL started up at module load time... */
2191         rc = PtlNIInit(OPENIBNAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kibnal_ni);
2192         if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
2193                 ptl_unregister_nal(OPENIBNAL);
2194                 return (-ENODEV);
2195         }
2196         
2197         kibnal_tunables.kib_sysctl = 
2198                 register_sysctl_table (kibnal_top_ctl_table, 0);
2199         if (kibnal_tunables.kib_sysctl == NULL) {
2200                 CERROR("Can't register sysctl table\n");
2201                 PtlNIFini(kibnal_ni);
2202                 ptl_unregister_nal(OPENIBNAL);
2203                 return (-ENOMEM);
2204         }
2205
2206         return (0);
2207 }
2208
2209 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2210 MODULE_DESCRIPTION("Kernel OpenIB NAL v0.01");
2211 MODULE_LICENSE("GPL");
2212
2213 module_init(kibnal_module_init);
2214 module_exit(kibnal_module_fini);
2215