Whamcloud - gitweb
* 5602 fix improves checks that NID is set correctly and causes incorrect
[fs/lustre-release.git] / lnet / klnds / ralnd / ralnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23 #include "ranal.h"
24
25
/* Module-global state for the RapidArray (ranal) NAL. */
nal_t                   kranal_api;        /* NAL interface handed to portals */
ptl_handle_ni_t         kranal_ni;         /* network interface handle */
kra_data_t              kranal_data;       /* global driver state */
kra_tunables_t          kranal_tunables;   /* sysctl-visible tunables */

/* Binary sysctl ids for the entries in kranal_ctl_table below */
#define RANAL_SYSCTL_TIMEOUT           1
#define RANAL_SYSCTL_LISTENER_TIMEOUT  2
#define RANAL_SYSCTL_BACKLOG           3
#define RANAL_SYSCTL_PORT              4
#define RANAL_SYSCTL_MAX_IMMEDIATE     5

/* id of the top-level "ranal" sysctl directory */
#define RANAL_SYSCTL                   202
38
/* /proc/sys entries for the tunables.  'backlog' and 'port' are routed
 * through kranal_listener_procint rather than plain proc_dointvec —
 * presumably so the listener can react when they change (handler is
 * defined elsewhere; confirm against its implementation). */
static ctl_table kranal_ctl_table[] = {
        {RANAL_SYSCTL_TIMEOUT, "timeout",
         &kranal_tunables.kra_timeout, sizeof(int),
         0644, NULL, &proc_dointvec},
        {RANAL_SYSCTL_LISTENER_TIMEOUT, "listener_timeout",
         &kranal_tunables.kra_listener_timeout, sizeof(int),
         0644, NULL, &proc_dointvec},
        {RANAL_SYSCTL_BACKLOG, "backlog",
         &kranal_tunables.kra_backlog, sizeof(int),
         0644, NULL, kranal_listener_procint},
        {RANAL_SYSCTL_PORT, "port",
         &kranal_tunables.kra_port, sizeof(int),
         0644, NULL, kranal_listener_procint},
        {RANAL_SYSCTL_MAX_IMMEDIATE, "max_immediate",
         &kranal_tunables.kra_max_immediate, sizeof(int),
         0644, NULL, &proc_dointvec},
        { 0 }
};
57
/* Top-level sysctl directory: /proc/sys/ranal, containing the table above. */
static ctl_table kranal_top_ctl_table[] = {
        {RANAL_SYSCTL, "ranal", NULL, 0, 0555, kranal_ctl_table},
        { 0 }
};
62
63 int
64 kranal_sock_write (struct socket *sock, void *buffer, int nob)
65 {
66         int           rc;
67         mm_segment_t  oldmm = get_fs();
68         struct iovec  iov = {
69                 .iov_base = buffer,
70                 .iov_len  = nob
71         };
72         struct msghdr msg = {
73                 .msg_name       = NULL,
74                 .msg_namelen    = 0,
75                 .msg_iov        = &iov,
76                 .msg_iovlen     = 1,
77                 .msg_control    = NULL,
78                 .msg_controllen = 0,
79                 .msg_flags      = MSG_DONTWAIT
80         };
81
82         /* We've set up the socket's send buffer to be large enough for
83          * everything we send, so a single non-blocking send should
84          * complete without error. */
85
86         set_fs(KERNEL_DS);
87         rc = sock_sendmsg(sock, &msg, iov.iov_len);
88         set_fs(oldmm);
89
90         if (rc == nob)
91                 return 0;
92         
93         if (rc >= 0)
94                 return -EAGAIN;
95
96         return rc;
97 }
98
int
kranal_sock_read (struct socket *sock, void *buffer, int nob, int timeout)
{
        /* Read exactly 'nob' bytes from 'sock' into 'buffer', blocking for at
         * most 'timeout' seconds in total.  Returns 0 on success, -ETIMEDOUT
         * if time runs out, -ECONNABORTED on EOF, or another -ve errno. */
        int            rc;
        mm_segment_t   oldmm = get_fs();
        long           ticks = timeout * HZ;    /* time remaining, jiffies */
        unsigned long  then;
        struct timeval tv;

        LASSERT (nob > 0);
        LASSERT (ticks > 0);

        for (;;) {
                struct iovec  iov = {
                        .iov_base = buffer,
                        .iov_len  = nob
                };
                struct msghdr msg = {
                        .msg_name       = NULL,
                        .msg_namelen    = 0,
                        .msg_iov        = &iov,
                        .msg_iovlen     = 1,
                        .msg_control    = NULL,
                        .msg_controllen = 0,
                        .msg_flags      = 0
                };

                /* Set receive timeout to remaining time */
                tv = (struct timeval) {
                        .tv_sec = ticks / HZ,
                        .tv_usec = ((ticks % HZ) * 1000000) / HZ
                };
                set_fs(KERNEL_DS);
                rc = sock_setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO,
                                     (char *)&tv, sizeof(tv));
                set_fs(oldmm);
                if (rc != 0) {
                        CERROR("Can't set socket recv timeout %d: %d\n",
                               timeout, rc);
                        return rc;
                }

                set_fs(KERNEL_DS);
                then = jiffies;
                rc = sock_recvmsg(sock, &msg, iov.iov_len, 0);
                ticks -= jiffies - then;        /* charge elapsed time */
                set_fs(oldmm);

                if (rc < 0)
                        return rc;

                if (rc == 0)                    /* EOF: peer closed */
                        return -ECONNABORTED;

                /* partial read: advance and go round again */
                buffer = ((char *)buffer) + rc;
                nob -= rc;

                if (nob == 0)
                        return 0;

                if (ticks <= 0)                 /* budget exhausted */
                        return -ETIMEDOUT;
        }
}
163
164 int
165 kranal_create_sock(struct socket **sockp)
166 {
167         struct socket       *sock;
168         int                  rc;
169         int                  option;
170         mm_segment_t         oldmm = get_fs();
171
172         rc = sock_create(PF_INET, SOCK_STREAM, 0, &sock);
173         if (rc != 0) {
174                 CERROR("Can't create socket: %d\n", rc);
175                 return rc;
176         }
177
178         /* Ensure sending connection info doesn't block */
179         option = 2 * sizeof(kra_connreq_t);
180         set_fs(KERNEL_DS);
181         rc = sock_setsockopt(sock, SOL_SOCKET, SO_SNDBUF,
182                              (char *)&option, sizeof(option));
183         set_fs(oldmm);
184         if (rc != 0) {
185                 CERROR("Can't set send buffer %d: %d\n", option, rc);
186                 goto failed;
187         }
188
189         option = 1;
190         set_fs(KERNEL_DS);
191         rc = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
192                              (char *)&option, sizeof(option));
193         set_fs(oldmm);
194         if (rc != 0) {
195                 CERROR("Can't set SO_REUSEADDR: %d\n", rc);
196                 goto failed;
197         }
198
199         *sockp = sock;
200         return 0;
201
202  failed:
203         sock_release(sock);
204         return rc;
205 }
206
void
kranal_pause(int ticks)
{
        /* Sleep uninterruptibly for 'ticks' jiffies. */
        set_current_state(TASK_UNINTERRUPTIBLE);
        schedule_timeout(ticks);
}
213
214 void
215 kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn)
216 {
217         RAP_RETURN   rrc;
218
219         memset(connreq, 0, sizeof(*connreq));
220
221         connreq->racr_magic     = RANAL_MSG_MAGIC;
222         connreq->racr_version   = RANAL_MSG_VERSION;
223         connreq->racr_devid     = conn->rac_device->rad_id;
224         connreq->racr_srcnid    = kranal_lib.libnal_ni.ni_pid.nid;
225         connreq->racr_dstnid    = conn->rac_peer->rap_nid;
226         connreq->racr_peerstamp = kranal_data.kra_peerstamp;
227         connreq->racr_connstamp = conn->rac_my_connstamp;
228         connreq->racr_timeout   = conn->rac_timeout;
229
230         rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
231         LASSERT(rrc == RAP_SUCCESS);
232 }
233
int
kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout)
{
        /* Receive a connection request from 'sock' and validate it,
         * byte-swapping into host order if the sender is opposite-endian.
         * Returns 0 on success or a -ve errno (-EPROTO on bad contents). */
        int         rc;

        rc = kranal_sock_read(sock, connreq, sizeof(*connreq), timeout);
        if (rc != 0) {
                CERROR("Read failed: %d\n", rc);
                return rc;
        }

        if (connreq->racr_magic != RANAL_MSG_MAGIC) {
                /* wrong magic as-is: maybe the peer is opposite-endian */
                if (__swab32(connreq->racr_magic) != RANAL_MSG_MAGIC) {
                        CERROR("Unexpected magic %08x\n", connreq->racr_magic);
                        return -EPROTO;
                }

                /* flip every multi-byte field; must mirror the wire struct */
                __swab32s(&connreq->racr_magic);
                __swab16s(&connreq->racr_version);
                __swab16s(&connreq->racr_devid);
                __swab64s(&connreq->racr_srcnid);
                __swab64s(&connreq->racr_dstnid);
                __swab64s(&connreq->racr_peerstamp);
                __swab64s(&connreq->racr_connstamp);
                __swab32s(&connreq->racr_timeout);

                __swab32s(&connreq->racr_riparams.HostId);
                __swab32s(&connreq->racr_riparams.FmaDomainHndl);
                __swab32s(&connreq->racr_riparams.PTag);
                __swab32s(&connreq->racr_riparams.CompletionCookie);
        }

        if (connreq->racr_version != RANAL_MSG_VERSION) {
                CERROR("Unexpected version %d\n", connreq->racr_version);
                return -EPROTO;
        }

        /* both NIDs must be set explicitly */
        if (connreq->racr_srcnid == PTL_NID_ANY ||
            connreq->racr_dstnid == PTL_NID_ANY) {
                CERROR("Received PTL_NID_ANY\n");
                return -EPROTO;
        }

        if (connreq->racr_timeout < RANAL_MIN_TIMEOUT) {
                CERROR("Received timeout %d < MIN %d\n",
                       connreq->racr_timeout, RANAL_MIN_TIMEOUT);
                return -EPROTO;
        }

        return 0;
}
285
int
kranal_close_stale_conns_locked (kra_peer_t *peer, kra_conn_t *newconn)
{
        /* Close every conn to 'peer' made obsolete by 'newconn'; returns the
         * number closed.  Caller holds kra_global_lock for writing. */
        kra_conn_t         *conn;
        struct list_head   *ctmp;
        struct list_head   *cnxt;
        int                 loopback;
        int                 count = 0;

        /* a loopback conn pairs my connstamp with the peer's and vice versa */
        loopback = peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid;

        /* _safe variant: closing a conn unlinks it from rap_conns */
        list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
                conn = list_entry(ctmp, kra_conn_t, rac_list);

                if (conn == newconn)
                        continue;

                if (conn->rac_peerstamp != newconn->rac_peerstamp) {
                        CDEBUG(D_NET, "Closing stale conn nid:"LPX64
                               " peerstamp:"LPX64"("LPX64")\n", peer->rap_nid,
                               conn->rac_peerstamp, newconn->rac_peerstamp);
                        /* an existing conn can only pre-date 'newconn' */
                        LASSERT (conn->rac_peerstamp < newconn->rac_peerstamp);
                        count++;
                        kranal_close_conn_locked(conn, -ESTALE);
                        continue;
                }

                /* conns on other devices aren't stale */
                if (conn->rac_device != newconn->rac_device)
                        continue;

                /* me connecting to myself: both halves stay */
                if (loopback &&
                    newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
                    newconn->rac_peer_connstamp == conn->rac_my_connstamp)
                        continue;

                LASSERT (conn->rac_peer_connstamp < newconn->rac_peer_connstamp);

                CDEBUG(D_NET, "Closing stale conn nid:"LPX64
                       " connstamp:"LPX64"("LPX64")\n", peer->rap_nid,
                       conn->rac_peer_connstamp, newconn->rac_peer_connstamp);

                count++;
                kranal_close_conn_locked(conn, -ESTALE);
        }

        return count;
}
333
334 int
335 kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn)
336 {
337         kra_conn_t       *conn;
338         struct list_head *tmp;
339         int               loopback;
340
341         loopback = peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid;
342         
343         list_for_each(tmp, &peer->rap_conns) {
344                 conn = list_entry(tmp, kra_conn_t, rac_list);
345
346                 /* 'newconn' is from an earlier version of 'peer'!!! */
347                 if (newconn->rac_peerstamp < conn->rac_peerstamp)
348                         return 1;
349
350                 /* 'conn' is from an earlier version of 'peer': it will be
351                  * removed when we cull stale conns later on... */
352                 if (newconn->rac_peerstamp > conn->rac_peerstamp)
353                         continue;
354
355                 /* Different devices are OK */
356                 if (conn->rac_device != newconn->rac_device)
357                         continue;
358
359                 /* It's me connecting to myself */
360                 if (loopback &&
361                     newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
362                     newconn->rac_peer_connstamp == conn->rac_my_connstamp)
363                         continue;
364
365                 /* 'newconn' is an earlier connection from 'peer'!!! */
366                 if (newconn->rac_peer_connstamp < conn->rac_peer_connstamp)
367                         return 2;
368                 
369                 /* 'conn' is an earlier connection from 'peer': it will be
370                  * removed when we cull stale conns later on... */
371                 if (newconn->rac_peer_connstamp > conn->rac_peer_connstamp)
372                         continue;
373                 
374                 /* 'newconn' has the SAME connection stamp; 'peer' isn't
375                  * playing the game... */
376                 return 3;
377         }
378
379         return 0;
380 }
381
382 void
383 kranal_set_conn_uniqueness (kra_conn_t *conn)
384 {
385         unsigned long  flags;
386
387         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
388
389         conn->rac_my_connstamp = kranal_data.kra_connstamp++;
390
391         do {    /* allocate a unique cqid */
392                 conn->rac_cqid = kranal_data.kra_next_cqid++;
393         } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL);
394         
395
396         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
397 }
398
int
kranal_create_conn(kra_conn_t **connp, kra_device_t *dev)
{
        /* Allocate and initialise a conn on device 'dev': unique
         * connstamp/cqid, timeout clamped up to RANAL_MIN_TIMEOUT, and a
         * RapidArray endpoint.  Returns 0 with *connp holding the caller's
         * ref, or -ENOMEM / -ENETDOWN on failure. */
        kra_conn_t    *conn;
        RAP_RETURN     rrc;

        LASSERT (!in_interrupt());
        PORTAL_ALLOC(conn, sizeof(*conn));

        if (conn == NULL)
                return -ENOMEM;

        memset(conn, 0, sizeof(*conn));
        atomic_set(&conn->rac_refcount, 1);     /* caller's ref */
        INIT_LIST_HEAD(&conn->rac_list);
        INIT_LIST_HEAD(&conn->rac_hashlist);
        INIT_LIST_HEAD(&conn->rac_schedlist);
        INIT_LIST_HEAD(&conn->rac_fmaq);
        INIT_LIST_HEAD(&conn->rac_rdmaq);
        INIT_LIST_HEAD(&conn->rac_replyq);
        spin_lock_init(&conn->rac_lock);

        kranal_set_conn_uniqueness(conn);

        conn->rac_timeout = MAX(kranal_tunables.kra_timeout, RANAL_MIN_TIMEOUT);
        kranal_update_reaper_timeout(conn->rac_timeout);

        /* endpoint keyed by the cqid chosen above */
        rrc = RapkCreateRi(dev->rad_handle, conn->rac_cqid,
                           &conn->rac_rihandle);
        if (rrc != RAP_SUCCESS) {
                CERROR("RapkCreateRi failed: %d\n", rrc);
                PORTAL_FREE(conn, sizeof(*conn));
                return -ENETDOWN;
        }

        atomic_inc(&kranal_data.kra_nconns);
        *connp = conn;
        return 0;
}
438
void
kranal_destroy_conn(kra_conn_t *conn)
{
        /* Final teardown once the last ref is gone: the conn must already be
         * off every list with nothing queued. */
        RAP_RETURN         rrc;

        LASSERT (!in_interrupt());
        LASSERT (!conn->rac_scheduled);
        LASSERT (list_empty(&conn->rac_list));
        LASSERT (list_empty(&conn->rac_hashlist));
        LASSERT (list_empty(&conn->rac_schedlist));
        LASSERT (atomic_read(&conn->rac_refcount) == 0);
        LASSERT (list_empty(&conn->rac_fmaq));
        LASSERT (list_empty(&conn->rac_rdmaq));
        LASSERT (list_empty(&conn->rac_replyq));

        /* release the endpoint created in kranal_create_conn() */
        rrc = RapkDestroyRi(conn->rac_device->rad_handle,
                            conn->rac_rihandle);
        LASSERT (rrc == RAP_SUCCESS);

        /* drop the conn's ref on its peer, if it ever got one */
        if (conn->rac_peer != NULL)
                kranal_peer_decref(conn->rac_peer);

        PORTAL_FREE(conn, sizeof(*conn));
        atomic_dec(&kranal_data.kra_nconns);
}
464
void
kranal_terminate_conn_locked (kra_conn_t *conn)
{
        /* Move a CLOSING conn to CLOSED: unhash it so no new CQ callbacks
         * can find it, then hand it to the device scheduler to finish off
         * its comms.  Caller holds kra_global_lock for writing. */
        LASSERT (!in_interrupt());
        LASSERT (conn->rac_state == RANAL_CONN_CLOSING);
        LASSERT (!list_empty(&conn->rac_hashlist));
        LASSERT (list_empty(&conn->rac_list));

        /* Remove from conn hash table: no new callbacks */
        list_del_init(&conn->rac_hashlist);
        kranal_conn_decref(conn);       /* drop the conn table's ref */

        conn->rac_state = RANAL_CONN_CLOSED;

        /* schedule to clear out all uncompleted comms in context of dev's
         * scheduler */
        kranal_schedule_conn(conn);
}
483
void
kranal_close_conn_locked (kra_conn_t *conn, int error)
{
        /* Begin closing an ESTABLISHED conn: unlink it from its peer, reset
         * the RX deadline, and schedule it to send a CLOSE.  'error' is 0
         * for an orderly shutdown.  Caller holds kra_global_lock for
         * writing. */
        kra_peer_t        *peer = conn->rac_peer;

        CDEBUG(error == 0 ? D_NET : D_ERROR,
               "closing conn to "LPX64": error %d\n", peer->rap_nid, error);

        LASSERT (!in_interrupt());
        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED);
        LASSERT (!list_empty(&conn->rac_hashlist));
        LASSERT (!list_empty(&conn->rac_list));

        list_del_init(&conn->rac_list);

        if (list_empty(&peer->rap_conns) &&
            peer->rap_persistence == 0) {
                /* Non-persistent peer with no more conns... */
                kranal_unlink_peer_locked(peer);
        }

        /* Reset RX timeout to ensure we wait for an incoming CLOSE for the
         * full timeout */
        conn->rac_last_rx = jiffies;
        mb();           /* timestamp visible before the state change below */

        conn->rac_state = RANAL_CONN_CLOSING;
        kranal_schedule_conn(conn);             /* schedule sending CLOSE */

        kranal_conn_decref(conn);               /* lose peer's ref */
}
515
516 void
517 kranal_close_conn (kra_conn_t *conn, int error)
518 {
519         unsigned long    flags;
520         
521
522         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
523         
524         if (conn->rac_state == RANAL_CONN_ESTABLISHED)
525                 kranal_close_conn_locked(conn, error);
526         
527         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
528 }
529
530 int
531 kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq, 
532                        __u32 peer_ip, int peer_port)
533 {
534         RAP_RETURN    rrc;
535         
536         rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
537         if (rrc != RAP_SUCCESS) {
538                 CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n", 
539                        HIPQUAD(peer_ip), peer_port, rrc);
540                 return -EPROTO;
541         }
542         
543         conn->rac_peerstamp = connreq->racr_peerstamp;
544         conn->rac_peer_connstamp = connreq->racr_connstamp;
545         conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq->racr_timeout);
546         kranal_update_reaper_timeout(conn->rac_keepalive);
547         return 0;
548 }
549
int
kranal_passive_conn_handshake (struct socket *sock, ptl_nid_t *src_nidp,
                               ptl_nid_t *dst_nidp, kra_conn_t **connp)
{
        /* Passive side of the handshake: read the peer's connreq from the
         * freshly-accepted 'sock', create a conn on the device it named and
         * reply with our own connreq.  On success returns 0 with *connp
         * holding a ref and *src_nidp/*dst_nidp taken from the request. */
        struct sockaddr_in   addr;
        __u32                peer_ip;
        unsigned int         peer_port;
        kra_connreq_t        connreq;
        ptl_nid_t            src_nid;
        ptl_nid_t            dst_nid;
        kra_conn_t          *conn;
        kra_device_t        *dev;
        int                  rc;
        int                  len;
        int                  i;

        len = sizeof(addr);
        rc = sock->ops->getname(sock, (struct sockaddr *)&addr, &len, 2);
        if (rc != 0) {
                CERROR("Can't get peer's IP: %d\n", rc);
                return rc;
        }

        peer_ip = ntohl(addr.sin_addr.s_addr);
        peer_port = ntohs(addr.sin_port);

        /* only a privileged peer can bind a reserved port */
        if (peer_port >= 1024) {
                CERROR("Refusing unprivileged connection from %u.%u.%u.%u/%d\n",
                       HIPQUAD(peer_ip), peer_port);
                return -ECONNREFUSED;
        }

        rc = kranal_recv_connreq(sock, &connreq,
                                 kranal_tunables.kra_listener_timeout);
        if (rc != 0) {
                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
                       HIPQUAD(peer_ip), peer_port, rc);
                return rc;
        }

        src_nid = connreq.racr_srcnid;
        dst_nid = connreq.racr_dstnid;

        /* find the local device the peer asked for */
        for (i = 0;;i++) {
                if (i == kranal_data.kra_ndevs) {
                        CERROR("Can't match dev %d from %u.%u.%u.%u/%d\n",
                               connreq.racr_devid, HIPQUAD(peer_ip), peer_port);
                        return -ENODEV;
                }
                dev = &kranal_data.kra_devices[i];
                if (dev->rad_id == connreq.racr_devid)
                        break;
        }

        rc = kranal_create_conn(&conn, dev);
        if (rc != 0)
                return rc;

        rc = kranal_set_conn_params(conn, &connreq, peer_ip, peer_port);
        if (rc != 0) {
                kranal_conn_decref(conn);
                return rc;
        }

        /* reuse 'connreq' for our reply */
        kranal_pack_connreq(&connreq, conn);

        rc = kranal_sock_write(sock, &connreq, sizeof(connreq));
        if (rc != 0) {
                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
                       HIPQUAD(peer_ip), peer_port, rc);
                kranal_conn_decref(conn);
                return rc;
        }

        *connp = conn;
        *src_nidp = src_nid;
        *dst_nidp = dst_nid;
        return 0;
}
629
630 int
631 ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
632 {
633         struct sockaddr_in  locaddr;
634         struct sockaddr_in  srvaddr;
635         struct socket      *sock;
636         unsigned int        port;
637         int                 rc;
638
639         for (port = 1023; port >= 512; port--) {
640
641                 memset(&locaddr, 0, sizeof(locaddr)); 
642                 locaddr.sin_family      = AF_INET; 
643                 locaddr.sin_port        = htons(port);
644                 locaddr.sin_addr.s_addr = htonl(INADDR_ANY);
645
646                 memset (&srvaddr, 0, sizeof (srvaddr));
647                 srvaddr.sin_family      = AF_INET;
648                 srvaddr.sin_port        = htons (peer->rap_port);
649                 srvaddr.sin_addr.s_addr = htonl (peer->rap_ip);
650
651                 rc = kranal_create_sock(&sock);
652                 if (rc != 0)
653                         return rc;
654
655                 rc = sock->ops->bind(sock,
656                                      (struct sockaddr *)&locaddr, sizeof(locaddr));
657                 if (rc != 0) {
658                         sock_release(sock);
659                         
660                         if (rc == -EADDRINUSE) {
661                                 CDEBUG(D_NET, "Port %d already in use\n", port);
662                                 continue;
663                         }
664
665                         CERROR("Can't bind to reserved port %d: %d\n", port, rc);
666                         return rc;
667                 }
668
669                 rc = sock->ops->connect(sock,
670                                         (struct sockaddr *)&srvaddr, sizeof(srvaddr),
671                                         0);
672                 if (rc == 0) {
673                         *sockp = sock;
674                         return 0;
675                 }
676                 
677                 sock_release(sock);
678
679                 if (rc != -EADDRNOTAVAIL) {
680                         CERROR("Can't connect port %d to %u.%u.%u.%u/%d: %d\n",
681                                port, HIPQUAD(peer->rap_ip), peer->rap_port, rc);
682                         return rc;
683                 }
684                 
685                 CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n", 
686                        port, HIPQUAD(peer->rap_ip), peer->rap_port);
687         }
688
689         /* all ports busy */
690         return -EHOSTUNREACH;
691 }
692
693
int
kranal_active_conn_handshake(kra_peer_t *peer,
                             ptl_nid_t *dst_nidp, kra_conn_t **connp)
{
        /* Active side of the handshake: pick a device, create a conn,
         * connect a socket to 'peer', exchange connreqs and validate the
         * reply.  On success returns 0 with *connp holding a ref and
         * *dst_nidp set to the NID the peer thinks we are. */
        kra_connreq_t       connreq;
        kra_conn_t         *conn;
        kra_device_t       *dev;
        struct socket      *sock;
        int                 rc;
        unsigned int        idx;

        /* spread connections over all devices using both peer NIDs to ensure
         * all nids use all devices */
        idx = peer->rap_nid + kranal_lib.libnal_ni.ni_pid.nid;
        dev = &kranal_data.kra_devices[idx % kranal_data.kra_ndevs];

        rc = kranal_create_conn(&conn, dev);
        if (rc != 0)
                return rc;

        kranal_pack_connreq(&connreq, conn);

        rc = ranal_connect_sock(peer, &sock);
        if (rc != 0)
                goto failed_0;

        /* CAVEAT EMPTOR: the passive side receives with a SHORT rx timeout
         * immediately after accepting a connection, so we connect and then
         * send immediately. */

        rc = kranal_sock_write(sock, &connreq, sizeof(connreq));
        if (rc != 0) {
                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
                       HIPQUAD(peer->rap_ip), peer->rap_port, rc);
                goto failed_1;
        }

        rc = kranal_recv_connreq(sock, &connreq, kranal_tunables.kra_timeout);
        if (rc != 0) {
                CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
                       HIPQUAD(peer->rap_ip), peer->rap_port, rc);
                goto failed_1;
        }

        /* socket no longer needed; validation failures below take failed_0 */
        sock_release(sock);
        rc = -EPROTO;

        if (connreq.racr_srcnid != peer->rap_nid) {
                CERROR("Unexpected srcnid from %u.%u.%u.%u/%d: "
                       "received "LPX64" expected "LPX64"\n",
                       HIPQUAD(peer->rap_ip), peer->rap_port,
                       connreq.racr_srcnid, peer->rap_nid);
                goto failed_0;
        }

        if (connreq.racr_devid != dev->rad_id) {
                CERROR("Unexpected device id from %u.%u.%u.%u/%d: "
                       "received %d expected %d\n",
                       HIPQUAD(peer->rap_ip), peer->rap_port,
                       connreq.racr_devid, dev->rad_id);
                goto failed_0;
        }

        rc = kranal_set_conn_params(conn, &connreq,
                                    peer->rap_ip, peer->rap_port);
        if (rc != 0)
                goto failed_0;

        *connp = conn;
        *dst_nidp = connreq.racr_dstnid;
        return 0;

 failed_1:
        sock_release(sock);
 failed_0:
        kranal_conn_decref(conn);
        return rc;
}
772
773 int
774 kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
775 {
776         kra_peer_t        *peer2;
777         kra_tx_t          *tx;
778         ptl_nid_t          peer_nid;
779         ptl_nid_t          dst_nid;
780         unsigned long      flags;
781         kra_conn_t        *conn;
782         int                rc;
783         int                nstale;
784         int                new_peer = 0;
785
786         if (sock == NULL) {
787                 /* active: connd wants to connect to 'peer' */
788                 LASSERT (peer != NULL);
789                 LASSERT (peer->rap_connecting);
790                 
791                 rc = kranal_active_conn_handshake(peer, &dst_nid, &conn);
792                 if (rc != 0)
793                         return rc;
794
795                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
796
797                 if (!kranal_peer_active(peer)) {
798                         /* raced with peer getting unlinked */
799                         write_unlock_irqrestore(&kranal_data.kra_global_lock, 
800                                                 flags);
801                         kranal_conn_decref(conn);
802                         return -ESTALE;
803                 }
804
805                 peer_nid = peer->rap_nid;
806         } else {
807                 /* passive: listener accepted 'sock' */
808                 LASSERT (peer == NULL);
809
810                 rc = kranal_passive_conn_handshake(sock, &peer_nid,
811                                                    &dst_nid, &conn);
812                 if (rc != 0)
813                         return rc;
814
815                 /* assume this is a new peer */
816                 peer = kranal_create_peer(peer_nid);
817                 if (peer == NULL) {
818                         CERROR("Can't allocate peer for "LPX64"\n", peer_nid);
819                         kranal_conn_decref(conn);
820                         return -ENOMEM;
821                 }
822
823                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
824
825                 peer2 = kranal_find_peer_locked(peer_nid);
826                 if (peer2 == NULL) {
827                         new_peer = 1;
828                 } else {
829                         /* peer_nid already in the peer table */
830                         kranal_peer_decref(peer);
831                         peer = peer2;
832                 }
833         }
834
835         LASSERT (!new_peer == !kranal_peer_active(peer));
836
837         /* Refuse connection if peer thinks we are a different NID.  We check
838          * this while holding the global lock, to synch with connection
839          * destruction on NID change. */
840         if (dst_nid != kranal_lib.libnal_ni.ni_pid.nid) {
841                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
842
843                 CERROR("Stale/bad connection with "LPX64
844                        ": dst_nid "LPX64", expected "LPX64"\n",
845                        peer_nid, dst_nid, kranal_lib.libnal_ni.ni_pid.nid);
846                 rc = -ESTALE;
847                 goto failed;
848         }
849
850         /* Refuse to duplicate an existing connection (both sides might try to
851          * connect at once).  NB we return success!  We _are_ connected so we
852          * _don't_ have any blocked txs to complete with failure. */
853         rc = kranal_conn_isdup_locked(peer, conn);
854         if (rc != 0) {
855                 LASSERT (!list_empty(&peer->rap_conns));
856                 LASSERT (list_empty(&peer->rap_tx_queue));
857                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
858                 CWARN("Not creating duplicate connection to "LPX64": %d\n",
859                       peer_nid, rc);
860                 rc = 0;
861                 goto failed;
862         }
863
864         if (new_peer) {
865                 /* peer table takes my ref on the new peer */
866                 list_add_tail(&peer->rap_list,
867                               kranal_nid2peerlist(peer_nid));
868         }
869         
870         kranal_peer_addref(peer);               /* +1 ref for conn */
871         conn->rac_peer = peer;
872         list_add_tail(&conn->rac_list, &peer->rap_conns);
873
874         kranal_conn_addref(conn);               /* +1 ref for conn table */
875         list_add_tail(&conn->rac_hashlist,
876                       kranal_cqid2connlist(conn->rac_cqid));
877
878         /* Schedule all packets blocking for a connection */
879         while (!list_empty(&peer->rap_tx_queue)) {
880                 tx = list_entry(peer->rap_tx_queue.next,
881                                 kra_tx_t, tx_list);
882
883                 list_del(&tx->tx_list);
884                 kranal_post_fma(conn, tx);
885         }
886
887         nstale = kranal_close_stale_conns_locked(peer, conn);
888
889         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
890
891         /* CAVEAT EMPTOR: passive peer can disappear NOW */
892
893         if (nstale != 0)
894                 CWARN("Closed %d stale conns to "LPX64"\n", nstale, peer_nid);
895
896         /* Ensure conn gets checked.  Transmits may have been queued and an
897          * FMA event may have happened before it got in the cq hash table */
898         kranal_schedule_conn(conn);
899         return 0;
900
901  failed:
902         if (new_peer)
903                 kranal_peer_decref(peer);
904         kranal_conn_decref(conn);
905         return rc;
906 }
907
908 void
909 kranal_connect (kra_peer_t *peer)
910 {
911         kra_tx_t          *tx;
912         unsigned long      flags;
913         struct list_head   zombies;
914         int                rc;
915
916         LASSERT (peer->rap_connecting);
917
918         rc = kranal_conn_handshake(NULL, peer);
919
920         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
921
922         LASSERT (peer->rap_connecting);
923         peer->rap_connecting = 0;
924
925         if (rc == 0) {
926                 /* kranal_conn_handshake() queues blocked txs immediately on
927                  * success to avoid messages jumping the queue */
928                 LASSERT (list_empty(&peer->rap_tx_queue));
929
930                 /* reset reconnection timeouts */
931                 peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL;
932                 peer->rap_reconnect_time = CURRENT_SECONDS;
933
934                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
935                 return;
936         }
937
938         LASSERT (peer->rap_reconnect_interval != 0);
939         peer->rap_reconnect_time = CURRENT_SECONDS + peer->rap_reconnect_interval;
940         peer->rap_reconnect_interval = MAX(RANAL_MAX_RECONNECT_INTERVAL,
941                                            1 * peer->rap_reconnect_interval);
942
943         /* Grab all blocked packets while we have the global lock */
944         list_add(&zombies, &peer->rap_tx_queue);
945         list_del_init(&peer->rap_tx_queue);
946
947         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
948
949         if (list_empty(&zombies))
950                 return;
951
952         CWARN("Dropping packets for "LPX64": connection failed\n",
953               peer->rap_nid);
954
955         do {
956                 tx = list_entry(zombies.next, kra_tx_t, tx_list);
957
958                 list_del(&tx->tx_list);
959                 kranal_tx_done(tx, -EHOSTUNREACH);
960
961         } while (!list_empty(&zombies));
962 }
963
/* Release an accepted-socket descriptor: drop the socket first, then free
 * the wrapper structure that carried it on the connd accept queue. */
void
kranal_free_acceptsock (kra_acceptsock_t *ras)
{
        sock_release(ras->ras_sock);
        PORTAL_FREE(ras, sizeof(*ras));
}
970
/* Listener thread: bind + listen on kra_port, then loop accepting incoming
 * connections and handing each accepted socket to the connd via
 * kra_connd_acceptq.  The parent (kranal_start_listener) holds kra_nid_mutex
 * and blocks on kra_listener_signal until we either publish
 * kra_listener_sock or fail; kranal_stop_listener sets
 * kra_listener_shutdown and wakes our socket to make us exit. */
int
kranal_listener (void *arg)
{
        struct sockaddr_in addr;
        wait_queue_t       wait;
        struct socket     *sock;
        kra_acceptsock_t  *ras;
        int                port;
        char               name[16];
        int                rc;
        unsigned long      flags;

        /* Parent thread holds kra_nid_mutex, and is, or is about to
         * block on kra_listener_signal */

        port = kranal_tunables.kra_port;
        snprintf(name, sizeof(name), "kranal_lstn%03d", port);
        kportal_daemonize(name);
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        rc = kranal_create_sock(&sock);
        if (rc != 0)
                goto out_0;

        /* bind to the configured port on any local address */
        memset(&addr, 0, sizeof(addr));
        addr.sin_family      = AF_INET;
        addr.sin_port        = htons(port);
        addr.sin_addr.s_addr = INADDR_ANY;

        rc = sock->ops->bind(sock, (struct sockaddr *)&addr, sizeof(addr));
        if (rc != 0) {
                CERROR("Can't bind to port %d\n", port);
                goto out_1;
        }

        rc = sock->ops->listen(sock, kranal_tunables.kra_backlog);
        if (rc != 0) {
                CERROR("Can't set listen backlog %d: %d\n", 
                       kranal_tunables.kra_backlog, rc);
                goto out_1;
        }

        LASSERT (kranal_data.kra_listener_sock == NULL);
        kranal_data.kra_listener_sock = sock;

        /* unblock waiting parent */
        LASSERT (kranal_data.kra_listener_shutdown == 0);
        up(&kranal_data.kra_listener_signal);

        /* Wake me any time something happens on my socket */
        add_wait_queue(sock->sk->sk_sleep, &wait);
        ras = NULL;

        while (kranal_data.kra_listener_shutdown == 0) {

                /* 'ras' persists across allocation-failure retries so a
                 * successful PORTAL_ALLOC/sock_alloc isn't repeated */
                if (ras == NULL) {
                        PORTAL_ALLOC(ras, sizeof(*ras));
                        if (ras == NULL) {
                                CERROR("Out of Memory: pausing...\n");
                                kranal_pause(HZ);
                                continue;
                        }
                        ras->ras_sock = NULL;
                }

                if (ras->ras_sock == NULL) {
                        ras->ras_sock = sock_alloc();
                        if (ras->ras_sock == NULL) {
                                CERROR("Can't allocate socket: pausing...\n");
                                kranal_pause(HZ);
                                continue;
                        }
                        /* XXX this should add a ref to sock->ops->owner, if
                         * TCP could be a module */
                        ras->ras_sock->type = sock->type;
                        ras->ras_sock->ops = sock->ops;
                }
                
                /* set INTERRUPTIBLE *before* the non-blocking accept so a
                 * wakeup between accept and schedule() isn't lost */
                set_current_state(TASK_INTERRUPTIBLE);

                rc = sock->ops->accept(sock, ras->ras_sock, O_NONBLOCK);

                /* Sleep for socket activity? */
                if (rc == -EAGAIN &&
                    kranal_data.kra_listener_shutdown == 0)
                        schedule();

                set_current_state(TASK_RUNNING);

                if (rc == 0) {
                        /* hand the accepted socket to the connd; it now
                         * owns 'ras' */
                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        
                        list_add_tail(&ras->ras_list, 
                                      &kranal_data.kra_connd_acceptq);

                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
                        wake_up(&kranal_data.kra_connd_waitq);

                        ras = NULL;
                        continue;
                }
                
                if (rc != -EAGAIN) {
                        CERROR("Accept failed: %d, pausing...\n", rc);
                        kranal_pause(HZ);
                }
        }

        /* free the spare descriptor left over from the loop, if any */
        if (ras != NULL) {
                if (ras->ras_sock != NULL)
                        sock_release(ras->ras_sock);
                PORTAL_FREE(ras, sizeof(*ras));
        }

        rc = 0;
        remove_wait_queue(sock->sk->sk_sleep, &wait);
 out_1:
        sock_release(sock);
        kranal_data.kra_listener_sock = NULL;
 out_0:
        /* set completion status and unblock thread waiting for me 
         * (parent on startup failure, executioner on normal shutdown) */
        kranal_data.kra_listener_shutdown = rc;
        up(&kranal_data.kra_listener_signal);

        return 0;
}
1100
1101 int
1102 kranal_start_listener (void)
1103 {
1104         long           pid;
1105         int            rc;
1106
1107         CDEBUG(D_WARNING, "Starting listener\n");
1108
1109         /* Called holding kra_nid_mutex: listener stopped */
1110         LASSERT (kranal_data.kra_listener_sock == NULL);
1111
1112         kranal_data.kra_listener_shutdown = 0;
1113         pid = kernel_thread(kranal_listener, NULL, 0);
1114         if (pid < 0) {
1115                 CERROR("Can't spawn listener: %ld\n", pid);
1116                 return (int)pid;
1117         }
1118
1119         /* Block until listener has started up. */
1120         down(&kranal_data.kra_listener_signal);
1121
1122         rc = kranal_data.kra_listener_shutdown;
1123         LASSERT ((rc != 0) == (kranal_data.kra_listener_sock == NULL));
1124
1125         CDEBUG(D_WARNING, "Listener %ld started OK\n", pid);
1126         return rc;
1127 }
1128
/* Tear down the listener thread and, if 'clear_acceptq', drop any accepted
 * sockets the connd has not yet handled.  Called holding kra_nid_mutex with
 * the listener running. */
void
kranal_stop_listener(int clear_acceptq)
{
        struct list_head  zombie_accepts;
        unsigned long     flags;
        kra_acceptsock_t *ras;

        CDEBUG(D_WARNING, "Stopping listener\n");

        /* Called holding kra_nid_mutex: listener running */
        LASSERT (kranal_data.kra_listener_sock != NULL);

        /* flag shutdown, then wake the listener out of its socket wait */
        kranal_data.kra_listener_shutdown = 1;
        wake_up_all(kranal_data.kra_listener_sock->sk->sk_sleep);

        /* Block until listener has torn down. */
        down(&kranal_data.kra_listener_signal);

        LASSERT (kranal_data.kra_listener_sock == NULL);
        CDEBUG(D_WARNING, "Listener stopped\n");

        if (!clear_acceptq)
                return;
        
        /* Close any unhandled accepts */
        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        /* splice the whole acceptq onto the local zombie list while holding
         * the lock (list_add new head + del old head == move list) */
        list_add(&zombie_accepts, &kranal_data.kra_connd_acceptq);
        list_del_init(&kranal_data.kra_connd_acceptq);

        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
        
        while (!list_empty(&zombie_accepts)) {
                ras = list_entry(zombie_accepts.next, 
                                 kra_acceptsock_t, ras_list);
                list_del(&ras->ras_list);
                kranal_free_acceptsock(ras);
        }
}
1168
/* sysctl handler for the 'port' and 'backlog' tunables: apply the usual
 * integer read/write, then bounce the listener so a changed value takes
 * effect.  On restart failure, revert the tunable and retry with the old
 * value. */
int 
kranal_listener_procint(ctl_table *table, int write, struct file *filp,
                        void *buffer, size_t *lenp)
{
        int   *tunable = (int *)table->data;
        int    old_val;
        int    rc;

        /* No race with nal initialisation since the nal is setup all the time
         * it's loaded.  When that changes, change this! */
        LASSERT (kranal_data.kra_init == RANAL_INIT_ALL);

        /* kra_nid_mutex serialises all listener start/stop */
        down(&kranal_data.kra_nid_mutex);

        LASSERT (tunable == &kranal_tunables.kra_port ||
                 tunable == &kranal_tunables.kra_backlog);
        old_val = *tunable;

        rc = proc_dointvec(table, write, filp, buffer, lenp);

        /* restart on a changed value, or if the listener isn't running */
        if (write &&
            (*tunable != old_val ||
             kranal_data.kra_listener_sock == NULL)) {

                if (kranal_data.kra_listener_sock != NULL)
                        kranal_stop_listener(0);

                rc = kranal_start_listener();

                if (rc != 0) {
                        /* best effort: put the old value back and retry */
                        CWARN("Unable to start listener with new tunable:"
                              " reverting to old value\n");
                        *tunable = old_val;
                        kranal_start_listener();
                }
        }

        up(&kranal_data.kra_nid_mutex);

        LASSERT (kranal_data.kra_init == RANAL_INIT_ALL);
        return rc;
}
1211
/* Change our NID: stop the listener, bump the peerstamp and install the new
 * NID under the global lock, delete all peers/conns (stale world), then
 * restart the listener for any real NID.  Returns 0 or the listener
 * restart error. */
int
kranal_set_mynid(ptl_nid_t nid)
{
        unsigned long    flags;
        lib_ni_t        *ni = &kranal_lib.libnal_ni;
        int              rc = 0;

        CDEBUG(D_NET, "setting mynid to "LPX64" (old nid="LPX64")\n",
               nid, ni->ni_pid.nid);

        down(&kranal_data.kra_nid_mutex);

        if (nid == ni->ni_pid.nid) {
                /* no change of NID */
                up(&kranal_data.kra_nid_mutex);
                return 0;
        }

        /* listener down first: no new inbound conns during the switch */
        if (kranal_data.kra_listener_sock != NULL)
                kranal_stop_listener(1);

        write_lock_irqsave(&kranal_data.kra_global_lock, flags);
        kranal_data.kra_peerstamp++;
        ni->ni_pid.nid = nid;
        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
        
        /* Delete all existing peers and their connections after new
         * NID/connstamp set to ensure no old connections in our brave
         * new world. */
        kranal_del_peer(PTL_NID_ANY, 0);

        if (nid != PTL_NID_ANY)
                rc = kranal_start_listener();

        up(&kranal_data.kra_nid_mutex);
        return rc;
}
1249
1250 kra_peer_t *
1251 kranal_create_peer (ptl_nid_t nid)
1252 {
1253         kra_peer_t *peer;
1254
1255         LASSERT (nid != PTL_NID_ANY);
1256
1257         PORTAL_ALLOC(peer, sizeof(*peer));
1258         if (peer == NULL)
1259                 return NULL;
1260
1261         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
1262
1263         peer->rap_nid = nid;
1264         atomic_set(&peer->rap_refcount, 1);     /* 1 ref for caller */
1265
1266         INIT_LIST_HEAD(&peer->rap_list);
1267         INIT_LIST_HEAD(&peer->rap_connd_list);
1268         INIT_LIST_HEAD(&peer->rap_conns);
1269         INIT_LIST_HEAD(&peer->rap_tx_queue);
1270
1271         peer->rap_reconnect_time = CURRENT_SECONDS;
1272         peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL;
1273
1274         atomic_inc(&kranal_data.kra_npeers);
1275         return peer;
1276 }
1277
/* Final teardown of a peer whose refcount has dropped to zero: assert it is
 * fully disconnected and unlinked, free it, and decrement the global peer
 * count. */
void
kranal_destroy_peer (kra_peer_t *peer)
{
        CDEBUG(D_NET, "peer "LPX64" %p deleted\n", peer->rap_nid, peer);

        LASSERT (atomic_read(&peer->rap_refcount) == 0);
        LASSERT (peer->rap_persistence == 0);
        LASSERT (!kranal_peer_active(peer));
        LASSERT (!peer->rap_connecting);
        LASSERT (list_empty(&peer->rap_conns));
        LASSERT (list_empty(&peer->rap_tx_queue));
        LASSERT (list_empty(&peer->rap_connd_list));

        PORTAL_FREE(peer, sizeof(*peer));

        /* NB a peer's connections keep a reference on their peer until
         * they are destroyed, so we can be assured that _all_ state to do
         * with this peer has been cleaned up when its refcount drops to
         * zero. */
        atomic_dec(&kranal_data.kra_npeers);
}
1299
1300 kra_peer_t *
1301 kranal_find_peer_locked (ptl_nid_t nid)
1302 {
1303         struct list_head *peer_list = kranal_nid2peerlist(nid);
1304         struct list_head *tmp;
1305         kra_peer_t       *peer;
1306
1307         list_for_each (tmp, peer_list) {
1308
1309                 peer = list_entry(tmp, kra_peer_t, rap_list);
1310
1311                 LASSERT (peer->rap_persistence > 0 ||     /* persistent peer */
1312                          !list_empty(&peer->rap_conns));  /* active conn */
1313
1314                 if (peer->rap_nid != nid)
1315                         continue;
1316
1317                 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
1318                        peer, nid, atomic_read(&peer->rap_refcount));
1319                 return peer;
1320         }
1321         return NULL;
1322 }
1323
1324 kra_peer_t *
1325 kranal_find_peer (ptl_nid_t nid)
1326 {
1327         kra_peer_t     *peer;
1328
1329         read_lock(&kranal_data.kra_global_lock);
1330         peer = kranal_find_peer_locked(nid);
1331         if (peer != NULL)                       /* +1 ref for caller? */
1332                 kranal_peer_addref(peer);
1333         read_unlock(&kranal_data.kra_global_lock);
1334
1335         return peer;
1336 }
1337
/* Remove a non-persistent, connectionless peer from the peer table and drop
 * the table's reference.  Caller holds kra_global_lock for write. */
void
kranal_unlink_peer_locked (kra_peer_t *peer)
{
        LASSERT (peer->rap_persistence == 0);
        LASSERT (list_empty(&peer->rap_conns));

        LASSERT (kranal_peer_active(peer));
        list_del_init(&peer->rap_list);

        /* lose peerlist's ref */
        kranal_peer_decref(peer);
}
1350
/* Return the nid/ip/port/persistence of the 'index'th peer in the hash
 * table (iteration order), for the GET_PEER ioctl.  Returns 0 on success or
 * -ENOENT when 'index' is beyond the last peer. */
int
kranal_get_peer_info (int index, ptl_nid_t *nidp, __u32 *ipp, int *portp, 
                      int *persistencep)
{
        kra_peer_t        *peer;
        struct list_head  *ptmp;
        int                i;

        read_lock(&kranal_data.kra_global_lock);

        for (i = 0; i < kranal_data.kra_peer_hash_size; i++) {

                list_for_each(ptmp, &kranal_data.kra_peers[i]) {

                        peer = list_entry(ptmp, kra_peer_t, rap_list);
                        LASSERT (peer->rap_persistence > 0 ||
                                 !list_empty(&peer->rap_conns));

                        /* count down to the requested peer */
                        if (index-- > 0)
                                continue;

                        *nidp = peer->rap_nid;
                        *ipp = peer->rap_ip;
                        *portp = peer->rap_port;
                        *persistencep = peer->rap_persistence;

                        read_unlock(&kranal_data.kra_global_lock);
                        return 0;
                }
        }

        read_unlock(&kranal_data.kra_global_lock);
        return -ENOENT;
}
1385
/* Add (or re-use) a peer for 'nid' and record its ip/port, bumping its
 * persistence count so it survives disconnection.  Returns 0, -EINVAL for
 * a wildcard nid, or -ENOMEM. */
int
kranal_add_persistent_peer (ptl_nid_t nid, __u32 ip, int port)
{
        unsigned long      flags;
        kra_peer_t        *peer;
        kra_peer_t        *peer2;

        if (nid == PTL_NID_ANY)
                return -EINVAL;

        /* allocate outside the lock; may be discarded below */
        peer = kranal_create_peer(nid);
        if (peer == NULL)
                return -ENOMEM;

        write_lock_irqsave(&kranal_data.kra_global_lock, flags);

        peer2 = kranal_find_peer_locked(nid);
        if (peer2 != NULL) {
                /* lost the race: use the existing peer */
                kranal_peer_decref(peer);
                peer = peer2;
        } else {
                /* peer table takes existing ref on peer */
                list_add_tail(&peer->rap_list,
                              kranal_nid2peerlist(nid));
        }

        peer->rap_ip = ip;
        peer->rap_port = port;
        peer->rap_persistence++;

        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
        return 0;
}
1419
/* Drop one share ('single_share') or all persistence from a peer; once
 * persistence reaches zero, unlink it if idle or close its conns (the peer
 * unlinks itself when the last conn closes).  Caller holds kra_global_lock
 * for write. */
void
kranal_del_peer_locked (kra_peer_t *peer, int single_share)
{
        struct list_head *ctmp;
        struct list_head *cnxt;
        kra_conn_t       *conn;

        if (!single_share)
                peer->rap_persistence = 0;
        else if (peer->rap_persistence > 0)
                peer->rap_persistence--;

        if (peer->rap_persistence != 0)
                return;

        if (list_empty(&peer->rap_conns)) {
                kranal_unlink_peer_locked(peer);
        } else {
                /* safe iteration: closing a conn removes it from rap_conns */
                list_for_each_safe(ctmp, cnxt, &peer->rap_conns) {
                        conn = list_entry(ctmp, kra_conn_t, rac_list);

                        kranal_close_conn_locked(conn, 0);
                }
                /* peer unlinks itself when last conn is closed */
        }
}
1446
/* Delete the peer matching 'nid', or every peer when nid == PTL_NID_ANY.
 * 'single_share' drops just one persistence count (and stops at the first
 * match).  Returns 0 if anything matched, else -ENOENT. */
int
kranal_del_peer (ptl_nid_t nid, int single_share)
{
        unsigned long      flags;
        struct list_head  *ptmp;
        struct list_head  *pnxt;
        kra_peer_t        *peer;
        int                lo;
        int                hi;
        int                i;
        int                rc = -ENOENT;

        write_lock_irqsave(&kranal_data.kra_global_lock, flags);

        /* restrict the scan to one hash chain for a specific nid */
        if (nid != PTL_NID_ANY)
                lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers;
        else {
                lo = 0;
                hi = kranal_data.kra_peer_hash_size - 1;
        }

        for (i = lo; i <= hi; i++) {
                /* _safe: deletion may unlink the peer from this chain */
                list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) {
                        peer = list_entry(ptmp, kra_peer_t, rap_list);
                        LASSERT (peer->rap_persistence > 0 ||
                                 !list_empty(&peer->rap_conns));

                        if (!(nid == PTL_NID_ANY || peer->rap_nid == nid))
                                continue;

                        kranal_del_peer_locked(peer, single_share);
                        rc = 0;         /* matched something */

                        if (single_share)
                                goto out;
                }
        }
 out:
        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

        return rc;
}
1489
/* Return the 'index'th connection across all peers (iteration order) with
 * an extra reference for the caller, or NULL when 'index' is out of range.
 * Used by the GET_CONN ioctl. */
kra_conn_t *
kranal_get_conn_by_idx (int index)
{
        kra_peer_t        *peer;
        struct list_head  *ptmp;
        kra_conn_t        *conn;
        struct list_head  *ctmp;
        int                i;

        read_lock (&kranal_data.kra_global_lock);

        for (i = 0; i < kranal_data.kra_peer_hash_size; i++) {
                list_for_each (ptmp, &kranal_data.kra_peers[i]) {

                        peer = list_entry(ptmp, kra_peer_t, rap_list);
                        LASSERT (peer->rap_persistence > 0 ||
                                 !list_empty(&peer->rap_conns));

                        list_for_each (ctmp, &peer->rap_conns) {
                                /* count down to the requested conn */
                                if (index-- > 0)
                                        continue;

                                conn = list_entry(ctmp, kra_conn_t, rac_list);
                                CDEBUG(D_NET, "++conn[%p] -> "LPX64" (%d)\n",
                                       conn, conn->rac_peer->rap_nid,
                                       atomic_read(&conn->rac_refcount));
                                /* +1 ref for caller, dropped via
                                 * kranal_conn_decref() */
                                atomic_inc(&conn->rac_refcount);
                                read_unlock(&kranal_data.kra_global_lock);
                                return conn;
                        }
                }
        }

        read_unlock(&kranal_data.kra_global_lock);
        return NULL;
}
1526
1527 int
1528 kranal_close_peer_conns_locked (kra_peer_t *peer, int why)
1529 {
1530         kra_conn_t         *conn;
1531         struct list_head   *ctmp;
1532         struct list_head   *cnxt;
1533         int                 count = 0;
1534
1535         list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
1536                 conn = list_entry(ctmp, kra_conn_t, rac_list);
1537
1538                 count++;
1539                 kranal_close_conn_locked(conn, why);
1540         }
1541
1542         return count;
1543 }
1544
/* Close all connections to 'nid' (or to everyone when nid == PTL_NID_ANY).
 * Returns 0 on any match or wildcard, -ENOENT when a specific nid had no
 * connections. */
int
kranal_close_matching_conns (ptl_nid_t nid)
{
        unsigned long       flags;
        kra_peer_t         *peer;
        struct list_head   *ptmp;
        struct list_head   *pnxt;
        int                 lo;
        int                 hi;
        int                 i;
        int                 count = 0;

        write_lock_irqsave(&kranal_data.kra_global_lock, flags);

        /* restrict the scan to one hash chain for a specific nid */
        if (nid != PTL_NID_ANY)
                lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers;
        else {
                lo = 0;
                hi = kranal_data.kra_peer_hash_size - 1;
        }

        for (i = lo; i <= hi; i++) {
                /* _safe: a peer may unlink itself when its last conn closes */
                list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) {

                        peer = list_entry(ptmp, kra_peer_t, rap_list);
                        LASSERT (peer->rap_persistence > 0 ||
                                 !list_empty(&peer->rap_conns));

                        if (!(nid == PTL_NID_ANY || nid == peer->rap_nid))
                                continue;

                        count += kranal_close_peer_conns_locked(peer, 0);
                }
        }

        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

        /* wildcards always succeed */
        if (nid == PTL_NID_ANY)
                return 0;

        return (count == 0) ? -ENOENT : 0;
}
1588
/* NAL command dispatcher registered with portals: translate peer/conn
 * management ioctls into the kranal_* calls above, marshalling results
 * through the generic pcfg fields.  Returns 0 or a -ve errno; unknown
 * commands return -EINVAL. */
int
kranal_cmd(struct portals_cfg *pcfg, void * private)
{
        int rc = -EINVAL;

        LASSERT (pcfg != NULL);

        switch(pcfg->pcfg_command) {
        case NAL_CMD_GET_PEER: {
                ptl_nid_t   nid = 0;
                __u32       ip = 0;
                int         port = 0;
                int         share_count = 0;

                rc = kranal_get_peer_info(pcfg->pcfg_count,
                                          &nid, &ip, &port, &share_count);
                /* NB generic pcfg fields are overloaded: id <- ip,
                 * misc <- port, wait <- share count */
                pcfg->pcfg_nid   = nid;
                pcfg->pcfg_size  = 0;
                pcfg->pcfg_id    = ip;
                pcfg->pcfg_misc  = port;
                pcfg->pcfg_count = 0;
                pcfg->pcfg_wait  = share_count;
                break;
        }
        case NAL_CMD_ADD_PEER: {
                rc = kranal_add_persistent_peer(pcfg->pcfg_nid,
                                                pcfg->pcfg_id, /* IP */
                                                pcfg->pcfg_misc); /* port */
                break;
        }
        case NAL_CMD_DEL_PEER: {
                rc = kranal_del_peer(pcfg->pcfg_nid, 
                                     /* flags == single_share */
                                     pcfg->pcfg_flags != 0);
                break;
        }
        case NAL_CMD_GET_CONN: {
                kra_conn_t *conn = kranal_get_conn_by_idx(pcfg->pcfg_count);

                if (conn == NULL)
                        rc = -ENOENT;
                else {
                        rc = 0;
                        pcfg->pcfg_nid   = conn->rac_peer->rap_nid;
                        pcfg->pcfg_id    = conn->rac_device->rad_id;
                        pcfg->pcfg_misc  = 0;
                        pcfg->pcfg_flags = 0;
                        /* drop the ref taken by kranal_get_conn_by_idx() */
                        kranal_conn_decref(conn);
                }
                break;
        }
        case NAL_CMD_CLOSE_CONNECTION: {
                rc = kranal_close_matching_conns(pcfg->pcfg_nid);
                break;
        }
        case NAL_CMD_REGISTER_MYNID: {
                /* wildcard NID is not a valid identity */
                if (pcfg->pcfg_nid == PTL_NID_ANY)
                        rc = -EINVAL;
                else
                        rc = kranal_set_mynid(pcfg->pcfg_nid);
                break;
        }
        }

        return rc;
}
1655
1656 void
1657 kranal_free_txdescs(struct list_head *freelist)
1658 {
1659         kra_tx_t    *tx;
1660
1661         while (!list_empty(freelist)) {
1662                 tx = list_entry(freelist->next, kra_tx_t, tx_list);
1663
1664                 list_del(&tx->tx_list);
1665                 PORTAL_FREE(tx->tx_phys, PTL_MD_MAX_IOV * sizeof(*tx->tx_phys));
1666                 PORTAL_FREE(tx, sizeof(*tx));
1667         }
1668 }
1669
/* Populate 'freelist' (which must be one of the two idle tx lists, and
 * empty) with 'n' tx descriptors, each with a PTL_MD_MAX_IOV-entry
 * physical-fragment array.  Returns 0, or -ENOMEM after freeing any
 * partial allocation. */
int
kranal_alloc_txdescs(struct list_head *freelist, int n)
{
        int            isnblk = (freelist == &kranal_data.kra_idle_nblk_txs);
        int            i;
        kra_tx_t      *tx;

        LASSERT (freelist == &kranal_data.kra_idle_txs ||
                 freelist == &kranal_data.kra_idle_nblk_txs);
        LASSERT (list_empty(freelist));

        for (i = 0; i < n; i++) {

                PORTAL_ALLOC(tx, sizeof(*tx));
                if (tx == NULL) {
                        CERROR("Can't allocate %stx[%d]\n",
                               isnblk ? "nblk " : "", i);
                        /* roll back everything allocated so far */
                        kranal_free_txdescs(freelist);
                        return -ENOMEM;
                }

                PORTAL_ALLOC(tx->tx_phys,
                             PTL_MD_MAX_IOV * sizeof(*tx->tx_phys));
                if (tx->tx_phys == NULL) {
                        CERROR("Can't allocate %stx[%d]->tx_phys\n", 
                               isnblk ? "nblk " : "", i);

                        /* this tx isn't on the list yet: free it directly */
                        PORTAL_FREE(tx, sizeof(*tx));
                        kranal_free_txdescs(freelist);
                        return -ENOMEM;
                }

                tx->tx_isnblk = isnblk;
                tx->tx_buftype = RANAL_BUF_NONE;
                tx->tx_msg.ram_type = RANAL_MSG_NONE;

                list_add(&tx->tx_list, freelist);
        }

        return 0;
}
1711
/* Initialise RapidArray device 'id': get the device handle, reserve RDMA
 * descriptors for all tx slots, and create the send (rdma) and receive
 * (fma) completion queues.  Returns 0 or -ENODEV, unwinding partial setup
 * via the goto ladder. */
int
kranal_device_init(int id, kra_device_t *dev)
{
        const int         total_ntx = RANAL_NTX + RANAL_NTX_NBLK;
        RAP_RETURN        rrc;

        dev->rad_id = id;
        rrc = RapkGetDeviceByIndex(id, kranal_device_callback,
                                   &dev->rad_handle);
        if (rrc != RAP_SUCCESS) {
                CERROR("Can't get Rapidarray Device %d: %d\n", id, rrc);
                goto failed_0;
        }

        rrc = RapkReserveRdma(dev->rad_handle, total_ntx);
        if (rrc != RAP_SUCCESS) {
                CERROR("Can't reserve %d RDMA descriptors"
                       " for device %d: %d\n", total_ntx, id, rrc);
                goto failed_1;
        }

        /* send-side CQ sized for every possible in-flight tx */
        rrc = RapkCreateCQ(dev->rad_handle, total_ntx, RAP_CQTYPE_SEND,
                           &dev->rad_rdma_cqh);
        if (rrc != RAP_SUCCESS) {
                CERROR("Can't create rdma cq size %d"
                       " for device %d: %d\n", total_ntx, id, rrc);
                goto failed_1;
        }

        rrc = RapkCreateCQ(dev->rad_handle, RANAL_FMA_CQ_SIZE, RAP_CQTYPE_RECV,
                           &dev->rad_fma_cqh);
        if (rrc != RAP_SUCCESS) {
                CERROR("Can't create fma cq size %d"
                       " for device %d: %d\n", RANAL_FMA_CQ_SIZE, id, rrc);
                goto failed_2;
        }

        return 0;

 failed_2:
        RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
 failed_1:
        RapkReleaseDevice(dev->rad_handle);
 failed_0:
        return -ENODEV;
}
1758
/* Release all RapidArray resources held by 'dev'.  The device's scheduler
 * thread must already have exited (asserted below).  CQs are destroyed in
 * the reverse order of their creation in kranal_device_init(). */
void
kranal_device_fini(kra_device_t *dev)
{
        LASSERT(dev->rad_scheduler == NULL);
        RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cqh);
        RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
        RapkReleaseDevice(dev->rad_handle);
}
1767
/* Shut the NAL down, undoing whatever kranal_api_startup() managed to set
 * up.  kra_init records how far startup got; the switch below falls
 * through from the most-initialised state to the least, then the common
 * teardown (threads, hash tables, devices, tx descriptors) runs. */
void
kranal_api_shutdown (nal_t *nal)
{
        int           i;
        unsigned long flags;
        
        if (nal->nal_refct != 0) {
                /* Not the last reference: just drop this module's use count */
                PORTAL_MODULE_UNUSE;
                return;
        }

        CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
               atomic_read(&portal_kmemory));

        LASSERT (nal == &kranal_api);

        switch (kranal_data.kra_init) {
        default:
                CERROR("Unexpected state %d\n", kranal_data.kra_init);
                LBUG();

        case RANAL_INIT_ALL:
                /* stop calls to nal_cmd */
                libcfs_nal_cmd_unregister(RANAL);
                /* No new persistent peers */

                /* resetting my NID to unadvertises me, removes my
                 * listener and nukes all current peers */
                kranal_set_mynid(PTL_NID_ANY);
                /* no new peers or conns */

                /* Wait for all peer/conn state to clean up */
                i = 2;
                while (atomic_read(&kranal_data.kra_nconns) != 0 ||
                       atomic_read(&kranal_data.kra_npeers) != 0) {
                        i++;
                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                               "waiting for %d peers and %d conns to close down\n",
                               atomic_read(&kranal_data.kra_npeers),
                               atomic_read(&kranal_data.kra_nconns));
                        kranal_pause(HZ);
                }
                /* fall through */

        case RANAL_INIT_LIB:
                lib_fini(&kranal_lib);
                /* fall through */

        case RANAL_INIT_DATA:
                break;
        }

        /* flag threads to terminate; wake and wait for them to die */
        kranal_data.kra_shutdown = 1;

        /* wake each successfully-initialised device's scheduler */
        for (i = 0; i < kranal_data.kra_ndevs; i++) {
                kra_device_t *dev = &kranal_data.kra_devices[i];

                LASSERT (list_empty(&dev->rad_connq));

                spin_lock_irqsave(&dev->rad_lock, flags);
                wake_up(&dev->rad_waitq);
                spin_unlock_irqrestore(&dev->rad_lock, flags);
        }

        /* wake the reaper... */
        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
        wake_up_all(&kranal_data.kra_reaper_waitq);
        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

        /* ...and the connection daemons */
        LASSERT (list_empty(&kranal_data.kra_connd_peers));
        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); 
        wake_up_all(&kranal_data.kra_connd_waitq);
        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); 

        /* kra_nthreads counts every thread started via kranal_thread_start() */
        i = 2;
        while (atomic_read(&kranal_data.kra_nthreads) != 0) {
                i++;
                CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                       "Waiting for %d threads to terminate\n",
                       atomic_read(&kranal_data.kra_nthreads));
                kranal_pause(HZ);
        }

        /* peers/conns are all gone now; free the hash tables */
        LASSERT (atomic_read(&kranal_data.kra_npeers) == 0);
        if (kranal_data.kra_peers != NULL) {
                for (i = 0; i < kranal_data.kra_peer_hash_size; i++)
                        LASSERT (list_empty(&kranal_data.kra_peers[i]));

                PORTAL_FREE(kranal_data.kra_peers,
                            sizeof (struct list_head) * 
                            kranal_data.kra_peer_hash_size);
        }

        LASSERT (atomic_read(&kranal_data.kra_nconns) == 0);
        if (kranal_data.kra_conns != NULL) {
                for (i = 0; i < kranal_data.kra_conn_hash_size; i++)
                        LASSERT (list_empty(&kranal_data.kra_conns[i]));

                PORTAL_FREE(kranal_data.kra_conns,
                            sizeof (struct list_head) * 
                            kranal_data.kra_conn_hash_size);
        }

        for (i = 0; i < kranal_data.kra_ndevs; i++)
                kranal_device_fini(&kranal_data.kra_devices[i]);

        kranal_free_txdescs(&kranal_data.kra_idle_txs);
        kranal_free_txdescs(&kranal_data.kra_idle_nblk_txs);

        CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
               atomic_read(&portal_kmemory));
        printk(KERN_INFO "Lustre: RapidArray NAL unloaded (final mem %d)\n",
               atomic_read(&portal_kmemory));

        kranal_data.kra_init = RANAL_INIT_NOTHING;
}
1885
1886 int
1887 kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
1888                     ptl_ni_limits_t *requested_limits,
1889                     ptl_ni_limits_t *actual_limits)
1890 {
1891         static int        device_ids[] = {RAPK_MAIN_DEVICE_ID,
1892                                           RAPK_EXPANSION_DEVICE_ID};
1893         struct timeval    tv;
1894         ptl_process_id_t  process_id;
1895         int               pkmem = atomic_read(&portal_kmemory);
1896         int               rc;
1897         int               i;
1898         kra_device_t     *dev;
1899
1900         LASSERT (nal == &kranal_api);
1901
1902         if (nal->nal_refct != 0) {
1903                 if (actual_limits != NULL)
1904                         *actual_limits = kranal_lib.libnal_ni.ni_actual_limits;
1905                 /* This module got the first ref */
1906                 PORTAL_MODULE_USE;
1907                 return PTL_OK;
1908         }
1909
1910         LASSERT (kranal_data.kra_init == RANAL_INIT_NOTHING);
1911
1912         memset(&kranal_data, 0, sizeof(kranal_data)); /* zero pointers, flags etc */
1913
1914         /* CAVEAT EMPTOR: Every 'Fma' message includes the sender's NID and
1915          * a unique (for all time) connstamp so we can uniquely identify
1916          * the sender.  The connstamp is an incrementing counter
1917          * initialised with seconds + microseconds at startup time.  So we
1918          * rely on NOT creating connections more frequently on average than
1919          * 1MHz to ensure we don't use old connstamps when we reboot. */
1920         do_gettimeofday(&tv);
1921         kranal_data.kra_connstamp =
1922         kranal_data.kra_peerstamp = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1923
1924         init_MUTEX(&kranal_data.kra_nid_mutex);
1925         init_MUTEX_LOCKED(&kranal_data.kra_listener_signal);
1926
1927         rwlock_init(&kranal_data.kra_global_lock);
1928
1929         for (i = 0; i < RANAL_MAXDEVS; i++ ) {
1930                 kra_device_t  *dev = &kranal_data.kra_devices[i];
1931
1932                 dev->rad_idx = i;
1933                 INIT_LIST_HEAD(&dev->rad_connq);
1934                 init_waitqueue_head(&dev->rad_waitq);
1935                 spin_lock_init(&dev->rad_lock);
1936         }
1937
1938         kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
1939         init_waitqueue_head(&kranal_data.kra_reaper_waitq);
1940         spin_lock_init(&kranal_data.kra_reaper_lock);
1941
1942         INIT_LIST_HEAD(&kranal_data.kra_connd_peers);
1943         init_waitqueue_head(&kranal_data.kra_connd_waitq);
1944         spin_lock_init(&kranal_data.kra_connd_lock);
1945
1946         INIT_LIST_HEAD(&kranal_data.kra_idle_txs);
1947         INIT_LIST_HEAD(&kranal_data.kra_idle_nblk_txs);
1948         init_waitqueue_head(&kranal_data.kra_idle_tx_waitq);
1949         spin_lock_init(&kranal_data.kra_tx_lock);
1950
1951         /* OK to call kranal_api_shutdown() to cleanup now */
1952         kranal_data.kra_init = RANAL_INIT_DATA;
1953         
1954         kranal_data.kra_peer_hash_size = RANAL_PEER_HASH_SIZE;
1955         PORTAL_ALLOC(kranal_data.kra_peers,
1956                      sizeof(struct list_head) * kranal_data.kra_peer_hash_size);
1957         if (kranal_data.kra_peers == NULL)
1958                 goto failed;
1959
1960         for (i = 0; i < kranal_data.kra_peer_hash_size; i++)
1961                 INIT_LIST_HEAD(&kranal_data.kra_peers[i]);
1962
1963         kranal_data.kra_conn_hash_size = RANAL_PEER_HASH_SIZE;
1964         PORTAL_ALLOC(kranal_data.kra_conns,
1965                      sizeof(struct list_head) * kranal_data.kra_conn_hash_size);
1966         if (kranal_data.kra_conns == NULL)
1967                 goto failed;
1968
1969         for (i = 0; i < kranal_data.kra_conn_hash_size; i++)
1970                 INIT_LIST_HEAD(&kranal_data.kra_conns[i]);
1971
1972         rc = kranal_alloc_txdescs(&kranal_data.kra_idle_txs, RANAL_NTX);
1973         if (rc != 0)
1974                 goto failed;
1975
1976         rc = kranal_alloc_txdescs(&kranal_data.kra_idle_nblk_txs,RANAL_NTX_NBLK);
1977         if (rc != 0)
1978                 goto failed;
1979
1980         process_id.pid = requested_pid;
1981         process_id.nid = PTL_NID_ANY;           /* don't know my NID yet */
1982
1983         rc = lib_init(&kranal_lib, nal, process_id,
1984                       requested_limits, actual_limits);
1985         if (rc != PTL_OK) {
1986                 CERROR("lib_init failed: error %d\n", rc);
1987                 goto failed;
1988         }
1989
1990         /* lib interface initialised */
1991         kranal_data.kra_init = RANAL_INIT_LIB;
1992         /*****************************************************/
1993
1994         rc = kranal_thread_start(kranal_reaper, NULL);
1995         if (rc != 0) {
1996                 CERROR("Can't spawn ranal reaper: %d\n", rc);
1997                 goto failed;
1998         }
1999
2000         for (i = 0; i < RANAL_N_CONND; i++) {
2001                 rc = kranal_thread_start(kranal_connd, (void *)(unsigned long)i);
2002                 if (rc != 0) {
2003                         CERROR("Can't spawn ranal connd[%d]: %d\n",
2004                                i, rc);
2005                         goto failed;
2006                 }
2007         }
2008
2009         LASSERT(kranal_data.kra_ndevs == 0);
2010         for (i = 0; i < sizeof(device_ids)/sizeof(device_ids[0]); i++) {
2011                 dev = &kranal_data.kra_devices[kranal_data.kra_ndevs];
2012
2013                 rc = kranal_device_init(device_ids[i], dev);
2014                 if (rc == 0)
2015                         kranal_data.kra_ndevs++;
2016
2017                 rc = kranal_thread_start(kranal_scheduler, dev);
2018                 if (rc != 0) {
2019                         CERROR("Can't spawn ranal scheduler[%d]: %d\n",
2020                                i, rc);
2021                         goto failed;
2022                 }
2023         }
2024
2025         if (kranal_data.kra_ndevs == 0)
2026                 goto failed;
2027
2028         rc = libcfs_nal_cmd_register(RANAL, &kranal_cmd, NULL);
2029         if (rc != 0) {
2030                 CERROR("Can't initialise command interface (rc = %d)\n", rc);
2031                 goto failed;
2032         }
2033
2034         /* flag everything initialised */
2035         kranal_data.kra_init = RANAL_INIT_ALL;
2036         /*****************************************************/
2037
2038         CDEBUG(D_MALLOC, "initial kmem %d\n", atomic_read(&portal_kmemory));
2039         printk(KERN_INFO "Lustre: RapidArray NAL loaded "
2040                "(initial mem %d)\n", pkmem);
2041
2042         return PTL_OK;
2043
2044  failed:
2045         kranal_api_shutdown(&kranal_api);    
2046         return PTL_FAIL;
2047 }
2048
/* Module unload: tear down in the reverse order of kranal_module_init() —
 * the sysctl table, then the network interface, then the NAL registration. */
void __exit
kranal_module_fini (void)
{
        if (kranal_tunables.kra_sysctl != NULL)
                unregister_sysctl_table(kranal_tunables.kra_sysctl);

        PtlNIFini(kranal_ni);

        ptl_unregister_nal(RANAL);
}
2059
2060 int __init
2061 kranal_module_init (void)
2062 {
2063         int    rc;
2064
2065         /* the following must be sizeof(int) for
2066          * proc_dointvec/kranal_listener_procint() */
2067         LASSERT (sizeof(kranal_tunables.kra_timeout) == sizeof(int));
2068         LASSERT (sizeof(kranal_tunables.kra_listener_timeout) == sizeof(int));
2069         LASSERT (sizeof(kranal_tunables.kra_backlog) == sizeof(int));
2070         LASSERT (sizeof(kranal_tunables.kra_port) == sizeof(int));
2071         LASSERT (sizeof(kranal_tunables.kra_max_immediate) == sizeof(int));
2072
2073         kranal_api.nal_ni_init = kranal_api_startup;
2074         kranal_api.nal_ni_fini = kranal_api_shutdown;
2075
2076         /* Initialise dynamic tunables to defaults once only */
2077         kranal_tunables.kra_timeout = RANAL_TIMEOUT;
2078         kranal_tunables.kra_listener_timeout = RANAL_LISTENER_TIMEOUT;
2079         kranal_tunables.kra_backlog = RANAL_BACKLOG;
2080         kranal_tunables.kra_port = RANAL_PORT;
2081         kranal_tunables.kra_max_immediate = RANAL_MAX_IMMEDIATE;
2082
2083         rc = ptl_register_nal(RANAL, &kranal_api);
2084         if (rc != PTL_OK) {
2085                 CERROR("Can't register RANAL: %d\n", rc);
2086                 return -ENOMEM;               /* or something... */
2087         }
2088
2089         /* Pure gateways want the NAL started up at module load time... */
2090         rc = PtlNIInit(RANAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kranal_ni);
2091         if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
2092                 ptl_unregister_nal(RANAL);
2093                 return -ENODEV;
2094         }
2095
2096         kranal_tunables.kra_sysctl = 
2097                 register_sysctl_table(kranal_top_ctl_table, 0);
2098         if (kranal_tunables.kra_sysctl == NULL) {
2099                 CERROR("Can't register sysctl table\n");
2100                 PtlNIFini(kranal_ni);
2101                 ptl_unregister_nal(RANAL);
2102                 return -ENOMEM;
2103         }
2104
2105         return 0;
2106 }
2107
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel RapidArray NAL v0.01");
MODULE_LICENSE("GPL");

/* entry and exit points registered with the kernel module loader */
module_init(kranal_module_init);
module_exit(kranal_module_fini);