/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */
#include "ranal.h"


nal_t                   kranal_api;
ptl_handle_ni_t         kranal_ni;
kra_data_t              kranal_data;
kra_tunables_t          kranal_tunables;

#ifdef CONFIG_SYSCTL
#define RANAL_SYSCTL_TIMEOUT           1
#define RANAL_SYSCTL_LISTENER_TIMEOUT  2
#define RANAL_SYSCTL_BACKLOG           3
#define RANAL_SYSCTL_PORT              4
#define RANAL_SYSCTL_MAX_IMMEDIATE     5

#define RANAL_SYSCTL                   202

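/* NB the initialisers below are positional; in the 2.4/2.6-era ctl_table
 * this code appears to target, the fields are
 * {ctl_name, procname, data, maxlen, mode, child, proc_handler} */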
static ctl_table kranal_ctl_table[] = {
        {RANAL_SYSCTL_TIMEOUT, "timeout",
         &kranal_tunables.kra_timeout, sizeof(int),
         0644, NULL, &proc_dointvec},
        {RANAL_SYSCTL_LISTENER_TIMEOUT, "listener_timeout",
         &kranal_tunables.kra_listener_timeout, sizeof(int),
         0644, NULL, &proc_dointvec},
        {RANAL_SYSCTL_BACKLOG, "backlog",
         &kranal_tunables.kra_backlog, sizeof(int),
         0644, NULL, kranal_listener_procint},
        {RANAL_SYSCTL_PORT, "port",
         &kranal_tunables.kra_port, sizeof(int),
         0644, NULL, kranal_listener_procint},
        {RANAL_SYSCTL_MAX_IMMEDIATE, "max_immediate",
         &kranal_tunables.kra_max_immediate, sizeof(int),
         0644, NULL, &proc_dointvec},
        { 0 }
};

static ctl_table kranal_top_ctl_table[] = {
        {RANAL_SYSCTL, "ranal", NULL, 0, 0555, kranal_ctl_table},
        { 0 }
};
#endif

int
kranal_sock_write (struct socket *sock, void *buffer, int nob)
{
        int           rc;
        mm_segment_t  oldmm = get_fs();
        struct iovec  iov = {
                .iov_base = buffer,
                .iov_len  = nob
        };
        struct msghdr msg = {
                .msg_name       = NULL,
                .msg_namelen    = 0,
                .msg_iov        = &iov,
                .msg_iovlen     = 1,
                .msg_control    = NULL,
                .msg_controllen = 0,
                .msg_flags      = MSG_DONTWAIT
        };

        /* We've set up the socket's send buffer to be large enough for
         * everything we send, so a single non-blocking send should
         * complete without error. */

        set_fs(KERNEL_DS);
        rc = sock_sendmsg(sock, &msg, iov.iov_len);
        set_fs(oldmm);

        return rc;
}

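/* Read exactly 'nob' bytes from 'sock', giving up after 'timeout' seconds in
 * total.  SO_RCVTIMEO is reset to the remaining budget before every receive
 * so a stalled peer can't pin us indefinitely. */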
int
kranal_sock_read (struct socket *sock, void *buffer, int nob, int timeout)
{
        int            rc;
        mm_segment_t   oldmm = get_fs();
        long           ticks = timeout * HZ;
        unsigned long  then;
        struct timeval tv;

        LASSERT (nob > 0);
        LASSERT (ticks > 0);

        for (;;) {
                struct iovec  iov = {
                        .iov_base = buffer,
                        .iov_len  = nob
                };
                struct msghdr msg = {
                        .msg_name       = NULL,
                        .msg_namelen    = 0,
                        .msg_iov        = &iov,
                        .msg_iovlen     = 1,
                        .msg_control    = NULL,
                        .msg_controllen = 0,
                        .msg_flags      = 0
                };

                /* Set receive timeout to remaining time */
                tv = (struct timeval) {
                        .tv_sec = ticks / HZ,
                        .tv_usec = ((ticks % HZ) * 1000000) / HZ
                };
                set_fs(KERNEL_DS);
                rc = sock_setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO,
                                     (char *)&tv, sizeof(tv));
                set_fs(oldmm);
                if (rc != 0) {
                        CERROR("Can't set socket recv timeout %d: %d\n",
                               timeout, rc);
                        return rc;
                }

                set_fs(KERNEL_DS);
                then = jiffies;
                rc = sock_recvmsg(sock, &msg, iov.iov_len, 0);
                ticks -= jiffies - then;
                set_fs(oldmm);

                if (rc < 0)
                        return rc;

                if (rc == 0)
                        return -ECONNABORTED;

                buffer = ((char *)buffer) + rc;
                nob -= rc;

                if (nob == 0)
                        return 0;

                if (ticks <= 0)
                        return -ETIMEDOUT;
        }
}

int
kranal_create_sock(struct socket **sockp)
{
        struct socket       *sock;
        int                  rc;
        int                  option;
        mm_segment_t         oldmm = get_fs();

        rc = sock_create(PF_INET, SOCK_STREAM, 0, &sock);
        if (rc != 0) {
                CERROR("Can't create socket: %d\n", rc);
                return rc;
        }

        /* Ensure sending connection info doesn't block */
        option = 2 * sizeof(kra_connreq_t);
        set_fs(KERNEL_DS);
        rc = sock_setsockopt(sock, SOL_SOCKET, SO_SNDBUF,
                             (char *)&option, sizeof(option));
        set_fs(oldmm);
        if (rc != 0) {
                CERROR("Can't set send buffer %d: %d\n", option, rc);
                goto failed;
        }

        option = 1;
        set_fs(KERNEL_DS);
        rc = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
                             (char *)&option, sizeof(option));
        set_fs(oldmm);
        if (rc != 0) {
                CERROR("Can't set SO_REUSEADDR: %d\n", rc);
                goto failed;
        }

        *sockp = sock;
        return 0;

 failed:
        sock_release(sock);
        return rc;
}

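/* Sleep uninterruptibly for 'ticks' jiffies */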
void
kranal_pause(int ticks)
{
        set_current_state(TASK_UNINTERRUPTIBLE);
        schedule_timeout(ticks);
}

void
kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn)
{
        RAP_RETURN   rrc;

        memset(connreq, 0, sizeof(*connreq));

        connreq->racr_magic     = RANAL_MSG_MAGIC;
        connreq->racr_version   = RANAL_MSG_VERSION;
        connreq->racr_devid     = conn->rac_device->rad_id;
        connreq->racr_srcnid    = kranal_lib.libnal_ni.ni_pid.nid;
        connreq->racr_dstnid    = conn->rac_peer->rap_nid;
        connreq->racr_peerstamp = kranal_data.kra_peerstamp;
        connreq->racr_connstamp = conn->rac_my_connstamp;
        connreq->racr_timeout   = conn->rac_timeout;

        rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
        LASSERT(rrc == RAP_SUCCESS);
}

int
kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout)
{
        int         rc;

        rc = kranal_sock_read(sock, connreq, sizeof(*connreq), timeout);
        if (rc != 0) {
                CERROR("Read failed: %d\n", rc);
                return rc;
        }

        if (connreq->racr_magic != RANAL_MSG_MAGIC) {
                if (__swab32(connreq->racr_magic) != RANAL_MSG_MAGIC) {
                        CERROR("Unexpected magic %08x\n", connreq->racr_magic);
                        return -EPROTO;
                }

                __swab32s(&connreq->racr_magic);
                __swab16s(&connreq->racr_version);
                __swab16s(&connreq->racr_devid);
                __swab64s(&connreq->racr_srcnid);
                __swab64s(&connreq->racr_dstnid);
                __swab64s(&connreq->racr_peerstamp);
                __swab64s(&connreq->racr_connstamp);
                __swab32s(&connreq->racr_timeout);

                __swab32s(&connreq->racr_riparams.HostId);
                __swab32s(&connreq->racr_riparams.FmaDomainHndl);
                __swab32s(&connreq->racr_riparams.PTag);
                __swab32s(&connreq->racr_riparams.CompletionCookie);
        }

        if (connreq->racr_version != RANAL_MSG_VERSION) {
                CERROR("Unexpected version %d\n", connreq->racr_version);
                return -EPROTO;
        }

        if (connreq->racr_srcnid == PTL_NID_ANY ||
            connreq->racr_dstnid == PTL_NID_ANY) {
                CERROR("Received PTL_NID_ANY\n");
                return -EPROTO;
        }

        if (connreq->racr_timeout < RANAL_MIN_TIMEOUT) {
                CERROR("Received timeout %d < MIN %d\n",
                       connreq->racr_timeout, RANAL_MIN_TIMEOUT);
                return -EPROTO;
        }

        return 0;
}

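/* Close any conns from an earlier incarnation of 'peer' (older peerstamp) or
 * an earlier connection attempt (older connstamp), skipping 'newconn' itself
 * and the matching half of a loopback connection.  Returns the number of
 * conns closed. */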
int
kranal_close_stale_conns_locked (kra_peer_t *peer, kra_conn_t *newconn)
{
        kra_conn_t         *conn;
        struct list_head   *ctmp;
        struct list_head   *cnxt;
        int                 loopback;
        int                 count = 0;

        loopback = peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid;

        list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
                conn = list_entry(ctmp, kra_conn_t, rac_list);

                if (conn == newconn)
                        continue;

                if (conn->rac_peerstamp != newconn->rac_peerstamp) {
                        CDEBUG(D_NET, "Closing stale conn nid:"LPX64
                               " peerstamp:"LPX64"("LPX64")\n", peer->rap_nid,
                               conn->rac_peerstamp, newconn->rac_peerstamp);
                        LASSERT (conn->rac_peerstamp < newconn->rac_peerstamp);
                        count++;
                        kranal_close_conn_locked(conn, -ESTALE);
                        continue;
                }

                if (conn->rac_device != newconn->rac_device)
                        continue;

                if (loopback &&
                    newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
                    newconn->rac_peer_connstamp == conn->rac_my_connstamp)
                        continue;

                LASSERT (conn->rac_peer_connstamp < newconn->rac_peer_connstamp);

                CDEBUG(D_NET, "Closing stale conn nid:"LPX64
                       " connstamp:"LPX64"("LPX64")\n", peer->rap_nid,
                       conn->rac_peer_connstamp, newconn->rac_peer_connstamp);

                count++;
                kranal_close_conn_locked(conn, -ESTALE);
        }

        return count;
}

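/* Return 0 if 'newconn' may proceed, or non-zero (1, 2 or 3, as commented in
 * the loop body) if it would duplicate or predate an existing conn to 'peer' */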
int
kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn)
{
        kra_conn_t       *conn;
        struct list_head *tmp;
        int               loopback;

        loopback = peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid;

        list_for_each(tmp, &peer->rap_conns) {
                conn = list_entry(tmp, kra_conn_t, rac_list);

                /* 'newconn' is from an earlier version of 'peer'!!! */
                if (newconn->rac_peerstamp < conn->rac_peerstamp)
                        return 1;

                /* 'conn' is from an earlier version of 'peer': it will be
                 * removed when we cull stale conns later on... */
                if (newconn->rac_peerstamp > conn->rac_peerstamp)
                        continue;

                /* Different devices are OK */
                if (conn->rac_device != newconn->rac_device)
                        continue;

                /* It's me connecting to myself */
                if (loopback &&
                    newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
                    newconn->rac_peer_connstamp == conn->rac_my_connstamp)
                        continue;

                /* 'newconn' is an earlier connection from 'peer'!!! */
                if (newconn->rac_peer_connstamp < conn->rac_peer_connstamp)
                        return 2;

                /* 'conn' is an earlier connection from 'peer': it will be
                 * removed when we cull stale conns later on... */
                if (newconn->rac_peer_connstamp > conn->rac_peer_connstamp)
                        continue;

                /* 'newconn' has the SAME connection stamp; 'peer' isn't
                 * playing the game... */
                return 3;
        }

        return 0;
}

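/* Stamp 'conn' with a connstamp (which orders this node's connection
 * attempts in time) and a cqid unique among all current conns */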
void
kranal_set_conn_uniqueness (kra_conn_t *conn)
{
        unsigned long  flags;

        write_lock_irqsave(&kranal_data.kra_global_lock, flags);

        conn->rac_my_connstamp = kranal_data.kra_connstamp++;

        do {    /* allocate a unique cqid */
                conn->rac_cqid = kranal_data.kra_next_cqid++;
        } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL);

        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
}

int
kranal_create_conn(kra_conn_t **connp, kra_device_t *dev)
{
        kra_conn_t    *conn;
        RAP_RETURN     rrc;

        LASSERT (!in_interrupt());
        PORTAL_ALLOC(conn, sizeof(*conn));

        if (conn == NULL)
                return -ENOMEM;

        memset(conn, 0, sizeof(*conn));
        atomic_set(&conn->rac_refcount, 1);
        INIT_LIST_HEAD(&conn->rac_list);
        INIT_LIST_HEAD(&conn->rac_hashlist);
        INIT_LIST_HEAD(&conn->rac_schedlist);
        INIT_LIST_HEAD(&conn->rac_fmaq);
        INIT_LIST_HEAD(&conn->rac_rdmaq);
        INIT_LIST_HEAD(&conn->rac_replyq);
        spin_lock_init(&conn->rac_lock);

        kranal_set_conn_uniqueness(conn);

        conn->rac_timeout = MAX(kranal_tunables.kra_timeout, RANAL_MIN_TIMEOUT);
        kranal_update_reaper_timeout(conn->rac_timeout);

        rrc = RapkCreateRi(dev->rad_handle, conn->rac_cqid,
                           &conn->rac_rihandle);
        if (rrc != RAP_SUCCESS) {
                CERROR("RapkCreateRi failed: %d\n", rrc);
                PORTAL_FREE(conn, sizeof(*conn));
                return -ENETDOWN;
        }

        atomic_inc(&kranal_data.kra_nconns);
        *connp = conn;
        return 0;
}

void
kranal_destroy_conn(kra_conn_t *conn)
{
        RAP_RETURN         rrc;

        LASSERT (!in_interrupt());
        LASSERT (!conn->rac_scheduled);
        LASSERT (list_empty(&conn->rac_list));
        LASSERT (list_empty(&conn->rac_hashlist));
        LASSERT (list_empty(&conn->rac_schedlist));
        LASSERT (atomic_read(&conn->rac_refcount) == 0);
        LASSERT (list_empty(&conn->rac_fmaq));
        LASSERT (list_empty(&conn->rac_rdmaq));
        LASSERT (list_empty(&conn->rac_replyq));

        rrc = RapkDestroyRi(conn->rac_device->rad_handle,
                            conn->rac_rihandle);
        LASSERT (rrc == RAP_SUCCESS);

        if (conn->rac_peer != NULL)
                kranal_peer_decref(conn->rac_peer);

        PORTAL_FREE(conn, sizeof(*conn));
        atomic_dec(&kranal_data.kra_nconns);
}

void
kranal_terminate_conn_locked (kra_conn_t *conn)
{
        LASSERT (!in_interrupt());
        LASSERT (conn->rac_state == RANAL_CONN_CLOSING);
        LASSERT (!list_empty(&conn->rac_hashlist));
        LASSERT (list_empty(&conn->rac_list));

        /* Remove from conn hash table: no new callbacks */
        list_del_init(&conn->rac_hashlist);
        kranal_conn_decref(conn);

        conn->rac_state = RANAL_CONN_CLOSED;

        /* schedule to clear out all uncompleted comms in context of dev's
         * scheduler */
        kranal_schedule_conn(conn);
}

void
kranal_close_conn_locked (kra_conn_t *conn, int error)
{
        kra_peer_t        *peer = conn->rac_peer;

        CDEBUG(error == 0 ? D_NET : D_ERROR,
               "closing conn to "LPX64": error %d\n", peer->rap_nid, error);

        LASSERT (!in_interrupt());
        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED);
        LASSERT (!list_empty(&conn->rac_hashlist));
        LASSERT (!list_empty(&conn->rac_list));

        list_del_init(&conn->rac_list);

        if (list_empty(&peer->rap_conns) &&
            peer->rap_persistence == 0) {
                /* Non-persistent peer with no more conns... */
                kranal_unlink_peer_locked(peer);
        }

        /* Reset RX timeout to ensure we wait for an incoming CLOSE for the
         * full timeout */
        conn->rac_last_rx = jiffies;
        mb();

        conn->rac_state = RANAL_CONN_CLOSING;
        kranal_schedule_conn(conn);             /* schedule sending CLOSE */

        kranal_conn_decref(conn);               /* lose peer's ref */
}

void
kranal_close_conn (kra_conn_t *conn, int error)
{
        unsigned long    flags;

        write_lock_irqsave(&kranal_data.kra_global_lock, flags);

        if (conn->rac_state == RANAL_CONN_ESTABLISHED)
                kranal_close_conn_locked(conn, error);

        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
}

int
kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq,
                       __u32 peer_ip, int peer_port)
{
        RAP_RETURN    rrc;

        rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
        if (rrc != RAP_SUCCESS) {
                CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n",
                       HIPQUAD(peer_ip), peer_port, rrc);
                return -EPROTO;
        }

        conn->rac_peerstamp = connreq->racr_peerstamp;
        conn->rac_peer_connstamp = connreq->racr_connstamp;
        conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq->racr_timeout);
        kranal_update_reaper_timeout(conn->rac_keepalive);
        return 0;
}

int
kranal_passive_conn_handshake (struct socket *sock, ptl_nid_t *src_nidp,
                               ptl_nid_t *dst_nidp, kra_conn_t **connp)
{
        struct sockaddr_in   addr;
        __u32                peer_ip;
        unsigned int         peer_port;
        kra_connreq_t        connreq;
        ptl_nid_t            src_nid;
        ptl_nid_t            dst_nid;
        kra_conn_t          *conn;
        kra_device_t        *dev;
        int                  rc;
        int                  len;
        int                  i;

        len = sizeof(addr);
        rc = sock->ops->getname(sock, (struct sockaddr *)&addr, &len, 2);
        if (rc != 0) {
                CERROR("Can't get peer's IP: %d\n", rc);
                return rc;
        }

        peer_ip = ntohl(addr.sin_addr.s_addr);
        peer_port = ntohs(addr.sin_port);

        if (peer_port >= 1024) {
                CERROR("Refusing unprivileged connection from %u.%u.%u.%u/%d\n",
                       HIPQUAD(peer_ip), peer_port);
                return -ECONNREFUSED;
        }

        rc = kranal_recv_connreq(sock, &connreq,
                                 kranal_tunables.kra_listener_timeout);
        if (rc != 0) {
                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
                       HIPQUAD(peer_ip), peer_port, rc);
                return rc;
        }

        src_nid = connreq.racr_srcnid;
        dst_nid = connreq.racr_dstnid;

        for (i = 0; ; i++) {
                if (i == kranal_data.kra_ndevs) {
                        CERROR("Can't match dev %d from %u.%u.%u.%u/%d\n",
                               connreq.racr_devid, HIPQUAD(peer_ip), peer_port);
                        return -ENODEV;
                }
                dev = &kranal_data.kra_devices[i];
                if (dev->rad_id == connreq.racr_devid)
                        break;
        }

        rc = kranal_create_conn(&conn, dev);
        if (rc != 0)
                return rc;

        rc = kranal_set_conn_params(conn, &connreq, peer_ip, peer_port);
        if (rc != 0) {
                kranal_conn_decref(conn);
                return rc;
        }

        kranal_pack_connreq(&connreq, conn);

        rc = kranal_sock_write(sock, &connreq, sizeof(connreq));
        if (rc != 0) {
                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
                       HIPQUAD(peer_ip), peer_port, rc);
                kranal_conn_decref(conn);
                return rc;
        }

        *connp = conn;
        *src_nidp = src_nid;
        *dst_nidp = dst_nid;
        return 0;
}

int
ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
{
        struct sockaddr_in  locaddr;
        struct sockaddr_in  srvaddr;
        struct socket      *sock;
        unsigned int        port;
        int                 rc;

        for (port = 1023; port >= 512; port--) {

                memset(&locaddr, 0, sizeof(locaddr));
                locaddr.sin_family      = AF_INET;
                locaddr.sin_port        = htons(port);
                locaddr.sin_addr.s_addr = htonl(INADDR_ANY);

                memset (&srvaddr, 0, sizeof (srvaddr));
                srvaddr.sin_family      = AF_INET;
                srvaddr.sin_port        = htons (peer->rap_port);
                srvaddr.sin_addr.s_addr = htonl (peer->rap_ip);

                rc = kranal_create_sock(&sock);
                if (rc != 0)
                        return rc;

                rc = sock->ops->bind(sock,
                                     (struct sockaddr *)&locaddr, sizeof(locaddr));
                if (rc != 0) {
                        sock_release(sock);

                        if (rc == -EADDRINUSE) {
                                CDEBUG(D_NET, "Port %d already in use\n", port);
                                continue;
                        }

                        CERROR("Can't bind to reserved port %d: %d\n", port, rc);
                        return rc;
                }

                rc = sock->ops->connect(sock,
                                        (struct sockaddr *)&srvaddr, sizeof(srvaddr),
                                        0);
                if (rc == 0) {
                        *sockp = sock;
                        return 0;
                }

                sock_release(sock);

                if (rc != -EADDRNOTAVAIL) {
                        CERROR("Can't connect port %d to %u.%u.%u.%u/%d: %d\n",
                               port, HIPQUAD(peer->rap_ip), peer->rap_port, rc);
                        return rc;
                }

                CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n",
                       port, HIPQUAD(peer->rap_ip), peer->rap_port);
        }

        /* all ports busy */
        return -EHOSTUNREACH;
}


int
kranal_active_conn_handshake(kra_peer_t *peer,
                             ptl_nid_t *dst_nidp, kra_conn_t **connp)
{
        kra_connreq_t       connreq;
        kra_conn_t         *conn;
        kra_device_t       *dev;
        struct socket      *sock;
        int                 rc;
        unsigned int        idx;

        /* spread connections over all devices using both peer NIDs to ensure
         * all nids use all devices */
        idx = peer->rap_nid + kranal_lib.libnal_ni.ni_pid.nid;
        dev = &kranal_data.kra_devices[idx % kranal_data.kra_ndevs];

        rc = kranal_create_conn(&conn, dev);
        if (rc != 0)
                return rc;

        kranal_pack_connreq(&connreq, conn);

        rc = ranal_connect_sock(peer, &sock);
        if (rc != 0)
                goto failed_0;

        /* CAVEAT EMPTOR: the passive side receives with a SHORT rx timeout
         * immediately after accepting a connection, so we connect and then
         * send immediately. */

        rc = kranal_sock_write(sock, &connreq, sizeof(connreq));
        if (rc != 0) {
                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
                       HIPQUAD(peer->rap_ip), peer->rap_port, rc);
                goto failed_1;
        }

        rc = kranal_recv_connreq(sock, &connreq, kranal_tunables.kra_timeout);
        if (rc != 0) {
                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
                       HIPQUAD(peer->rap_ip), peer->rap_port, rc);
                goto failed_1;
        }

        sock_release(sock);
        rc = -EPROTO;

        if (connreq.racr_srcnid != peer->rap_nid) {
                CERROR("Unexpected srcnid from %u.%u.%u.%u/%d: "
                       "received "LPX64" expected "LPX64"\n",
                       HIPQUAD(peer->rap_ip), peer->rap_port,
                       connreq.racr_srcnid, peer->rap_nid);
                goto failed_0;
        }

        if (connreq.racr_devid != dev->rad_id) {
                CERROR("Unexpected device id from %u.%u.%u.%u/%d: "
                       "received %d expected %d\n",
                       HIPQUAD(peer->rap_ip), peer->rap_port,
                       connreq.racr_devid, dev->rad_id);
                goto failed_0;
        }

        rc = kranal_set_conn_params(conn, &connreq,
                                    peer->rap_ip, peer->rap_port);
        if (rc != 0)
                goto failed_0;

        *connp = conn;
        *dst_nidp = connreq.racr_dstnid;
        return 0;

 failed_1:
        sock_release(sock);
 failed_0:
        kranal_conn_decref(conn);
        return rc;
}

int
kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
{
        kra_peer_t        *peer2;
        kra_tx_t          *tx;
        ptl_nid_t          peer_nid;
        ptl_nid_t          dst_nid;
        unsigned long      flags;
        kra_conn_t        *conn;
        int                rc;
        int                nstale;
        int                new_peer = 0;

        if (sock == NULL) {
                /* active: connd wants to connect to 'peer' */
                LASSERT (peer != NULL);
                LASSERT (peer->rap_connecting);

                rc = kranal_active_conn_handshake(peer, &dst_nid, &conn);
                if (rc != 0)
                        return rc;

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (!kranal_peer_active(peer)) {
                        /* raced with peer getting unlinked */
                        write_unlock_irqrestore(&kranal_data.kra_global_lock,
                                                flags);
                        kranal_conn_decref(conn);
                        return -ESTALE;
                }

                peer_nid = peer->rap_nid;
        } else {
                /* passive: listener accepted 'sock' */
                LASSERT (peer == NULL);

                rc = kranal_passive_conn_handshake(sock, &peer_nid,
                                                   &dst_nid, &conn);
                if (rc != 0)
                        return rc;

                /* assume this is a new peer */
                peer = kranal_create_peer(peer_nid);
                if (peer == NULL) {
                        CERROR("Can't allocate peer for "LPX64"\n", peer_nid);
                        kranal_conn_decref(conn);
                        return -ENOMEM;
                }

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                peer2 = kranal_find_peer_locked(peer_nid);
                if (peer2 == NULL) {
                        new_peer = 1;
                } else {
                        /* peer_nid already in the peer table */
                        kranal_peer_decref(peer);
                        peer = peer2;
                }
        }

        LASSERT (!new_peer == !kranal_peer_active(peer));

        /* Refuse connection if peer thinks we are a different NID.  We check
         * this while holding the global lock, to synch with connection
         * destruction on NID change. */
        if (dst_nid != kranal_lib.libnal_ni.ni_pid.nid) {
                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

                CERROR("Stale/bad connection with "LPX64
                       ": dst_nid "LPX64", expected "LPX64"\n",
                       peer_nid, dst_nid, kranal_lib.libnal_ni.ni_pid.nid);
                rc = -ESTALE;
                goto failed;
        }

        /* Refuse to duplicate an existing connection (both sides might try to
         * connect at once).  NB we return success!  We _are_ connected so we
         * _don't_ have any blocked txs to complete with failure. */
        rc = kranal_conn_isdup_locked(peer, conn);
        if (rc != 0) {
                LASSERT (!list_empty(&peer->rap_conns));
                LASSERT (list_empty(&peer->rap_tx_queue));
                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                CWARN("Not creating duplicate connection to "LPX64": %d\n",
                      peer_nid, rc);
                rc = 0;
                goto failed;
        }

        if (new_peer) {
                /* peer table takes my ref on the new peer */
                list_add_tail(&peer->rap_list,
                              kranal_nid2peerlist(peer_nid));
        }

        kranal_peer_addref(peer);               /* +1 ref for conn */
        conn->rac_peer = peer;
        list_add_tail(&conn->rac_list, &peer->rap_conns);

        kranal_conn_addref(conn);               /* +1 ref for conn table */
        list_add_tail(&conn->rac_hashlist,
                      kranal_cqid2connlist(conn->rac_cqid));

        /* Schedule all packets blocking for a connection */
        while (!list_empty(&peer->rap_tx_queue)) {
                tx = list_entry(peer->rap_tx_queue.next,
                                kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                kranal_post_fma(conn, tx);
        }

        nstale = kranal_close_stale_conns_locked(peer, conn);

        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

        /* CAVEAT EMPTOR: passive peer can disappear NOW */

        if (nstale != 0)
                CWARN("Closed %d stale conns to "LPX64"\n", nstale, peer_nid);

        /* Ensure conn gets checked.  Transmits may have been queued and an
         * FMA event may have happened before it got in the cq hash table */
        kranal_schedule_conn(conn);
        return 0;

 failed:
        if (new_peer)
                kranal_peer_decref(peer);
        kranal_conn_decref(conn);
        return rc;
}

void
kranal_connect (kra_peer_t *peer)
{
        kra_tx_t          *tx;
        unsigned long      flags;
        struct list_head   zombies;
        int                rc;

        LASSERT (peer->rap_connecting);

        rc = kranal_conn_handshake(NULL, peer);

        write_lock_irqsave(&kranal_data.kra_global_lock, flags);

        LASSERT (peer->rap_connecting);
        peer->rap_connecting = 0;

        if (rc == 0) {
                /* kranal_conn_handshake() queues blocked txs immediately on
                 * success to avoid messages jumping the queue */
                LASSERT (list_empty(&peer->rap_tx_queue));

                /* reset reconnection timeouts */
                peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL;
                peer->rap_reconnect_time = CURRENT_TIME;

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                return;
        }

        LASSERT (peer->rap_reconnect_interval != 0);
        peer->rap_reconnect_time = CURRENT_TIME + peer->rap_reconnect_interval;
        peer->rap_reconnect_interval = MIN(RANAL_MAX_RECONNECT_INTERVAL,
                                           2 * peer->rap_reconnect_interval);

        /* Grab all blocked packets while we have the global lock */
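        /* (list_add() of 'zombies' into the queue followed by list_del_init()
         * of the queue head splices every blocked tx onto 'zombies') */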
        list_add(&zombies, &peer->rap_tx_queue);
        list_del_init(&peer->rap_tx_queue);

        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

        if (list_empty(&zombies))
                return;

        CWARN("Dropping packets for "LPX64": connection failed\n",
              peer->rap_nid);

        do {
                tx = list_entry(zombies.next, kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                kranal_tx_done(tx, -EHOSTUNREACH);

        } while (!list_empty(&zombies));
}

int
kranal_listener(void *arg)
{
        struct sockaddr_in addr;
        wait_queue_t       wait;
        struct socket     *sock;
        kra_acceptsock_t  *ras;
        int                port;
        char               name[16];
        int                rc;
        unsigned long      flags;

        /* Parent thread holds kra_nid_mutex, and is, or is about to
         * block on kra_listener_signal */

        port = kranal_tunables.kra_port;
        snprintf(name, sizeof(name), "kranal_lstn%03d", port);
        kportal_daemonize(name);
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        rc = kranal_create_sock(&sock);
        if (rc != 0)
                goto out_0;

        memset(&addr, 0, sizeof(addr));
        addr.sin_family      = AF_INET;
        addr.sin_port        = htons(port);
        addr.sin_addr.s_addr = INADDR_ANY;

        rc = sock->ops->bind(sock, (struct sockaddr *)&addr, sizeof(addr));
        if (rc != 0) {
                CERROR("Can't bind to port %d\n", port);
                goto out_1;
        }

        rc = sock->ops->listen(sock, kranal_tunables.kra_backlog);
        if (rc != 0) {
                CERROR("Can't set listen backlog %d: %d\n",
                       kranal_tunables.kra_backlog, rc);
                goto out_1;
        }

        LASSERT (kranal_data.kra_listener_sock == NULL);
        kranal_data.kra_listener_sock = sock;

        /* unblock waiting parent */
        LASSERT (kranal_data.kra_listener_shutdown == 0);
        up(&kranal_data.kra_listener_signal);

        /* Wake me any time something happens on my socket */
        add_wait_queue(sock->sk->sk_sleep, &wait);
        ras = NULL;

        while (kranal_data.kra_listener_shutdown == 0) {

                if (ras == NULL) {
                        PORTAL_ALLOC(ras, sizeof(*ras));
                        if (ras == NULL) {
                                CERROR("Out of Memory: pausing...\n");
                                kranal_pause(HZ);
                                continue;
                        }
                        ras->ras_sock = NULL;
                }

                if (ras->ras_sock == NULL) {
                        ras->ras_sock = sock_alloc();
                        if (ras->ras_sock == NULL) {
                                CERROR("Can't allocate socket: pausing...\n");
                                kranal_pause(HZ);
                                continue;
                        }
                }

                set_current_state(TASK_INTERRUPTIBLE);

                rc = sock->ops->accept(sock, ras->ras_sock, O_NONBLOCK);

                /* Sleep for socket activity? */
                if (rc == -EAGAIN &&
                    kranal_data.kra_listener_shutdown == 0)
                        schedule();

                set_current_state(TASK_RUNNING);

                if (rc == 0) {
                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

                        list_add_tail(&ras->ras_list,
                                      &kranal_data.kra_connd_acceptq);

                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
                        wake_up(&kranal_data.kra_connd_waitq);

                        ras = NULL;
                        continue;
                }

                if (rc != -EAGAIN) {
                        CERROR("Accept failed: %d, pausing...\n", rc);
                        kranal_pause(HZ);
                }
        }

        if (ras != NULL) {
                if (ras->ras_sock != NULL)
                        sock_release(ras->ras_sock);
                PORTAL_FREE(ras, sizeof(*ras));
        }

        rc = 0;
        remove_wait_queue(sock->sk->sk_sleep, &wait);
 out_1:
        sock_release(sock);
        kranal_data.kra_listener_sock = NULL;
 out_0:
        /* set completion status and unblock thread waiting for me
         * (parent on startup failure, executioner on normal shutdown) */
        kranal_data.kra_listener_shutdown = rc;
        up(&kranal_data.kra_listener_signal);

        return 0;
}

int
kranal_start_listener (void)
{
        long           pid;
        int            rc;

        CDEBUG(D_WARNING, "Starting listener\n");

        /* Called holding kra_nid_mutex: listener stopped */
        LASSERT (kranal_data.kra_listener_sock == NULL);

        kranal_data.kra_listener_shutdown = 0;
        pid = kernel_thread(kranal_listener, NULL, 0);
        if (pid < 0) {
                CERROR("Can't spawn listener: %ld\n", pid);
                return (int)pid;
        }

        /* Block until listener has started up. */
        down(&kranal_data.kra_listener_signal);

        rc = kranal_data.kra_listener_shutdown;
        LASSERT ((rc != 0) == (kranal_data.kra_listener_sock == NULL));

        CDEBUG(D_WARNING, "Listener %ld started OK\n", pid);
        return rc;
}

void
kranal_stop_listener(void)
{
        CDEBUG(D_WARNING, "Stopping listener\n");

        /* Called holding kra_nid_mutex: listener running */
        LASSERT (kranal_data.kra_listener_sock != NULL);

        kranal_data.kra_listener_shutdown = 1;
        wake_up_all(kranal_data.kra_listener_sock->sk->sk_sleep);

        /* Block until listener has torn down. */
        down(&kranal_data.kra_listener_signal);

        LASSERT (kranal_data.kra_listener_sock == NULL);
        CDEBUG(D_WARNING, "Listener stopped\n");
}

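/* proc handler for the 'port' and 'backlog' tunables: runs proc_dointvec()
 * and then bounces the listener so a new value takes effect immediately */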
int
kranal_listener_procint(ctl_table *table, int write, struct file *filp,
                        void *buffer, size_t *lenp)
{
        int   *tunable = (int *)table->data;
        int    old_val;
        int    rc;

        /* No race with nal initialisation since the nal is setup all the time
         * it's loaded.  When that changes, change this! */
        LASSERT (kranal_data.kra_init == RANAL_INIT_ALL);

        down(&kranal_data.kra_nid_mutex);

        LASSERT (tunable == &kranal_tunables.kra_port ||
                 tunable == &kranal_tunables.kra_backlog);
        old_val = *tunable;

        rc = proc_dointvec(table, write, filp, buffer, lenp);

        if (write &&
            (*tunable != old_val ||
             kranal_data.kra_listener_sock == NULL)) {

                if (kranal_data.kra_listener_sock != NULL)
                        kranal_stop_listener();

                rc = kranal_start_listener();

                if (rc != 0) {
                        CWARN("Unable to start listener with new tunable:"
                              " reverting to old value\n");
                        *tunable = old_val;
                        kranal_start_listener();
                }
        }

        up(&kranal_data.kra_nid_mutex);

        LASSERT (kranal_data.kra_init == RANAL_INIT_ALL);
        return rc;
}

int
kranal_set_mynid(ptl_nid_t nid)
{
        unsigned long  flags;
        lib_ni_t      *ni = &kranal_lib.libnal_ni;
        int            rc = 0;

        CDEBUG(D_NET, "setting mynid to "LPX64" (old nid="LPX64")\n",
               nid, ni->ni_pid.nid);

        down(&kranal_data.kra_nid_mutex);

        if (nid == ni->ni_pid.nid) {
                /* no change of NID */
                up(&kranal_data.kra_nid_mutex);
                return 0;
        }

        if (kranal_data.kra_listener_sock != NULL)
                kranal_stop_listener();

        write_lock_irqsave(&kranal_data.kra_global_lock, flags);
        kranal_data.kra_peerstamp++;
        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

        ni->ni_pid.nid = nid;

        /* Delete all existing peers and their connections after new
         * NID/connstamp set to ensure no old connections in our brave
         * new world. */
        kranal_del_peer(PTL_NID_ANY, 0);

        if (nid != PTL_NID_ANY)
                rc = kranal_start_listener();

        up(&kranal_data.kra_nid_mutex);
        return rc;
}

kra_peer_t *
kranal_create_peer (ptl_nid_t nid)
{
        kra_peer_t *peer;

        LASSERT (nid != PTL_NID_ANY);

        PORTAL_ALLOC(peer, sizeof(*peer));
        if (peer == NULL)
                return NULL;

        memset(peer, 0, sizeof(*peer));         /* zero flags etc */

        peer->rap_nid = nid;
        atomic_set(&peer->rap_refcount, 1);     /* 1 ref for caller */

        INIT_LIST_HEAD(&peer->rap_list);
        INIT_LIST_HEAD(&peer->rap_connd_list);
        INIT_LIST_HEAD(&peer->rap_conns);
        INIT_LIST_HEAD(&peer->rap_tx_queue);

        peer->rap_reconnect_time = CURRENT_TIME;
        peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL;

        atomic_inc(&kranal_data.kra_npeers);
        return peer;
}

void
kranal_destroy_peer (kra_peer_t *peer)
{
        CDEBUG(D_NET, "peer "LPX64" %p deleted\n", peer->rap_nid, peer);

        LASSERT (atomic_read(&peer->rap_refcount) == 0);
        LASSERT (peer->rap_persistence == 0);
        LASSERT (!kranal_peer_active(peer));
        LASSERT (!peer->rap_connecting);
        LASSERT (list_empty(&peer->rap_conns));
        LASSERT (list_empty(&peer->rap_tx_queue));
        LASSERT (list_empty(&peer->rap_connd_list));

        PORTAL_FREE(peer, sizeof(*peer));

        /* NB a peer's connections keep a reference on their peer until
         * they are destroyed, so we can be assured that _all_ state to do
         * with this peer has been cleaned up when its refcount drops to
         * zero. */
        atomic_dec(&kranal_data.kra_npeers);
}

kra_peer_t *
kranal_find_peer_locked (ptl_nid_t nid)
{
        struct list_head *peer_list = kranal_nid2peerlist(nid);
        struct list_head *tmp;
        kra_peer_t       *peer;

        list_for_each (tmp, peer_list) {

                peer = list_entry(tmp, kra_peer_t, rap_list);

                LASSERT (peer->rap_persistence > 0 ||     /* persistent peer */
                         !list_empty(&peer->rap_conns));  /* active conn */

                if (peer->rap_nid != nid)
                        continue;

                CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
                       peer, nid, atomic_read(&peer->rap_refcount));
                return peer;
        }
        return NULL;
}

kra_peer_t *
kranal_find_peer (ptl_nid_t nid)
{
        kra_peer_t     *peer;

        read_lock(&kranal_data.kra_global_lock);
        peer = kranal_find_peer_locked(nid);
        if (peer != NULL)                       /* +1 ref for caller? */
                kranal_peer_addref(peer);
        read_unlock(&kranal_data.kra_global_lock);

        return peer;
}

void
kranal_unlink_peer_locked (kra_peer_t *peer)
{
        LASSERT (peer->rap_persistence == 0);
        LASSERT (list_empty(&peer->rap_conns));

        LASSERT (kranal_peer_active(peer));
        list_del_init(&peer->rap_list);

        /* lose peerlist's ref */
        kranal_peer_decref(peer);
}

int
kranal_get_peer_info (int index, ptl_nid_t *nidp, __u32 *ipp, int *portp,
                      int *persistencep)
{
        kra_peer_t        *peer;
        struct list_head  *ptmp;
        int                i;

        read_lock(&kranal_data.kra_global_lock);

        for (i = 0; i < kranal_data.kra_peer_hash_size; i++) {

                list_for_each(ptmp, &kranal_data.kra_peers[i]) {

                        peer = list_entry(ptmp, kra_peer_t, rap_list);
                        LASSERT (peer->rap_persistence > 0 ||
                                 !list_empty(&peer->rap_conns));

                        if (index-- > 0)
                                continue;

                        *nidp = peer->rap_nid;
                        *ipp = peer->rap_ip;
                        *portp = peer->rap_port;
                        *persistencep = peer->rap_persistence;

                        read_unlock(&kranal_data.kra_global_lock);
                        return 0;
                }
        }

        read_unlock(&kranal_data.kra_global_lock);
        return -ENOENT;
}

int
kranal_add_persistent_peer (ptl_nid_t nid, __u32 ip, int port)
{
        unsigned long      flags;
        kra_peer_t        *peer;
        kra_peer_t        *peer2;

        if (nid == PTL_NID_ANY)
                return -EINVAL;

        peer = kranal_create_peer(nid);
        if (peer == NULL)
                return -ENOMEM;

        write_lock_irqsave(&kranal_data.kra_global_lock, flags);

        peer2 = kranal_find_peer_locked(nid);
        if (peer2 != NULL) {
                kranal_peer_decref(peer);
                peer = peer2;
        } else {
                /* peer table takes existing ref on peer */
                list_add_tail(&peer->rap_list,
                              kranal_nid2peerlist(nid));
        }

        peer->rap_ip = ip;
        peer->rap_port = port;
        peer->rap_persistence++;

        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
        return 0;
}

void
kranal_del_peer_locked (kra_peer_t *peer, int single_share)
{
        struct list_head *ctmp;
        struct list_head *cnxt;
        kra_conn_t       *conn;

        if (!single_share)
                peer->rap_persistence = 0;
        else if (peer->rap_persistence > 0)
                peer->rap_persistence--;

        if (peer->rap_persistence != 0)
                return;

        if (list_empty(&peer->rap_conns)) {
                kranal_unlink_peer_locked(peer);
        } else {
                list_for_each_safe(ctmp, cnxt, &peer->rap_conns) {
                        conn = list_entry(ctmp, kra_conn_t, rac_list);

                        kranal_close_conn_locked(conn, 0);
                }
                /* peer unlinks itself when last conn is closed */
        }
}

int
kranal_del_peer (ptl_nid_t nid, int single_share)
{
        unsigned long      flags;
        struct list_head  *ptmp;
        struct list_head  *pnxt;
        kra_peer_t        *peer;
        int                lo;
        int                hi;
        int                i;
        int                rc = -ENOENT;

        write_lock_irqsave(&kranal_data.kra_global_lock, flags);

        if (nid != PTL_NID_ANY)
                lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers;
        else {
                lo = 0;
                hi = kranal_data.kra_peer_hash_size - 1;
        }

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) {
                        peer = list_entry(ptmp, kra_peer_t, rap_list);
                        LASSERT (peer->rap_persistence > 0 ||
                                 !list_empty(&peer->rap_conns));

                        if (!(nid == PTL_NID_ANY || peer->rap_nid == nid))
                                continue;

                        kranal_del_peer_locked(peer, single_share);
                        rc = 0;         /* matched something */

                        if (single_share)
                                goto out;
                }
        }
 out:
        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

        return rc;
}

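/* Return the index'th conn in the peer table, taking a ref that the caller
 * must drop with kranal_conn_decref() */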
1454 kra_conn_t *
1455 kranal_get_conn_by_idx (int index)
1456 {
1457         kra_peer_t        *peer;
1458         struct list_head  *ptmp;
1459         kra_conn_t        *conn;
1460         struct list_head  *ctmp;
1461         int                i;
1462
1463         read_lock (&kranal_data.kra_global_lock);
1464
1465         for (i = 0; i < kranal_data.kra_peer_hash_size; i++) {
1466                 list_for_each (ptmp, &kranal_data.kra_peers[i]) {
1467
1468                         peer = list_entry(ptmp, kra_peer_t, rap_list);
1469                         LASSERT (peer->rap_persistence > 0 ||
1470                                  !list_empty(&peer->rap_conns));
1471
1472                         list_for_each (ctmp, &peer->rap_conns) {
1473                                 if (index-- > 0)
1474                                         continue;
1475
1476                                 conn = list_entry(ctmp, kra_conn_t, rac_list);
1477                                 CDEBUG(D_NET, "++conn[%p] -> "LPX64" (%d)\n",
1478                                        conn, conn->rac_peer->rap_nid,
1479                                        atomic_read(&conn->rac_refcount));
1480                                 atomic_inc(&conn->rac_refcount);
1481                                 read_unlock(&kranal_data.kra_global_lock);
1482                                 return conn;
1483                         }
1484                 }
1485         }
1486
1487         read_unlock(&kranal_data.kra_global_lock);
1488         return NULL;
1489 }
1490
1491 int
1492 kranal_close_peer_conns_locked (kra_peer_t *peer, int why)
1493 {
1494         kra_conn_t         *conn;
1495         struct list_head   *ctmp;
1496         struct list_head   *cnxt;
1497         int                 count = 0;
1498
1499         list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
1500                 conn = list_entry(ctmp, kra_conn_t, rac_list);
1501
1502                 count++;
1503                 kranal_close_conn_locked(conn, why);
1504         }
1505
1506         return count;
1507 }
1508
int
kranal_close_matching_conns (ptl_nid_t nid)
{
        unsigned long       flags;
        kra_peer_t         *peer;
        struct list_head   *ptmp;
        struct list_head   *pnxt;
        int                 lo;
        int                 hi;
        int                 i;
        int                 count = 0;

        write_lock_irqsave(&kranal_data.kra_global_lock, flags);

        if (nid != PTL_NID_ANY)
                lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers;
        else {
                lo = 0;
                hi = kranal_data.kra_peer_hash_size - 1;
        }

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) {

                        peer = list_entry(ptmp, kra_peer_t, rap_list);
                        LASSERT (peer->rap_persistence > 0 ||
                                 !list_empty(&peer->rap_conns));

                        if (!(nid == PTL_NID_ANY || nid == peer->rap_nid))
                                continue;

                        count += kranal_close_peer_conns_locked(peer, 0);
                }
        }

        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

        /* wildcards always succeed */
        if (nid == PTL_NID_ANY)
                return 0;

        return (count == 0) ? -ENOENT : 0;
}

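/* Dispatch one portals_cfg command.  Enumeration commands are indexed by
 * pcfg_count, so a (hypothetical) caller can walk the conn table by
 * resubmitting with increasing indices until -ENOENT, e.g.:
 *
 *      for (pcfg.pcfg_count = 0;; pcfg.pcfg_count++) {
 *              pcfg.pcfg_command = NAL_CMD_GET_CONN;
 *              if (kranal_cmd(&pcfg, NULL) != 0)
 *                      break;
 *              ... pcfg.pcfg_nid/pcfg.pcfg_id describe one conn ...
 *      }
 */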
int
kranal_cmd(struct portals_cfg *pcfg, void *private)
{
        int rc = -EINVAL;

        LASSERT (pcfg != NULL);

        switch (pcfg->pcfg_command) {
        case NAL_CMD_GET_PEER: {
                ptl_nid_t   nid = 0;
                __u32       ip = 0;
                int         port = 0;
                int         share_count = 0;

                rc = kranal_get_peer_info(pcfg->pcfg_count,
                                          &nid, &ip, &port, &share_count);
                pcfg->pcfg_nid   = nid;
                pcfg->pcfg_size  = 0;
                pcfg->pcfg_id    = ip;
                pcfg->pcfg_misc  = port;
                pcfg->pcfg_count = 0;
                pcfg->pcfg_wait  = share_count;
                break;
        }
        case NAL_CMD_ADD_PEER: {
                rc = kranal_add_persistent_peer(pcfg->pcfg_nid,
                                                pcfg->pcfg_id, /* IP */
                                                pcfg->pcfg_misc); /* port */
                break;
        }
        case NAL_CMD_DEL_PEER: {
                rc = kranal_del_peer(pcfg->pcfg_nid,
                                     /* flags == single_share */
                                     pcfg->pcfg_flags != 0);
                break;
        }
        case NAL_CMD_GET_CONN: {
                kra_conn_t *conn = kranal_get_conn_by_idx(pcfg->pcfg_count);

                if (conn == NULL)
                        rc = -ENOENT;
                else {
                        rc = 0;
                        pcfg->pcfg_nid   = conn->rac_peer->rap_nid;
                        pcfg->pcfg_id    = conn->rac_device->rad_id;
                        pcfg->pcfg_misc  = 0;
                        pcfg->pcfg_flags = 0;
                        kranal_conn_decref(conn);
                }
                break;
        }
        case NAL_CMD_CLOSE_CONNECTION: {
                rc = kranal_close_matching_conns(pcfg->pcfg_nid);
                break;
        }
        case NAL_CMD_REGISTER_MYNID: {
                if (pcfg->pcfg_nid == PTL_NID_ANY)
                        rc = -EINVAL;
                else
                        rc = kranal_set_mynid(pcfg->pcfg_nid);
                break;
        }
        }

        return rc;
}

void
kranal_free_txdescs(struct list_head *freelist)
{
        kra_tx_t    *tx;

        while (!list_empty(freelist)) {
                tx = list_entry(freelist->next, kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                PORTAL_FREE(tx->tx_phys, PTL_MD_MAX_IOV * sizeof(*tx->tx_phys));
                PORTAL_FREE(tx, sizeof(*tx));
        }
}

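/* Pre-allocate 'n' tx descriptors, each with a PTL_MD_MAX_IOV-entry
 * physical-fragment array, onto one of the two idle freelists:
 * kra_idle_txs for senders that may block waiting for a descriptor and
 * kra_idle_nblk_txs for those that can't (hence "nblk").  Allocation is
 * all-or-nothing: on failure everything added so far is freed and
 * -ENOMEM returned. */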
int
kranal_alloc_txdescs(struct list_head *freelist, int n)
{
        int            isnblk = (freelist == &kranal_data.kra_idle_nblk_txs);
        int            i;
        kra_tx_t      *tx;

        LASSERT (freelist == &kranal_data.kra_idle_txs ||
                 freelist == &kranal_data.kra_idle_nblk_txs);
        LASSERT (list_empty(freelist));

        for (i = 0; i < n; i++) {

                PORTAL_ALLOC(tx, sizeof(*tx));
                if (tx == NULL) {
                        CERROR("Can't allocate %stx[%d]\n",
                               isnblk ? "nblk " : "", i);
                        kranal_free_txdescs(freelist);
                        return -ENOMEM;
                }

                PORTAL_ALLOC(tx->tx_phys,
                             PTL_MD_MAX_IOV * sizeof(*tx->tx_phys));
                if (tx->tx_phys == NULL) {
                        CERROR("Can't allocate %stx[%d]->tx_phys\n",
                               isnblk ? "nblk " : "", i);

                        PORTAL_FREE(tx, sizeof(*tx));
                        kranal_free_txdescs(freelist);
                        return -ENOMEM;
                }

                tx->tx_isnblk = isnblk;
                tx->tx_buftype = RANAL_BUF_NONE;
                tx->tx_msg.ram_type = RANAL_MSG_NONE;

                list_add(&tx->tx_list, freelist);
        }

        return 0;
}

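/* Bring up one RapidArray device: look up the device handle, reserve an
 * RDMA descriptor per tx, then create the send (RDMA) and receive (FMA)
 * completion queues.  Failures unwind through the classic goto ladder:
 * each label undoes exactly the steps that succeeded before it, in
 * reverse order. */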
int
kranal_device_init(int id, kra_device_t *dev)
{
        const int         total_ntx = RANAL_NTX + RANAL_NTX_NBLK;
        RAP_RETURN        rrc;

        dev->rad_id = id;
        rrc = RapkGetDeviceByIndex(id, kranal_device_callback,
                                   &dev->rad_handle);
        if (rrc != RAP_SUCCESS) {
                CERROR("Can't get RapidArray device %d: %d\n", id, rrc);
                goto failed_0;
        }

        rrc = RapkReserveRdma(dev->rad_handle, total_ntx);
        if (rrc != RAP_SUCCESS) {
                CERROR("Can't reserve %d RDMA descriptors"
                       " for device %d: %d\n", total_ntx, id, rrc);
                goto failed_1;
        }

        rrc = RapkCreateCQ(dev->rad_handle, total_ntx, RAP_CQTYPE_SEND,
                           &dev->rad_rdma_cqh);
        if (rrc != RAP_SUCCESS) {
                CERROR("Can't create rdma cq size %d"
                       " for device %d: %d\n", total_ntx, id, rrc);
                goto failed_1;
        }

        rrc = RapkCreateCQ(dev->rad_handle, RANAL_FMA_CQ_SIZE, RAP_CQTYPE_RECV,
                           &dev->rad_fma_cqh);
        if (rrc != RAP_SUCCESS) {
                CERROR("Can't create fma cq size %d"
                       " for device %d: %d\n", RANAL_FMA_CQ_SIZE, id, rrc);
                goto failed_2;
        }

        return 0;

 failed_2:
        RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
 failed_1:
        RapkReleaseDevice(dev->rad_handle);
 failed_0:
        return -ENODEV;
}

void
kranal_device_fini(kra_device_t *dev)
{
        LASSERT(dev->rad_scheduler == NULL);
        RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cqh);
        RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
        RapkReleaseDevice(dev->rad_handle);
}

void
kranal_api_shutdown (nal_t *nal)
{
        int           i;
        unsigned long flags;

        if (nal->nal_refct != 0) {
                /* Not the last reference: just drop the module use count
                 * taken when this reference was added in startup */
                PORTAL_MODULE_UNUSE;
                return;
        }

        CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
               atomic_read(&portal_kmemory));

        LASSERT (nal == &kranal_api);

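        /* Teardown runs the kra_init state machine backwards: each case
         * undoes one stage of kranal_api_startup() and falls through to
         * the next, so a partial startup unwinds cleanly. */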
        switch (kranal_data.kra_init) {
        default:
                CERROR("Unexpected state %d\n", kranal_data.kra_init);
                LBUG();

        case RANAL_INIT_ALL:
                /* stop calls to nal_cmd */
                libcfs_nal_cmd_unregister(RANAL);
                /* No new persistent peers */

                /* resetting my NID unadvertises me, removes my
                 * listener and nukes all current peers */
                kranal_set_mynid(PTL_NID_ANY);
                /* no new peers or conns */

                /* Wait for all peer/conn state to clean up */
                i = 2;
                while (atomic_read(&kranal_data.kra_nconns) != 0 ||
                       atomic_read(&kranal_data.kra_npeers) != 0) {
                        i++;
                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                               "waiting for %d peers and %d conns to close down\n",
                               atomic_read(&kranal_data.kra_npeers),
                               atomic_read(&kranal_data.kra_nconns));
                        kranal_pause(HZ);
                }
                /* fall through */

        case RANAL_INIT_LIB:
                lib_fini(&kranal_lib);
                /* fall through */

        case RANAL_INIT_DATA:
                break;
        }

        /* flag threads to terminate; wake and wait for them to die */
        kranal_data.kra_shutdown = 1;

        for (i = 0; i < kranal_data.kra_ndevs; i++) {
                kra_device_t *dev = &kranal_data.kra_devices[i];

                LASSERT (list_empty(&dev->rad_connq));

                spin_lock_irqsave(&dev->rad_lock, flags);
                wake_up(&dev->rad_waitq);
                spin_unlock_irqrestore(&dev->rad_lock, flags);
        }

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
        wake_up_all(&kranal_data.kra_reaper_waitq);
        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

        LASSERT (list_empty(&kranal_data.kra_connd_peers));
        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
        wake_up_all(&kranal_data.kra_connd_waitq);
        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

        i = 2;
        while (atomic_read(&kranal_data.kra_nthreads) != 0) {
                i++;
                CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                       "Waiting for %d threads to terminate\n",
                       atomic_read(&kranal_data.kra_nthreads));
                kranal_pause(HZ);
        }

        LASSERT (atomic_read(&kranal_data.kra_npeers) == 0);
        if (kranal_data.kra_peers != NULL) {
                for (i = 0; i < kranal_data.kra_peer_hash_size; i++)
                        LASSERT (list_empty(&kranal_data.kra_peers[i]));

                PORTAL_FREE(kranal_data.kra_peers,
                            sizeof (struct list_head) *
                            kranal_data.kra_peer_hash_size);
        }

        LASSERT (atomic_read(&kranal_data.kra_nconns) == 0);
        if (kranal_data.kra_conns != NULL) {
                for (i = 0; i < kranal_data.kra_conn_hash_size; i++)
                        LASSERT (list_empty(&kranal_data.kra_conns[i]));

                PORTAL_FREE(kranal_data.kra_conns,
                            sizeof (struct list_head) *
                            kranal_data.kra_conn_hash_size);
        }

        for (i = 0; i < kranal_data.kra_ndevs; i++)
                kranal_device_fini(&kranal_data.kra_devices[i]);

        kranal_free_txdescs(&kranal_data.kra_idle_txs);
        kranal_free_txdescs(&kranal_data.kra_idle_nblk_txs);

        CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
               atomic_read(&portal_kmemory));
        printk(KERN_INFO "Lustre: RapidArray NAL unloaded (final mem %d)\n",
               atomic_read(&portal_kmemory));

        kranal_data.kra_init = RANAL_INIT_NOTHING;
}

int
kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
                    ptl_ni_limits_t *requested_limits,
                    ptl_ni_limits_t *actual_limits)
{
        static int        device_ids[] = {RAPK_MAIN_DEVICE_ID,
                                          RAPK_EXPANSION_DEVICE_ID};
        struct timeval    tv;
        ptl_process_id_t  process_id;
        int               pkmem = atomic_read(&portal_kmemory);
        int               rc;
        int               i;
        kra_device_t     *dev;

        LASSERT (nal == &kranal_api);

        if (nal->nal_refct != 0) {
                if (actual_limits != NULL)
                        *actual_limits = kranal_lib.libnal_ni.ni_actual_limits;
                /* NI already initialised: just pin the module for this
                 * extra reference */
                PORTAL_MODULE_USE;
                return PTL_OK;
        }

        LASSERT (kranal_data.kra_init == RANAL_INIT_NOTHING);

        memset(&kranal_data, 0, sizeof(kranal_data)); /* zero pointers, flags etc */

        /* CAVEAT EMPTOR: Every 'Fma' message includes the sender's NID and
         * a unique (for all time) connstamp so we can uniquely identify
         * the sender.  The connstamp is an incrementing counter
         * initialised with seconds + microseconds at startup time.  So we
         * rely on NOT creating connections more frequently on average than
         * 1MHz to ensure we don't use old connstamps when we reboot. */
        do_gettimeofday(&tv);
        kranal_data.kra_connstamp =
        kranal_data.kra_peerstamp = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
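
        /* Worked example of the 1MHz bound: booting at tv_sec = 10^9
         * seeds the stamp at 10^15; rebooting one second later reseeds
         * it at 10^15 + 10^6.  The first boot would have needed to
         * create over a million connections in that second (> 1MHz on
         * average) for an old connstamp to collide with a new one. */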

        init_MUTEX(&kranal_data.kra_nid_mutex);
        init_MUTEX_LOCKED(&kranal_data.kra_listener_signal);

        rwlock_init(&kranal_data.kra_global_lock);

        for (i = 0; i < RANAL_MAXDEVS; i++) {
                kra_device_t  *dev = &kranal_data.kra_devices[i];

                dev->rad_idx = i;
                INIT_LIST_HEAD(&dev->rad_connq);
                init_waitqueue_head(&dev->rad_waitq);
                spin_lock_init(&dev->rad_lock);
        }

        kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
        init_waitqueue_head(&kranal_data.kra_reaper_waitq);
        spin_lock_init(&kranal_data.kra_reaper_lock);

        INIT_LIST_HEAD(&kranal_data.kra_connd_peers);
        init_waitqueue_head(&kranal_data.kra_connd_waitq);
        spin_lock_init(&kranal_data.kra_connd_lock);

        INIT_LIST_HEAD(&kranal_data.kra_idle_txs);
        INIT_LIST_HEAD(&kranal_data.kra_idle_nblk_txs);
        init_waitqueue_head(&kranal_data.kra_idle_tx_waitq);
        spin_lock_init(&kranal_data.kra_tx_lock);

        /* OK to call kranal_api_shutdown() to cleanup now */
        kranal_data.kra_init = RANAL_INIT_DATA;

        kranal_data.kra_peer_hash_size = RANAL_PEER_HASH_SIZE;
        PORTAL_ALLOC(kranal_data.kra_peers,
                     sizeof(struct list_head) * kranal_data.kra_peer_hash_size);
        if (kranal_data.kra_peers == NULL)
                goto failed;

        for (i = 0; i < kranal_data.kra_peer_hash_size; i++)
                INIT_LIST_HEAD(&kranal_data.kra_peers[i]);

        /* the conn table reuses the peer table's size */
        kranal_data.kra_conn_hash_size = RANAL_PEER_HASH_SIZE;
        PORTAL_ALLOC(kranal_data.kra_conns,
                     sizeof(struct list_head) * kranal_data.kra_conn_hash_size);
        if (kranal_data.kra_conns == NULL)
                goto failed;

        for (i = 0; i < kranal_data.kra_conn_hash_size; i++)
                INIT_LIST_HEAD(&kranal_data.kra_conns[i]);

        rc = kranal_alloc_txdescs(&kranal_data.kra_idle_txs, RANAL_NTX);
        if (rc != 0)
                goto failed;

        rc = kranal_alloc_txdescs(&kranal_data.kra_idle_nblk_txs, RANAL_NTX_NBLK);
        if (rc != 0)
                goto failed;

        process_id.pid = requested_pid;
        process_id.nid = PTL_NID_ANY;           /* don't know my NID yet */

        rc = lib_init(&kranal_lib, nal, process_id,
                      requested_limits, actual_limits);
        if (rc != PTL_OK) {
                CERROR("lib_init failed: error %d\n", rc);
                goto failed;
        }

        /* lib interface initialised */
        kranal_data.kra_init = RANAL_INIT_LIB;
        /*****************************************************/

        rc = kranal_thread_start(kranal_reaper, NULL);
        if (rc != 0) {
                CERROR("Can't spawn ranal reaper: %d\n", rc);
                goto failed;
        }

        for (i = 0; i < RANAL_N_CONND; i++) {
                /* widen before casting to avoid int-to-pointer warnings */
                rc = kranal_thread_start(kranal_connd,
                                         (void *)(unsigned long)i);
                if (rc != 0) {
                        CERROR("Can't spawn ranal connd[%d]: %d\n",
                               i, rc);
                        goto failed;
                }
        }

        LASSERT(kranal_data.kra_ndevs == 0);
        for (i = 0; i < sizeof(device_ids)/sizeof(device_ids[0]); i++) {
                dev = &kranal_data.kra_devices[kranal_data.kra_ndevs];

                rc = kranal_device_init(device_ids[i], dev);
                if (rc != 0)            /* device absent? try the next one */
                        continue;

                kranal_data.kra_ndevs++;

                /* only spawn a scheduler for devices that initialised */
                rc = kranal_thread_start(kranal_scheduler, dev);
                if (rc != 0) {
                        CERROR("Can't spawn ranal scheduler[%d]: %d\n",
                               i, rc);
                        goto failed;
                }
        }

        if (kranal_data.kra_ndevs == 0)
                goto failed;

        rc = libcfs_nal_cmd_register(RANAL, &kranal_cmd, NULL);
        if (rc != 0) {
                CERROR("Can't initialise command interface (rc = %d)\n", rc);
                goto failed;
        }

        /* flag everything initialised */
        kranal_data.kra_init = RANAL_INIT_ALL;
        /*****************************************************/

        CDEBUG(D_MALLOC, "initial kmem %d\n", atomic_read(&portal_kmemory));
        printk(KERN_INFO "Lustre: RapidArray NAL loaded "
               "(initial mem %d)\n", pkmem);

        return PTL_OK;

 failed:
        kranal_api_shutdown(&kranal_api);
        return PTL_FAIL;
}

void __exit
kranal_module_fini (void)
{
#ifdef CONFIG_SYSCTL
        if (kranal_tunables.kra_sysctl != NULL)
                unregister_sysctl_table(kranal_tunables.kra_sysctl);
#endif
        PtlNIFini(kranal_ni);

        ptl_unregister_nal(RANAL);
}

int __init
kranal_module_init (void)
{
        int    rc;

        /* the following must be sizeof(int) for
         * proc_dointvec/kranal_listener_procint() */
        LASSERT (sizeof(kranal_tunables.kra_timeout) == sizeof(int));
        LASSERT (sizeof(kranal_tunables.kra_listener_timeout) == sizeof(int));
        LASSERT (sizeof(kranal_tunables.kra_backlog) == sizeof(int));
        LASSERT (sizeof(kranal_tunables.kra_port) == sizeof(int));
        LASSERT (sizeof(kranal_tunables.kra_max_immediate) == sizeof(int));

        kranal_api.nal_ni_init = kranal_api_startup;
        kranal_api.nal_ni_fini = kranal_api_shutdown;

        /* Initialise all dynamic tunables to defaults once only.  The
         * RANAL_* default macros below are assumed to be defined in
         * ranal.h alongside RANAL_TIMEOUT. */
        kranal_tunables.kra_timeout = RANAL_TIMEOUT;
        kranal_tunables.kra_listener_timeout = RANAL_LISTENER_TIMEOUT;
        kranal_tunables.kra_backlog = RANAL_BACKLOG;
        kranal_tunables.kra_port = RANAL_PORT;
        kranal_tunables.kra_max_immediate = RANAL_MAX_IMMEDIATE;

        rc = ptl_register_nal(RANAL, &kranal_api);
        if (rc != PTL_OK) {
                CERROR("Can't register RANAL: %d\n", rc);
                return -ENOMEM;         /* no better errno for a Portals error */
        }

        /* Pure gateways want the NAL started up at module load time... */
        rc = PtlNIInit(RANAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kranal_ni);
        if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
                ptl_unregister_nal(RANAL);
                return -ENODEV;
        }

#ifdef CONFIG_SYSCTL
        /* Press on regardless even if registering sysctl doesn't work */
        kranal_tunables.kra_sysctl =
                register_sysctl_table(kranal_top_ctl_table, 0);
#endif
        return 0;
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel RapidArray NAL v0.01");
MODULE_LICENSE("GPL");

module_init(kranal_module_init);
module_exit(kranal_module_fini);