Whamcloud - gitweb
LU-1347 build: remove the vim/emacs modelines
[fs/lustre-release.git] / lnet / klnds / ralnd / ralnd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lnet/klnds/ralnd/ralnd.c
35  *
36  * Author: Eric Barton <eric@bartonsoftware.com>
37  */
38 #include "ralnd.h"
39
40 static int        kranal_devids[RANAL_MAXDEVS] = {RAPK_MAIN_DEVICE_ID,
41                                                   RAPK_EXPANSION_DEVICE_ID};
42
43 lnd_t the_kralnd = {
44         .lnd_type       = RALND,
45         .lnd_startup    = kranal_startup,
46         .lnd_shutdown   = kranal_shutdown,
47         .lnd_ctl        = kranal_ctl,
48         .lnd_send       = kranal_send,
49         .lnd_recv       = kranal_recv,
50         .lnd_eager_recv = kranal_eager_recv,
51         .lnd_accept     = kranal_accept,
52 };
53
54 kra_data_t              kranal_data;
55
56 void
57 kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn, lnet_nid_t dstnid)
58 {
59         RAP_RETURN   rrc;
60
61         memset(connreq, 0, sizeof(*connreq));
62
63         connreq->racr_magic     = RANAL_MSG_MAGIC;
64         connreq->racr_version   = RANAL_MSG_VERSION;
65
66         if (conn == NULL)                       /* prepping a "stub" reply */
67                 return;
68
69         connreq->racr_devid     = conn->rac_device->rad_id;
70         connreq->racr_srcnid    = kranal_data.kra_ni->ni_nid;
71         connreq->racr_dstnid    = dstnid;
72         connreq->racr_peerstamp = kranal_data.kra_peerstamp;
73         connreq->racr_connstamp = conn->rac_my_connstamp;
74         connreq->racr_timeout   = conn->rac_timeout;
75
76         rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
77         LASSERT(rrc == RAP_SUCCESS);
78 }
79
80 int
81 kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int active)
82 {
83         int         timeout = active ? *kranal_tunables.kra_timeout :
84                                         lnet_acceptor_timeout();
85         int         swab;
86         int         rc;
87
88         /* return 0 on success, -ve on error, +ve to tell the peer I'm "old" */
89
90         rc = libcfs_sock_read(sock, &connreq->racr_magic, 
91                               sizeof(connreq->racr_magic), timeout);
92         if (rc != 0) {
93                 CERROR("Read(magic) failed(1): %d\n", rc);
94                 return -EIO;
95         }
96
97         if (connreq->racr_magic != RANAL_MSG_MAGIC &&
98             connreq->racr_magic != __swab32(RANAL_MSG_MAGIC)) {
99                 /* Unexpected magic! */
100                 if (!active &&
101                     (connreq->racr_magic == LNET_PROTO_MAGIC ||
102                      connreq->racr_magic == __swab32(LNET_PROTO_MAGIC))) {
103                         /* future protocol version compatibility!
104                          * When LNET unifies protocols over all LNDs, the first
105                          * thing sent will be a version query.  +ve rc means I
106                          * reply with my current magic/version */
107                         return EPROTO;
108                 }
109
110                 CERROR("Unexpected magic %08x (%s)\n",
111                        connreq->racr_magic, active ? "active" : "passive");
112                 return -EPROTO;
113         }
114
115         swab = (connreq->racr_magic == __swab32(RANAL_MSG_MAGIC));
116
117         rc = libcfs_sock_read(sock, &connreq->racr_version,
118                               sizeof(connreq->racr_version), timeout);
119         if (rc != 0) {
120                 CERROR("Read(version) failed: %d\n", rc);
121                 return -EIO;
122         }
123
124         if (swab)
125                 __swab16s(&connreq->racr_version);
126         
127         if (connreq->racr_version != RANAL_MSG_VERSION) {
128                 if (active) {
129                         CERROR("Unexpected version %d\n", connreq->racr_version);
130                         return -EPROTO;
131                 }
132                 /* If this is a future version of the ralnd protocol, and I'm
133                  * passive (accepted the connection), tell my peer I'm "old"
134                  * (+ve rc) */
135                 return EPROTO;
136         }
137
138         rc = libcfs_sock_read(sock, &connreq->racr_devid,
139                               sizeof(connreq->racr_version) -
140                               offsetof(kra_connreq_t, racr_devid),
141                               timeout);
142         if (rc != 0) {
143                 CERROR("Read(body) failed: %d\n", rc);
144                 return -EIO;
145         }
146
147         if (swab) {
148                 __swab32s(&connreq->racr_magic);
149                 __swab16s(&connreq->racr_version);
150                 __swab16s(&connreq->racr_devid);
151                 __swab64s(&connreq->racr_srcnid);
152                 __swab64s(&connreq->racr_dstnid);
153                 __swab64s(&connreq->racr_peerstamp);
154                 __swab64s(&connreq->racr_connstamp);
155                 __swab32s(&connreq->racr_timeout);
156
157                 __swab32s(&connreq->racr_riparams.HostId);
158                 __swab32s(&connreq->racr_riparams.FmaDomainHndl);
159                 __swab32s(&connreq->racr_riparams.PTag);
160                 __swab32s(&connreq->racr_riparams.CompletionCookie);
161         }
162
163         if (connreq->racr_srcnid == LNET_NID_ANY ||
164             connreq->racr_dstnid == LNET_NID_ANY) {
165                 CERROR("Received LNET_NID_ANY\n");
166                 return -EPROTO;
167         }
168
169         if (connreq->racr_timeout < RANAL_MIN_TIMEOUT) {
170                 CERROR("Received timeout %d < MIN %d\n",
171                        connreq->racr_timeout, RANAL_MIN_TIMEOUT);
172                 return -EPROTO;
173         }
174
175         return 0;
176 }
177
178 int
179 kranal_close_stale_conns_locked (kra_peer_t *peer, kra_conn_t *newconn)
180 {
181         kra_conn_t         *conn;
182         cfs_list_t         *ctmp;
183         cfs_list_t         *cnxt;
184         int                 loopback;
185         int                 count = 0;
186
187         loopback = peer->rap_nid == kranal_data.kra_ni->ni_nid;
188
189         cfs_list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
190                 conn = cfs_list_entry(ctmp, kra_conn_t, rac_list);
191
192                 if (conn == newconn)
193                         continue;
194
195                 if (conn->rac_peerstamp != newconn->rac_peerstamp) {
196                         CDEBUG(D_NET, "Closing stale conn nid: %s "
197                                " peerstamp:"LPX64"("LPX64")\n", 
198                                libcfs_nid2str(peer->rap_nid),
199                                conn->rac_peerstamp, newconn->rac_peerstamp);
200                         LASSERT (conn->rac_peerstamp < newconn->rac_peerstamp);
201                         count++;
202                         kranal_close_conn_locked(conn, -ESTALE);
203                         continue;
204                 }
205
206                 if (conn->rac_device != newconn->rac_device)
207                         continue;
208
209                 if (loopback &&
210                     newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
211                     newconn->rac_peer_connstamp == conn->rac_my_connstamp)
212                         continue;
213
214                 LASSERT (conn->rac_peer_connstamp < newconn->rac_peer_connstamp);
215
216                 CDEBUG(D_NET, "Closing stale conn nid: %s"
217                        " connstamp:"LPX64"("LPX64")\n", 
218                        libcfs_nid2str(peer->rap_nid),
219                        conn->rac_peer_connstamp, newconn->rac_peer_connstamp);
220
221                 count++;
222                 kranal_close_conn_locked(conn, -ESTALE);
223         }
224
225         return count;
226 }
227
228 int
229 kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn)
230 {
231         kra_conn_t       *conn;
232         cfs_list_t       *tmp;
233         int               loopback;
234
235         loopback = peer->rap_nid == kranal_data.kra_ni->ni_nid;
236
237         cfs_list_for_each(tmp, &peer->rap_conns) {
238                 conn = cfs_list_entry(tmp, kra_conn_t, rac_list);
239
240                 /* 'newconn' is from an earlier version of 'peer'!!! */
241                 if (newconn->rac_peerstamp < conn->rac_peerstamp)
242                         return 1;
243
244                 /* 'conn' is from an earlier version of 'peer': it will be
245                  * removed when we cull stale conns later on... */
246                 if (newconn->rac_peerstamp > conn->rac_peerstamp)
247                         continue;
248
249                 /* Different devices are OK */
250                 if (conn->rac_device != newconn->rac_device)
251                         continue;
252
253                 /* It's me connecting to myself */
254                 if (loopback &&
255                     newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
256                     newconn->rac_peer_connstamp == conn->rac_my_connstamp)
257                         continue;
258
259                 /* 'newconn' is an earlier connection from 'peer'!!! */
260                 if (newconn->rac_peer_connstamp < conn->rac_peer_connstamp)
261                         return 2;
262
263                 /* 'conn' is an earlier connection from 'peer': it will be
264                  * removed when we cull stale conns later on... */
265                 if (newconn->rac_peer_connstamp > conn->rac_peer_connstamp)
266                         continue;
267
268                 /* 'newconn' has the SAME connection stamp; 'peer' isn't
269                  * playing the game... */
270                 return 3;
271         }
272
273         return 0;
274 }
275
276 void
277 kranal_set_conn_uniqueness (kra_conn_t *conn)
278 {
279         unsigned long  flags;
280
281         cfs_write_lock_irqsave(&kranal_data.kra_global_lock, flags);
282
283         conn->rac_my_connstamp = kranal_data.kra_connstamp++;
284
285         do {    /* allocate a unique cqid */
286                 conn->rac_cqid = kranal_data.kra_next_cqid++;
287         } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL);
288
289         cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
290 }
291
292 int
293 kranal_create_conn(kra_conn_t **connp, kra_device_t *dev)
294 {
295         kra_conn_t    *conn;
296         RAP_RETURN     rrc;
297
298         LASSERT (!cfs_in_interrupt());
299         LIBCFS_ALLOC(conn, sizeof(*conn));
300
301         if (conn == NULL)
302                 return -ENOMEM;
303
304         memset(conn, 0, sizeof(*conn));
305         cfs_atomic_set(&conn->rac_refcount, 1);
306         CFS_INIT_LIST_HEAD(&conn->rac_list);
307         CFS_INIT_LIST_HEAD(&conn->rac_hashlist);
308         CFS_INIT_LIST_HEAD(&conn->rac_schedlist);
309         CFS_INIT_LIST_HEAD(&conn->rac_fmaq);
310         CFS_INIT_LIST_HEAD(&conn->rac_rdmaq);
311         CFS_INIT_LIST_HEAD(&conn->rac_replyq);
312         cfs_spin_lock_init(&conn->rac_lock);
313
314         kranal_set_conn_uniqueness(conn);
315
316         conn->rac_device = dev;
317         conn->rac_timeout = MAX(*kranal_tunables.kra_timeout, RANAL_MIN_TIMEOUT);
318         kranal_update_reaper_timeout(conn->rac_timeout);
319
320         rrc = RapkCreateRi(dev->rad_handle, conn->rac_cqid,
321                            &conn->rac_rihandle);
322         if (rrc != RAP_SUCCESS) {
323                 CERROR("RapkCreateRi failed: %d\n", rrc);
324                 LIBCFS_FREE(conn, sizeof(*conn));
325                 return -ENETDOWN;
326         }
327
328         cfs_atomic_inc(&kranal_data.kra_nconns);
329         *connp = conn;
330         return 0;
331 }
332
333 void
334 kranal_destroy_conn(kra_conn_t *conn)
335 {
336         RAP_RETURN         rrc;
337
338         LASSERT (!cfs_in_interrupt());
339         LASSERT (!conn->rac_scheduled);
340         LASSERT (cfs_list_empty(&conn->rac_list));
341         LASSERT (cfs_list_empty(&conn->rac_hashlist));
342         LASSERT (cfs_list_empty(&conn->rac_schedlist));
343         LASSERT (cfs_atomic_read(&conn->rac_refcount) == 0);
344         LASSERT (cfs_list_empty(&conn->rac_fmaq));
345         LASSERT (cfs_list_empty(&conn->rac_rdmaq));
346         LASSERT (cfs_list_empty(&conn->rac_replyq));
347
348         rrc = RapkDestroyRi(conn->rac_device->rad_handle,
349                             conn->rac_rihandle);
350         LASSERT (rrc == RAP_SUCCESS);
351
352         if (conn->rac_peer != NULL)
353                 kranal_peer_decref(conn->rac_peer);
354
355         LIBCFS_FREE(conn, sizeof(*conn));
356         cfs_atomic_dec(&kranal_data.kra_nconns);
357 }
358
359 void
360 kranal_terminate_conn_locked (kra_conn_t *conn)
361 {
362         LASSERT (!cfs_in_interrupt());
363         LASSERT (conn->rac_state == RANAL_CONN_CLOSING);
364         LASSERT (!cfs_list_empty(&conn->rac_hashlist));
365         LASSERT (cfs_list_empty(&conn->rac_list));
366
367         /* Remove from conn hash table: no new callbacks */
368         cfs_list_del_init(&conn->rac_hashlist);
369         kranal_conn_decref(conn);
370
371         conn->rac_state = RANAL_CONN_CLOSED;
372
373         /* schedule to clear out all uncompleted comms in context of dev's
374          * scheduler */
375         kranal_schedule_conn(conn);
376 }
377
378 void
379 kranal_close_conn_locked (kra_conn_t *conn, int error)
380 {
381         kra_peer_t        *peer = conn->rac_peer;
382
383         CDEBUG_LIMIT(error == 0 ? D_NET : D_NETERROR,
384                      "closing conn to %s: error %d\n",
385                      libcfs_nid2str(peer->rap_nid), error);
386
387         LASSERT (!cfs_in_interrupt());
388         LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED);
389         LASSERT (!cfs_list_empty(&conn->rac_hashlist));
390         LASSERT (!cfs_list_empty(&conn->rac_list));
391
392         cfs_list_del_init(&conn->rac_list);
393
394         if (cfs_list_empty(&peer->rap_conns) &&
395             peer->rap_persistence == 0) {
396                 /* Non-persistent peer with no more conns... */
397                 kranal_unlink_peer_locked(peer);
398         }
399
400         /* Reset RX timeout to ensure we wait for an incoming CLOSE for the
401          * full timeout.  If we get a CLOSE we know the peer has stopped all
402          * RDMA.  Otherwise if we wait for the full timeout we can also be sure
403          * all RDMA has stopped. */
404         conn->rac_last_rx = jiffies;
405         cfs_mb();
406
407         conn->rac_state = RANAL_CONN_CLOSING;
408         kranal_schedule_conn(conn);             /* schedule sending CLOSE */
409
410         kranal_conn_decref(conn);               /* lose peer's ref */
411 }
412
413 void
414 kranal_close_conn (kra_conn_t *conn, int error)
415 {
416         unsigned long    flags;
417
418
419         cfs_write_lock_irqsave(&kranal_data.kra_global_lock, flags);
420
421         if (conn->rac_state == RANAL_CONN_ESTABLISHED)
422                 kranal_close_conn_locked(conn, error);
423
424         cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
425 }
426
427 int
428 kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq,
429                        __u32 peer_ip, int peer_port)
430 {
431         kra_device_t  *dev = conn->rac_device;
432         unsigned long  flags;
433         RAP_RETURN     rrc;
434
435         /* CAVEAT EMPTOR: we're really overloading rac_last_tx + rac_keepalive
436          * to do RapkCompleteSync() timekeeping (see kibnal_scheduler). */
437         conn->rac_last_tx = jiffies;
438         conn->rac_keepalive = 0;
439
440         rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
441         if (rrc != RAP_SUCCESS) {
442                 CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n",
443                        HIPQUAD(peer_ip), peer_port, rrc);
444                 return -ECONNABORTED;
445         }
446
447         /* Schedule conn on rad_new_conns */
448         kranal_conn_addref(conn);
449         cfs_spin_lock_irqsave(&dev->rad_lock, flags);
450         cfs_list_add_tail(&conn->rac_schedlist, &dev->rad_new_conns);
451         cfs_waitq_signal(&dev->rad_waitq);
452         cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
453
454         rrc = RapkWaitToConnect(conn->rac_rihandle);
455         if (rrc != RAP_SUCCESS) {
456                 CERROR("Error waiting to connect to %u.%u.%u.%u/%d: %d\n",
457                        HIPQUAD(peer_ip), peer_port, rrc);
458                 return -ECONNABORTED;
459         }
460
461         /* Scheduler doesn't touch conn apart from to deschedule and decref it
462          * after RapkCompleteSync() return success, so conn is all mine */
463
464         conn->rac_peerstamp = connreq->racr_peerstamp;
465         conn->rac_peer_connstamp = connreq->racr_connstamp;
466         conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq->racr_timeout);
467         kranal_update_reaper_timeout(conn->rac_keepalive);
468         return 0;
469 }
470
471 int
472 kranal_passive_conn_handshake (struct socket *sock, lnet_nid_t *src_nidp,
473                                lnet_nid_t *dst_nidp, kra_conn_t **connp)
474 {
475         __u32                peer_ip;
476         unsigned int         peer_port;
477         kra_connreq_t        rx_connreq;
478         kra_connreq_t        tx_connreq;
479         kra_conn_t          *conn;
480         kra_device_t        *dev;
481         int                  rc;
482         int                  i;
483
484         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
485         if (rc != 0) {
486                 CERROR("Can't get peer's IP: %d\n", rc);
487                 return rc;
488         }
489
490         rc = kranal_recv_connreq(sock, &rx_connreq, 0);
491
492         if (rc < 0) {
493                 CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
494                        HIPQUAD(peer_ip), peer_port, rc);
495                 return rc;
496         }
497
498         if (rc > 0) {
499                 /* Request from "new" peer: send reply with my MAGIC/VERSION to
500                  * tell her I'm old... */
501                 kranal_pack_connreq(&tx_connreq, NULL, LNET_NID_ANY);
502
503                 rc = libcfs_sock_write(sock, &tx_connreq, sizeof(tx_connreq),
504                                        lnet_acceptor_timeout());
505                 if (rc != 0)
506                         CERROR("Can't tx stub connreq to %u.%u.%u.%u/%d: %d\n",
507                                HIPQUAD(peer_ip), peer_port, rc);
508
509                 return -EPROTO;
510         }
511
512         for (i = 0;;i++) {
513                 if (i == kranal_data.kra_ndevs) {
514                         CERROR("Can't match dev %d from %u.%u.%u.%u/%d\n",
515                                rx_connreq.racr_devid, HIPQUAD(peer_ip), peer_port);
516                         return -ENODEV;
517                 }
518                 dev = &kranal_data.kra_devices[i];
519                 if (dev->rad_id == rx_connreq.racr_devid)
520                         break;
521         }
522
523         rc = kranal_create_conn(&conn, dev);
524         if (rc != 0)
525                 return rc;
526
527         kranal_pack_connreq(&tx_connreq, conn, rx_connreq.racr_srcnid);
528
529         rc = libcfs_sock_write(sock, &tx_connreq, sizeof(tx_connreq),
530                                lnet_acceptor_timeout());
531         if (rc != 0) {
532                 CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
533                        HIPQUAD(peer_ip), peer_port, rc);
534                 kranal_conn_decref(conn);
535                 return rc;
536         }
537
538         rc = kranal_set_conn_params(conn, &rx_connreq, peer_ip, peer_port);
539         if (rc != 0) {
540                 kranal_conn_decref(conn);
541                 return rc;
542         }
543
544         *connp = conn;
545         *src_nidp = rx_connreq.racr_srcnid;
546         *dst_nidp = rx_connreq.racr_dstnid;
547         return 0;
548 }
549
550 int
551 kranal_active_conn_handshake(kra_peer_t *peer,
552                              lnet_nid_t *dst_nidp, kra_conn_t **connp)
553 {
554         kra_connreq_t       connreq;
555         kra_conn_t         *conn;
556         kra_device_t       *dev;
557         struct socket      *sock;
558         int                 rc;
559         unsigned int        idx;
560
561         /* spread connections over all devices using both peer NIDs to ensure
562          * all nids use all devices */
563         idx = peer->rap_nid + kranal_data.kra_ni->ni_nid;
564         dev = &kranal_data.kra_devices[idx % kranal_data.kra_ndevs];
565
566         rc = kranal_create_conn(&conn, dev);
567         if (rc != 0)
568                 return rc;
569
570         kranal_pack_connreq(&connreq, conn, peer->rap_nid);
571
572         if (the_lnet.ln_testprotocompat != 0) {
573                 /* single-shot proto test */
574                 LNET_LOCK();
575                 if ((the_lnet.ln_testprotocompat & 1) != 0) {
576                         connreq.racr_version++;
577                         the_lnet.ln_testprotocompat &= ~1;
578                 }
579                 if ((the_lnet.ln_testprotocompat & 2) != 0) {
580                         connreq.racr_magic = LNET_PROTO_MAGIC;
581                         the_lnet.ln_testprotocompat &= ~2;
582                 }
583                 LNET_UNLOCK();
584         }
585
586         rc = lnet_connect(&sock, peer->rap_nid,
587                          0, peer->rap_ip, peer->rap_port);
588         if (rc != 0)
589                 goto failed_0;
590
591         /* CAVEAT EMPTOR: the passive side receives with a SHORT rx timeout
592          * immediately after accepting a connection, so we connect and then
593          * send immediately. */
594
595         rc = libcfs_sock_write(sock, &connreq, sizeof(connreq),
596                                lnet_acceptor_timeout());
597         if (rc != 0) {
598                 CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
599                        HIPQUAD(peer->rap_ip), peer->rap_port, rc);
600                 goto failed_2;
601         }
602
603         rc = kranal_recv_connreq(sock, &connreq, 1);
604         if (rc != 0) {
605                 CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
606                        HIPQUAD(peer->rap_ip), peer->rap_port, rc);
607                 goto failed_2;
608         }
609
610         libcfs_sock_release(sock);
611         rc = -EPROTO;
612
613         if (connreq.racr_srcnid != peer->rap_nid) {
614                 CERROR("Unexpected srcnid from %u.%u.%u.%u/%d: "
615                        "received %s expected %s\n",
616                        HIPQUAD(peer->rap_ip), peer->rap_port,
617                        libcfs_nid2str(connreq.racr_srcnid), 
618                        libcfs_nid2str(peer->rap_nid));
619                 goto failed_1;
620         }
621
622         if (connreq.racr_devid != dev->rad_id) {
623                 CERROR("Unexpected device id from %u.%u.%u.%u/%d: "
624                        "received %d expected %d\n",
625                        HIPQUAD(peer->rap_ip), peer->rap_port,
626                        connreq.racr_devid, dev->rad_id);
627                 goto failed_1;
628         }
629
630         rc = kranal_set_conn_params(conn, &connreq,
631                                     peer->rap_ip, peer->rap_port);
632         if (rc != 0)
633                 goto failed_1;
634
635         *connp = conn;
636         *dst_nidp = connreq.racr_dstnid;
637         return 0;
638
639  failed_2:
640         libcfs_sock_release(sock);
641  failed_1:
642         lnet_connect_console_error(rc, peer->rap_nid,
643                                   peer->rap_ip, peer->rap_port);
644  failed_0:
645         kranal_conn_decref(conn);
646         return rc;
647 }
648
649 int
650 kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
651 {
652         kra_peer_t        *peer2;
653         kra_tx_t          *tx;
654         lnet_nid_t         peer_nid;
655         lnet_nid_t         dst_nid;
656         unsigned long      flags;
657         kra_conn_t        *conn;
658         int                rc;
659         int                nstale;
660         int                new_peer = 0;
661
662         if (sock == NULL) {
663                 /* active: connd wants to connect to 'peer' */
664                 LASSERT (peer != NULL);
665                 LASSERT (peer->rap_connecting);
666
667                 rc = kranal_active_conn_handshake(peer, &dst_nid, &conn);
668                 if (rc != 0)
669                         return rc;
670
671                 cfs_write_lock_irqsave(&kranal_data.kra_global_lock, flags);
672
673                 if (!kranal_peer_active(peer)) {
674                         /* raced with peer getting unlinked */
675                         cfs_write_unlock_irqrestore(&kranal_data. \
676                                                     kra_global_lock,
677                                                     flags);
678                         kranal_conn_decref(conn);
679                         return -ESTALE;
680                 }
681
682                 peer_nid = peer->rap_nid;
683         } else {
684                 /* passive: listener accepted 'sock' */
685                 LASSERT (peer == NULL);
686
687                 rc = kranal_passive_conn_handshake(sock, &peer_nid,
688                                                    &dst_nid, &conn);
689                 if (rc != 0)
690                         return rc;
691
692                 /* assume this is a new peer */
693                 rc = kranal_create_peer(&peer, peer_nid);
694                 if (rc != 0) {
695                         CERROR("Can't create conn for %s\n", 
696                                libcfs_nid2str(peer_nid));
697                         kranal_conn_decref(conn);
698                         return -ENOMEM;
699                 }
700
701                 cfs_write_lock_irqsave(&kranal_data.kra_global_lock, flags);
702
703                 peer2 = kranal_find_peer_locked(peer_nid);
704                 if (peer2 == NULL) {
705                         new_peer = 1;
706                 } else {
707                         /* peer_nid already in the peer table */
708                         kranal_peer_decref(peer);
709                         peer = peer2;
710                 }
711         }
712
713         LASSERT ((!new_peer) != (!kranal_peer_active(peer)));
714
715         /* Refuse connection if peer thinks we are a different NID.  We check
716          * this while holding the global lock, to synch with connection
717          * destruction on NID change. */
718         if (kranal_data.kra_ni->ni_nid != dst_nid) {
719                 cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock,
720                                             flags);
721
722                 CERROR("Stale/bad connection with %s: dst_nid %s, expected %s\n",
723                        libcfs_nid2str(peer_nid), libcfs_nid2str(dst_nid), 
724                        libcfs_nid2str(kranal_data.kra_ni->ni_nid));
725                 rc = -ESTALE;
726                 goto failed;
727         }
728
729         /* Refuse to duplicate an existing connection (both sides might try to
730          * connect at once).  NB we return success!  We _are_ connected so we
731          * _don't_ have any blocked txs to complete with failure. */
732         rc = kranal_conn_isdup_locked(peer, conn);
733         if (rc != 0) {
734                 LASSERT (!cfs_list_empty(&peer->rap_conns));
735                 LASSERT (cfs_list_empty(&peer->rap_tx_queue));
736                 cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock,
737                                             flags);
738                 CWARN("Not creating duplicate connection to %s: %d\n",
739                       libcfs_nid2str(peer_nid), rc);
740                 rc = 0;
741                 goto failed;
742         }
743
744         if (new_peer) {
745                 /* peer table takes my ref on the new peer */
746                 cfs_list_add_tail(&peer->rap_list,
747                                   kranal_nid2peerlist(peer_nid));
748         }
749
750         /* initialise timestamps before reaper looks at them */
751         conn->rac_last_tx = conn->rac_last_rx = jiffies;
752
753         kranal_peer_addref(peer);               /* +1 ref for conn */
754         conn->rac_peer = peer;
755         cfs_list_add_tail(&conn->rac_list, &peer->rap_conns);
756
757         kranal_conn_addref(conn);               /* +1 ref for conn table */
758         cfs_list_add_tail(&conn->rac_hashlist,
759                           kranal_cqid2connlist(conn->rac_cqid));
760
761         /* Schedule all packets blocking for a connection */
762         while (!cfs_list_empty(&peer->rap_tx_queue)) {
763                 tx = cfs_list_entry(peer->rap_tx_queue.next,
764                                     kra_tx_t, tx_list);
765
766                 cfs_list_del(&tx->tx_list);
767                 kranal_post_fma(conn, tx);
768         }
769
770         nstale = kranal_close_stale_conns_locked(peer, conn);
771
772         cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
773
774         /* CAVEAT EMPTOR: passive peer can disappear NOW */
775
776         if (nstale != 0)
777                 CWARN("Closed %d stale conns to %s\n", nstale, 
778                       libcfs_nid2str(peer_nid));
779
780         CWARN("New connection to %s on devid[%d] = %d\n",
781                libcfs_nid2str(peer_nid), 
782                conn->rac_device->rad_idx, conn->rac_device->rad_id);
783
784         /* Ensure conn gets checked.  Transmits may have been queued and an
785          * FMA event may have happened before it got in the cq hash table */
786         kranal_schedule_conn(conn);
787         return 0;
788
789  failed:
790         if (new_peer)
791                 kranal_peer_decref(peer);
792         kranal_conn_decref(conn);
793         return rc;
794 }
795
796 void
797 kranal_connect (kra_peer_t *peer)
798 {
799         kra_tx_t          *tx;
800         unsigned long      flags;
801         cfs_list_t         zombies;
802         int                rc;
803
804         LASSERT (peer->rap_connecting);
805
806         CDEBUG(D_NET, "About to handshake %s\n", 
807                libcfs_nid2str(peer->rap_nid));
808
809         rc = kranal_conn_handshake(NULL, peer);
810
811         CDEBUG(D_NET, "Done handshake %s:%d \n", 
812                libcfs_nid2str(peer->rap_nid), rc);
813
814         cfs_write_lock_irqsave(&kranal_data.kra_global_lock, flags);
815
816         LASSERT (peer->rap_connecting);
817         peer->rap_connecting = 0;
818
819         if (rc == 0) {
820                 /* kranal_conn_handshake() queues blocked txs immediately on
821                  * success to avoid messages jumping the queue */
822                 LASSERT (cfs_list_empty(&peer->rap_tx_queue));
823
824                 peer->rap_reconnect_interval = 0; /* OK to reconnect at any time */
825
826                 cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock,
827                                             flags);
828                 return;
829         }
830
831         peer->rap_reconnect_interval *= 2;
832         peer->rap_reconnect_interval =
833                 MAX(peer->rap_reconnect_interval,
834                     *kranal_tunables.kra_min_reconnect_interval);
835         peer->rap_reconnect_interval =
836                 MIN(peer->rap_reconnect_interval,
837                     *kranal_tunables.kra_max_reconnect_interval);
838
839         peer->rap_reconnect_time = jiffies + peer->rap_reconnect_interval *
840                 CFS_HZ;
841
842         /* Grab all blocked packets while we have the global lock */
843         cfs_list_add(&zombies, &peer->rap_tx_queue);
844         cfs_list_del_init(&peer->rap_tx_queue);
845
846         cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
847
848         if (cfs_list_empty(&zombies))
849                 return;
850
851         CNETERR("Dropping packets for %s: connection failed\n",
852                 libcfs_nid2str(peer->rap_nid));
853
854         do {
855                 tx = cfs_list_entry(zombies.next, kra_tx_t, tx_list);
856
857                 cfs_list_del(&tx->tx_list);
858                 kranal_tx_done(tx, -EHOSTUNREACH);
859
860         } while (!cfs_list_empty(&zombies));
861 }
862
863 void
864 kranal_free_acceptsock (kra_acceptsock_t *ras)
865 {
866         libcfs_sock_release(ras->ras_sock);
867         LIBCFS_FREE(ras, sizeof(*ras));
868 }
869
870 int
871 kranal_accept (lnet_ni_t *ni, struct socket *sock)
872 {
873         kra_acceptsock_t  *ras;
874         int                rc;
875         __u32              peer_ip;
876         int                peer_port;
877         unsigned long      flags;
878
879         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
880         LASSERT (rc == 0);                      /* we succeeded before */
881
882         LIBCFS_ALLOC(ras, sizeof(*ras));
883         if (ras == NULL) {
884                 CERROR("ENOMEM allocating connection request from "
885                        "%u.%u.%u.%u\n", HIPQUAD(peer_ip));
886                 return -ENOMEM;
887         }
888
889         ras->ras_sock = sock;
890
891         cfs_spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
892
893         cfs_list_add_tail(&ras->ras_list, &kranal_data.kra_connd_acceptq);
894         cfs_waitq_signal(&kranal_data.kra_connd_waitq);
895
896         cfs_spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
897         return 0;
898 }
899
900 int
901 kranal_create_peer (kra_peer_t **peerp, lnet_nid_t nid)
902 {
903         kra_peer_t    *peer;
904         unsigned long  flags;
905
906         LASSERT (nid != LNET_NID_ANY);
907
908         LIBCFS_ALLOC(peer, sizeof(*peer));
909         if (peer == NULL)
910                 return -ENOMEM;
911
912         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
913
914         peer->rap_nid = nid;
915         cfs_atomic_set(&peer->rap_refcount, 1);     /* 1 ref for caller */
916
917         CFS_INIT_LIST_HEAD(&peer->rap_list);
918         CFS_INIT_LIST_HEAD(&peer->rap_connd_list);
919         CFS_INIT_LIST_HEAD(&peer->rap_conns);
920         CFS_INIT_LIST_HEAD(&peer->rap_tx_queue);
921
922         peer->rap_reconnect_interval = 0;       /* OK to connect at any time */
923
924         cfs_write_lock_irqsave(&kranal_data.kra_global_lock, flags);
925
926         if (kranal_data.kra_nonewpeers) {
927                 /* shutdown has started already */
928                 cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock,
929                                             flags);
930
931                 LIBCFS_FREE(peer, sizeof(*peer));
932                 CERROR("Can't create peer: network shutdown\n");
933                 return -ESHUTDOWN;
934         }
935
936         cfs_atomic_inc(&kranal_data.kra_npeers);
937
938         cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
939
940         *peerp = peer;
941         return 0;
942 }
943
944 void
945 kranal_destroy_peer (kra_peer_t *peer)
946 {
947         CDEBUG(D_NET, "peer %s %p deleted\n", 
948                libcfs_nid2str(peer->rap_nid), peer);
949
950         LASSERT (cfs_atomic_read(&peer->rap_refcount) == 0);
951         LASSERT (peer->rap_persistence == 0);
952         LASSERT (!kranal_peer_active(peer));
953         LASSERT (!peer->rap_connecting);
954         LASSERT (cfs_list_empty(&peer->rap_conns));
955         LASSERT (cfs_list_empty(&peer->rap_tx_queue));
956         LASSERT (cfs_list_empty(&peer->rap_connd_list));
957
958         LIBCFS_FREE(peer, sizeof(*peer));
959
960         /* NB a peer's connections keep a reference on their peer until
961          * they are destroyed, so we can be assured that _all_ state to do
962          * with this peer has been cleaned up when its refcount drops to
963          * zero. */
964         cfs_atomic_dec(&kranal_data.kra_npeers);
965 }
966
967 kra_peer_t *
968 kranal_find_peer_locked (lnet_nid_t nid)
969 {
970         cfs_list_t       *peer_list = kranal_nid2peerlist(nid);
971         cfs_list_t       *tmp;
972         kra_peer_t       *peer;
973
974         cfs_list_for_each (tmp, peer_list) {
975
976                 peer = cfs_list_entry(tmp, kra_peer_t, rap_list);
977
978                 LASSERT (peer->rap_persistence > 0 ||     /* persistent peer */
979                          !cfs_list_empty(&peer->rap_conns));  /* active conn */
980
981                 if (peer->rap_nid != nid)
982                         continue;
983
984                 CDEBUG(D_NET, "got peer [%p] -> %s (%d)\n",
985                        peer, libcfs_nid2str(nid), 
986                        cfs_atomic_read(&peer->rap_refcount));
987                 return peer;
988         }
989         return NULL;
990 }
991
992 kra_peer_t *
993 kranal_find_peer (lnet_nid_t nid)
994 {
995         kra_peer_t     *peer;
996
997         cfs_read_lock(&kranal_data.kra_global_lock);
998         peer = kranal_find_peer_locked(nid);
999         if (peer != NULL)                       /* +1 ref for caller? */
1000                 kranal_peer_addref(peer);
1001         cfs_read_unlock(&kranal_data.kra_global_lock);
1002
1003         return peer;
1004 }
1005
1006 void
1007 kranal_unlink_peer_locked (kra_peer_t *peer)
1008 {
1009         LASSERT (peer->rap_persistence == 0);
1010         LASSERT (cfs_list_empty(&peer->rap_conns));
1011
1012         LASSERT (kranal_peer_active(peer));
1013         cfs_list_del_init(&peer->rap_list);
1014
1015         /* lose peerlist's ref */
1016         kranal_peer_decref(peer);
1017 }
1018
1019 int
1020 kranal_get_peer_info (int index, lnet_nid_t *nidp, __u32 *ipp, int *portp,
1021                       int *persistencep)
1022 {
1023         kra_peer_t        *peer;
1024         cfs_list_t        *ptmp;
1025         int                i;
1026
1027         cfs_read_lock(&kranal_data.kra_global_lock);
1028
1029         for (i = 0; i < kranal_data.kra_peer_hash_size; i++) {
1030
1031                 cfs_list_for_each(ptmp, &kranal_data.kra_peers[i]) {
1032
1033                         peer = cfs_list_entry(ptmp, kra_peer_t, rap_list);
1034                         LASSERT (peer->rap_persistence > 0 ||
1035                                  !cfs_list_empty(&peer->rap_conns));
1036
1037                         if (index-- > 0)
1038                                 continue;
1039
1040                         *nidp = peer->rap_nid;
1041                         *ipp = peer->rap_ip;
1042                         *portp = peer->rap_port;
1043                         *persistencep = peer->rap_persistence;
1044
1045                         cfs_read_unlock(&kranal_data.kra_global_lock);
1046                         return 0;
1047                 }
1048         }
1049
1050         cfs_read_unlock(&kranal_data.kra_global_lock);
1051         return -ENOENT;
1052 }
1053
1054 int
1055 kranal_add_persistent_peer (lnet_nid_t nid, __u32 ip, int port)
1056 {
1057         unsigned long      flags;
1058         kra_peer_t        *peer;
1059         kra_peer_t        *peer2;
1060         int                rc;
1061
1062         if (nid == LNET_NID_ANY)
1063                 return -EINVAL;
1064
1065         rc = kranal_create_peer(&peer, nid);
1066         if (rc != 0)
1067                 return rc;
1068
1069         cfs_write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1070
1071         peer2 = kranal_find_peer_locked(nid);
1072         if (peer2 != NULL) {
1073                 kranal_peer_decref(peer);
1074                 peer = peer2;
1075         } else {
1076                 /* peer table takes existing ref on peer */
1077                 cfs_list_add_tail(&peer->rap_list,
1078                               kranal_nid2peerlist(nid));
1079         }
1080
1081         peer->rap_ip = ip;
1082         peer->rap_port = port;
1083         peer->rap_persistence++;
1084
1085         cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1086         return 0;
1087 }
1088
1089 void
1090 kranal_del_peer_locked (kra_peer_t *peer)
1091 {
1092         cfs_list_t       *ctmp;
1093         cfs_list_t       *cnxt;
1094         kra_conn_t       *conn;
1095
1096         peer->rap_persistence = 0;
1097
1098         if (cfs_list_empty(&peer->rap_conns)) {
1099                 kranal_unlink_peer_locked(peer);
1100         } else {
1101                 cfs_list_for_each_safe(ctmp, cnxt, &peer->rap_conns) {
1102                         conn = cfs_list_entry(ctmp, kra_conn_t, rac_list);
1103
1104                         kranal_close_conn_locked(conn, 0);
1105                 }
1106                 /* peer unlinks itself when last conn is closed */
1107         }
1108 }
1109
1110 int
1111 kranal_del_peer (lnet_nid_t nid)
1112 {
1113         unsigned long      flags;
1114         cfs_list_t        *ptmp;
1115         cfs_list_t        *pnxt;
1116         kra_peer_t        *peer;
1117         int                lo;
1118         int                hi;
1119         int                i;
1120         int                rc = -ENOENT;
1121
1122         cfs_write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1123
1124         if (nid != LNET_NID_ANY)
1125                 lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers;
1126         else {
1127                 lo = 0;
1128                 hi = kranal_data.kra_peer_hash_size - 1;
1129         }
1130
1131         for (i = lo; i <= hi; i++) {
1132                 cfs_list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) {
1133                         peer = cfs_list_entry(ptmp, kra_peer_t, rap_list);
1134                         LASSERT (peer->rap_persistence > 0 ||
1135                                  !cfs_list_empty(&peer->rap_conns));
1136
1137                         if (!(nid == LNET_NID_ANY || peer->rap_nid == nid))
1138                                 continue;
1139
1140                         kranal_del_peer_locked(peer);
1141                         rc = 0;         /* matched something */
1142                 }
1143         }
1144
1145         cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1146
1147         return rc;
1148 }
1149
1150 kra_conn_t *
1151 kranal_get_conn_by_idx (int index)
1152 {
1153         kra_peer_t        *peer;
1154         cfs_list_t        *ptmp;
1155         kra_conn_t        *conn;
1156         cfs_list_t        *ctmp;
1157         int                i;
1158
1159         cfs_read_lock (&kranal_data.kra_global_lock);
1160
1161         for (i = 0; i < kranal_data.kra_peer_hash_size; i++) {
1162                 cfs_list_for_each (ptmp, &kranal_data.kra_peers[i]) {
1163
1164                         peer = cfs_list_entry(ptmp, kra_peer_t, rap_list);
1165                         LASSERT (peer->rap_persistence > 0 ||
1166                                  !cfs_list_empty(&peer->rap_conns));
1167
1168                         cfs_list_for_each (ctmp, &peer->rap_conns) {
1169                                 if (index-- > 0)
1170                                         continue;
1171
1172                                 conn = cfs_list_entry(ctmp, kra_conn_t,
1173                                                       rac_list);
1174                                 CDEBUG(D_NET, "++conn[%p] -> %s (%d)\n", conn,
1175                                        libcfs_nid2str(conn->rac_peer->rap_nid),
1176                                        cfs_atomic_read(&conn->rac_refcount));
1177                                 cfs_atomic_inc(&conn->rac_refcount);
1178                                 cfs_read_unlock(&kranal_data.kra_global_lock);
1179                                 return conn;
1180                         }
1181                 }
1182         }
1183
1184         cfs_read_unlock(&kranal_data.kra_global_lock);
1185         return NULL;
1186 }
1187
1188 int
1189 kranal_close_peer_conns_locked (kra_peer_t *peer, int why)
1190 {
1191         kra_conn_t         *conn;
1192         cfs_list_t         *ctmp;
1193         cfs_list_t         *cnxt;
1194         int                 count = 0;
1195
1196         cfs_list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
1197                 conn = cfs_list_entry(ctmp, kra_conn_t, rac_list);
1198
1199                 count++;
1200                 kranal_close_conn_locked(conn, why);
1201         }
1202
1203         return count;
1204 }
1205
1206 int
1207 kranal_close_matching_conns (lnet_nid_t nid)
1208 {
1209         unsigned long       flags;
1210         kra_peer_t         *peer;
1211         cfs_list_t         *ptmp;
1212         cfs_list_t         *pnxt;
1213         int                 lo;
1214         int                 hi;
1215         int                 i;
1216         int                 count = 0;
1217
1218         cfs_write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1219
1220         if (nid != LNET_NID_ANY)
1221                 lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers;
1222         else {
1223                 lo = 0;
1224                 hi = kranal_data.kra_peer_hash_size - 1;
1225         }
1226
1227         for (i = lo; i <= hi; i++) {
1228                 cfs_list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) {
1229
1230                         peer = cfs_list_entry(ptmp, kra_peer_t, rap_list);
1231                         LASSERT (peer->rap_persistence > 0 ||
1232                                  !cfs_list_empty(&peer->rap_conns));
1233
1234                         if (!(nid == LNET_NID_ANY || nid == peer->rap_nid))
1235                                 continue;
1236
1237                         count += kranal_close_peer_conns_locked(peer, 0);
1238                 }
1239         }
1240
1241         cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1242
1243         /* wildcards always succeed */
1244         if (nid == LNET_NID_ANY)
1245                 return 0;
1246
1247         return (count == 0) ? -ENOENT : 0;
1248 }
1249
1250 int
1251 kranal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1252 {
1253         struct libcfs_ioctl_data *data = arg;
1254         int                       rc = -EINVAL;
1255
1256         LASSERT (ni == kranal_data.kra_ni);
1257
1258         switch(cmd) {
1259         case IOC_LIBCFS_GET_PEER: {
1260                 lnet_nid_t   nid = 0;
1261                 __u32       ip = 0;
1262                 int         port = 0;
1263                 int         share_count = 0;
1264
1265                 rc = kranal_get_peer_info(data->ioc_count,
1266                                           &nid, &ip, &port, &share_count);
1267                 data->ioc_nid    = nid;
1268                 data->ioc_count  = share_count;
1269                 data->ioc_u32[0] = ip;
1270                 data->ioc_u32[1] = port;
1271                 break;
1272         }
1273         case IOC_LIBCFS_ADD_PEER: {
1274                 rc = kranal_add_persistent_peer(data->ioc_nid,
1275                                                 data->ioc_u32[0], /* IP */
1276                                                 data->ioc_u32[1]); /* port */
1277                 break;
1278         }
1279         case IOC_LIBCFS_DEL_PEER: {
1280                 rc = kranal_del_peer(data->ioc_nid);
1281                 break;
1282         }
1283         case IOC_LIBCFS_GET_CONN: {
1284                 kra_conn_t *conn = kranal_get_conn_by_idx(data->ioc_count);
1285
1286                 if (conn == NULL)
1287                         rc = -ENOENT;
1288                 else {
1289                         rc = 0;
1290                         data->ioc_nid    = conn->rac_peer->rap_nid;
1291                         data->ioc_u32[0] = conn->rac_device->rad_id;
1292                         kranal_conn_decref(conn);
1293                 }
1294                 break;
1295         }
1296         case IOC_LIBCFS_CLOSE_CONNECTION: {
1297                 rc = kranal_close_matching_conns(data->ioc_nid);
1298                 break;
1299         }
1300         case IOC_LIBCFS_REGISTER_MYNID: {
1301                 /* Ignore if this is a noop */
1302                 if (data->ioc_nid == ni->ni_nid) {
1303                         rc = 0;
1304                 } else {
1305                         CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1306                                libcfs_nid2str(data->ioc_nid),
1307                                libcfs_nid2str(ni->ni_nid));
1308                         rc = -EINVAL;
1309                 }
1310                 break;
1311         }
1312         }
1313
1314         return rc;
1315 }
1316
1317 void
1318 kranal_free_txdescs(cfs_list_t *freelist)
1319 {
1320         kra_tx_t    *tx;
1321
1322         while (!cfs_list_empty(freelist)) {
1323                 tx = cfs_list_entry(freelist->next, kra_tx_t, tx_list);
1324
1325                 cfs_list_del(&tx->tx_list);
1326                 LIBCFS_FREE(tx->tx_phys, LNET_MAX_IOV * sizeof(*tx->tx_phys));
1327                 LIBCFS_FREE(tx, sizeof(*tx));
1328         }
1329 }
1330
1331 int
1332 kranal_alloc_txdescs(cfs_list_t *freelist, int n)
1333 {
1334         int            i;
1335         kra_tx_t      *tx;
1336
1337         LASSERT (freelist == &kranal_data.kra_idle_txs);
1338         LASSERT (cfs_list_empty(freelist));
1339
1340         for (i = 0; i < n; i++) {
1341
1342                 LIBCFS_ALLOC(tx, sizeof(*tx));
1343                 if (tx == NULL) {
1344                         CERROR("Can't allocate tx[%d]\n", i);
1345                         kranal_free_txdescs(freelist);
1346                         return -ENOMEM;
1347                 }
1348
1349                 LIBCFS_ALLOC(tx->tx_phys,
1350                              LNET_MAX_IOV * sizeof(*tx->tx_phys));
1351                 if (tx->tx_phys == NULL) {
1352                         CERROR("Can't allocate tx[%d]->tx_phys\n", i);
1353
1354                         LIBCFS_FREE(tx, sizeof(*tx));
1355                         kranal_free_txdescs(freelist);
1356                         return -ENOMEM;
1357                 }
1358
1359                 tx->tx_buftype = RANAL_BUF_NONE;
1360                 tx->tx_msg.ram_type = RANAL_MSG_NONE;
1361
1362                 cfs_list_add(&tx->tx_list, freelist);
1363         }
1364
1365         return 0;
1366 }
1367
1368 int
1369 kranal_device_init(int id, kra_device_t *dev)
1370 {
1371         int               total_ntx = *kranal_tunables.kra_ntx;
1372         RAP_RETURN        rrc;
1373
1374         dev->rad_id = id;
1375         rrc = RapkGetDeviceByIndex(id, kranal_device_callback,
1376                                    &dev->rad_handle);
1377         if (rrc != RAP_SUCCESS) {
1378                 CERROR("Can't get Rapidarray Device %d: %d\n", id, rrc);
1379                 goto failed_0;
1380         }
1381
1382         rrc = RapkReserveRdma(dev->rad_handle, total_ntx);
1383         if (rrc != RAP_SUCCESS) {
1384                 CERROR("Can't reserve %d RDMA descriptors"
1385                        " for device %d: %d\n", total_ntx, id, rrc);
1386                 goto failed_1;
1387         }
1388
1389         rrc = RapkCreateCQ(dev->rad_handle, total_ntx, RAP_CQTYPE_SEND,
1390                            &dev->rad_rdma_cqh);
1391         if (rrc != RAP_SUCCESS) {
1392                 CERROR("Can't create rdma cq size %d for device %d: %d\n",
1393                        total_ntx, id, rrc);
1394                 goto failed_1;
1395         }
1396
1397         rrc = RapkCreateCQ(dev->rad_handle, 
1398                            *kranal_tunables.kra_fma_cq_size, 
1399                            RAP_CQTYPE_RECV, &dev->rad_fma_cqh);
1400         if (rrc != RAP_SUCCESS) {
1401                 CERROR("Can't create fma cq size %d for device %d: %d\n", 
1402                        *kranal_tunables.kra_fma_cq_size, id, rrc);
1403                 goto failed_2;
1404         }
1405
1406         return 0;
1407
1408  failed_2:
1409         RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
1410  failed_1:
1411         RapkReleaseDevice(dev->rad_handle);
1412  failed_0:
1413         return -ENODEV;
1414 }
1415
1416 void
1417 kranal_device_fini(kra_device_t *dev)
1418 {
1419         LASSERT (cfs_list_empty(&dev->rad_ready_conns));
1420         LASSERT (cfs_list_empty(&dev->rad_new_conns));
1421         LASSERT (dev->rad_nphysmap == 0);
1422         LASSERT (dev->rad_nppphysmap == 0);
1423         LASSERT (dev->rad_nvirtmap == 0);
1424         LASSERT (dev->rad_nobvirtmap == 0);
1425
1426         LASSERT(dev->rad_scheduler == NULL);
1427         RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cqh);
1428         RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
1429         RapkReleaseDevice(dev->rad_handle);
1430 }
1431
1432 void
1433 kranal_shutdown (lnet_ni_t *ni)
1434 {
1435         int           i;
1436         unsigned long flags;
1437
1438         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1439                cfs_atomic_read(&libcfs_kmemory));
1440
1441         LASSERT (ni == kranal_data.kra_ni);
1442         LASSERT (ni->ni_data == &kranal_data);
1443
1444         switch (kranal_data.kra_init) {
1445         default:
1446                 CERROR("Unexpected state %d\n", kranal_data.kra_init);
1447                 LBUG();
1448
1449         case RANAL_INIT_ALL:
1450                 /* Prevent new peers from being created */
1451                 cfs_write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1452                 kranal_data.kra_nonewpeers = 1;
1453                 cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock,
1454                                             flags);
1455
1456                 /* Remove all existing peers from the peer table */
1457                 kranal_del_peer(LNET_NID_ANY);
1458
1459                 /* Wait for pending conn reqs to be handled */
1460                 i = 2;
1461                 cfs_spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1462                 while (!cfs_list_empty(&kranal_data.kra_connd_acceptq)) {
1463                         cfs_spin_unlock_irqrestore(&kranal_data.kra_connd_lock,
1464                                                    flags);
1465                         i++;
1466                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n */
1467                                "waiting for conn reqs to clean up\n");
1468                         cfs_pause(cfs_time_seconds(1));
1469
1470                         cfs_spin_lock_irqsave(&kranal_data.kra_connd_lock,
1471                                               flags);
1472                 }
1473                 cfs_spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1474
1475                 /* Wait for all peers to be freed */
1476                 i = 2;
1477                 while (cfs_atomic_read(&kranal_data.kra_npeers) != 0) {
1478                         i++;
1479                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n */
1480                                "waiting for %d peers to close down\n",
1481                                cfs_atomic_read(&kranal_data.kra_npeers));
1482                         cfs_pause(cfs_time_seconds(1));
1483                 }
1484                 /* fall through */
1485
1486         case RANAL_INIT_DATA:
1487                 break;
1488         }
1489
1490         /* Peer state all cleaned up BEFORE setting shutdown, so threads don't
1491          * have to worry about shutdown races.  NB connections may be created
1492          * while there are still active connds, but these will be temporary
1493          * since peer creation always fails after the listener has started to
1494          * shut down. */
1495         LASSERT (cfs_atomic_read(&kranal_data.kra_npeers) == 0);
1496         
1497         /* Flag threads to terminate */
1498         kranal_data.kra_shutdown = 1;
1499
1500         for (i = 0; i < kranal_data.kra_ndevs; i++) {
1501                 kra_device_t *dev = &kranal_data.kra_devices[i];
1502
1503                 cfs_spin_lock_irqsave(&dev->rad_lock, flags);
1504                 cfs_waitq_signal(&dev->rad_waitq);
1505                 cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
1506         }
1507
1508         cfs_spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1509         cfs_waitq_broadcast(&kranal_data.kra_reaper_waitq);
1510         cfs_spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
1511
1512         LASSERT (cfs_list_empty(&kranal_data.kra_connd_peers));
1513         cfs_spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1514         cfs_waitq_broadcast(&kranal_data.kra_connd_waitq);
1515         cfs_spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1516
1517         /* Wait for threads to exit */
1518         i = 2;
1519         while (cfs_atomic_read(&kranal_data.kra_nthreads) != 0) {
1520                 i++;
1521                 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1522                        "Waiting for %d threads to terminate\n",
1523                        cfs_atomic_read(&kranal_data.kra_nthreads));
1524                 cfs_pause(cfs_time_seconds(1));
1525         }
1526
1527         LASSERT (cfs_atomic_read(&kranal_data.kra_npeers) == 0);
1528         if (kranal_data.kra_peers != NULL) {
1529                 for (i = 0; i < kranal_data.kra_peer_hash_size; i++)
1530                         LASSERT (cfs_list_empty(&kranal_data.kra_peers[i]));
1531
1532                 LIBCFS_FREE(kranal_data.kra_peers,
1533                             sizeof (cfs_list_t) *
1534                             kranal_data.kra_peer_hash_size);
1535         }
1536
1537         LASSERT (cfs_atomic_read(&kranal_data.kra_nconns) == 0);
1538         if (kranal_data.kra_conns != NULL) {
1539                 for (i = 0; i < kranal_data.kra_conn_hash_size; i++)
1540                         LASSERT (cfs_list_empty(&kranal_data.kra_conns[i]));
1541
1542                 LIBCFS_FREE(kranal_data.kra_conns,
1543                             sizeof (cfs_list_t) *
1544                             kranal_data.kra_conn_hash_size);
1545         }
1546
1547         for (i = 0; i < kranal_data.kra_ndevs; i++)
1548                 kranal_device_fini(&kranal_data.kra_devices[i]);
1549
1550         kranal_free_txdescs(&kranal_data.kra_idle_txs);
1551
1552         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1553                cfs_atomic_read(&libcfs_kmemory));
1554
1555         kranal_data.kra_init = RANAL_INIT_NOTHING;
1556         PORTAL_MODULE_UNUSE;
1557 }
1558
1559 int
1560 kranal_startup (lnet_ni_t *ni)
1561 {
1562         struct timeval    tv;
1563         int               pkmem = cfs_atomic_read(&libcfs_kmemory);
1564         int               rc;
1565         int               i;
1566         kra_device_t     *dev;
1567
1568         LASSERT (ni->ni_lnd == &the_kralnd);
1569
1570         /* Only 1 instance supported */
1571         if (kranal_data.kra_init != RANAL_INIT_NOTHING) {
1572                 CERROR ("Only 1 instance supported\n");
1573                 return -EPERM;
1574         }
1575
1576         if (lnet_set_ip_niaddr(ni) != 0) {
1577                 CERROR ("Can't determine my NID\n");
1578                 return -EPERM;
1579         }
1580
1581         if (*kranal_tunables.kra_credits > *kranal_tunables.kra_ntx) {
1582                 CERROR ("Can't set credits(%d) > ntx(%d)\n",
1583                         *kranal_tunables.kra_credits,
1584                         *kranal_tunables.kra_ntx);
1585                 return -EINVAL;
1586         }
1587         
1588         memset(&kranal_data, 0, sizeof(kranal_data)); /* zero pointers, flags etc */
1589
1590         ni->ni_maxtxcredits = *kranal_tunables.kra_credits;
1591         ni->ni_peertxcredits = *kranal_tunables.kra_peercredits;
1592
1593         ni->ni_data = &kranal_data;
1594         kranal_data.kra_ni = ni;
1595
1596         /* CAVEAT EMPTOR: Every 'Fma' message includes the sender's NID and
1597          * a unique (for all time) connstamp so we can uniquely identify
1598          * the sender.  The connstamp is an incrementing counter
1599          * initialised with seconds + microseconds at startup time.  So we
1600          * rely on NOT creating connections more frequently on average than
1601          * 1MHz to ensure we don't use old connstamps when we reboot. */
1602         cfs_gettimeofday(&tv);
1603         kranal_data.kra_connstamp =
1604         kranal_data.kra_peerstamp = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1605
1606         cfs_rwlock_init(&kranal_data.kra_global_lock);
1607
1608         for (i = 0; i < RANAL_MAXDEVS; i++ ) {
1609                 kra_device_t  *dev = &kranal_data.kra_devices[i];
1610
1611                 dev->rad_idx = i;
1612                 CFS_INIT_LIST_HEAD(&dev->rad_ready_conns);
1613                 CFS_INIT_LIST_HEAD(&dev->rad_new_conns);
1614                 cfs_waitq_init(&dev->rad_waitq);
1615                 cfs_spin_lock_init(&dev->rad_lock);
1616         }
1617
1618         kranal_data.kra_new_min_timeout = CFS_MAX_SCHEDULE_TIMEOUT;
1619         cfs_waitq_init(&kranal_data.kra_reaper_waitq);
1620         cfs_spin_lock_init(&kranal_data.kra_reaper_lock);
1621
1622         CFS_INIT_LIST_HEAD(&kranal_data.kra_connd_acceptq);
1623         CFS_INIT_LIST_HEAD(&kranal_data.kra_connd_peers);
1624         cfs_waitq_init(&kranal_data.kra_connd_waitq);
1625         cfs_spin_lock_init(&kranal_data.kra_connd_lock);
1626
1627         CFS_INIT_LIST_HEAD(&kranal_data.kra_idle_txs);
1628         cfs_spin_lock_init(&kranal_data.kra_tx_lock);
1629
1630         /* OK to call kranal_api_shutdown() to cleanup now */
1631         kranal_data.kra_init = RANAL_INIT_DATA;
1632         PORTAL_MODULE_USE;
1633
1634         kranal_data.kra_peer_hash_size = RANAL_PEER_HASH_SIZE;
1635         LIBCFS_ALLOC(kranal_data.kra_peers,
1636                      sizeof(cfs_list_t) *
1637                             kranal_data.kra_peer_hash_size);
1638         if (kranal_data.kra_peers == NULL)
1639                 goto failed;
1640
1641         for (i = 0; i < kranal_data.kra_peer_hash_size; i++)
1642                 CFS_INIT_LIST_HEAD(&kranal_data.kra_peers[i]);
1643
1644         kranal_data.kra_conn_hash_size = RANAL_PEER_HASH_SIZE;
1645         LIBCFS_ALLOC(kranal_data.kra_conns,
1646                      sizeof(cfs_list_t) *
1647                             kranal_data.kra_conn_hash_size);
1648         if (kranal_data.kra_conns == NULL)
1649                 goto failed;
1650
1651         for (i = 0; i < kranal_data.kra_conn_hash_size; i++)
1652                 CFS_INIT_LIST_HEAD(&kranal_data.kra_conns[i]);
1653
1654         rc = kranal_alloc_txdescs(&kranal_data.kra_idle_txs, 
1655                                   *kranal_tunables.kra_ntx);
1656         if (rc != 0)
1657                 goto failed;
1658
1659         rc = kranal_thread_start(kranal_reaper, NULL);
1660         if (rc != 0) {
1661                 CERROR("Can't spawn ranal reaper: %d\n", rc);
1662                 goto failed;
1663         }
1664
1665         for (i = 0; i < *kranal_tunables.kra_n_connd; i++) {
1666                 rc = kranal_thread_start(kranal_connd, (void *)(unsigned long)i);
1667                 if (rc != 0) {
1668                         CERROR("Can't spawn ranal connd[%d]: %d\n",
1669                                i, rc);
1670                         goto failed;
1671                 }
1672         }
1673
1674         LASSERT (kranal_data.kra_ndevs == 0);
1675
1676         /* Use all available RapidArray devices */
1677         for (i = 0; i < RANAL_MAXDEVS; i++) {
1678                 dev = &kranal_data.kra_devices[kranal_data.kra_ndevs];
1679
1680                 rc = kranal_device_init(kranal_devids[i], dev);
1681                 if (rc == 0)
1682                         kranal_data.kra_ndevs++;
1683         }
1684
1685         if (kranal_data.kra_ndevs == 0) {
1686                 CERROR("Can't initialise any RapidArray devices\n");
1687                 goto failed;
1688         }
1689         
1690         for (i = 0; i < kranal_data.kra_ndevs; i++) {
1691                 dev = &kranal_data.kra_devices[i];
1692                 rc = kranal_thread_start(kranal_scheduler, dev);
1693                 if (rc != 0) {
1694                         CERROR("Can't spawn ranal scheduler[%d]: %d\n",
1695                                i, rc);
1696                         goto failed;
1697                 }
1698         }
1699
1700         /* flag everything initialised */
1701         kranal_data.kra_init = RANAL_INIT_ALL;
1702         /*****************************************************/
1703
1704         CDEBUG(D_MALLOC, "initial kmem %d\n", pkmem);
1705         return 0;
1706
1707  failed:
1708         kranal_shutdown(ni);
1709         return -ENETDOWN;
1710 }
1711
1712 void __exit
1713 kranal_module_fini (void)
1714 {
1715         lnet_unregister_lnd(&the_kralnd);
1716         kranal_tunables_fini();
1717 }
1718
1719 int __init
1720 kranal_module_init (void)
1721 {
1722         int    rc;
1723
1724         rc = kranal_tunables_init();
1725         if (rc != 0)
1726                 return rc;
1727
1728         lnet_register_lnd(&the_kralnd);
1729
1730         return 0;
1731 }
1732
1733 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
1734 MODULE_DESCRIPTION("Kernel RapidArray LND v0.01");
1735 MODULE_LICENSE("GPL");
1736
1737 module_init(kranal_module_init);
1738 module_exit(kranal_module_fini);