Whamcloud - gitweb
This update includes Hex error ID's and checksum calculation for console error messages.
[fs/lustre-release.git] / lnet / klnds / ralnd / ralnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23 #include "ralnd.h"
24
25 static int        kranal_devids[RANAL_MAXDEVS] = {RAPK_MAIN_DEVICE_ID,
26                                                   RAPK_EXPANSION_DEVICE_ID};
27
28 lnd_t the_kralnd = {
29         .lnd_type       = RALND,
30         .lnd_startup    = kranal_startup,
31         .lnd_shutdown   = kranal_shutdown,
32         .lnd_ctl        = kranal_ctl,
33         .lnd_send       = kranal_send,
34         .lnd_recv       = kranal_recv,
35         .lnd_eager_recv = kranal_eager_recv,
36         .lnd_accept     = kranal_accept,
37 };
38
39 kra_data_t              kranal_data;
40
41 void
42 kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn, lnet_nid_t dstnid)
43 {
44         RAP_RETURN   rrc;
45
46         memset(connreq, 0, sizeof(*connreq));
47
48         connreq->racr_magic     = RANAL_MSG_MAGIC;
49         connreq->racr_version   = RANAL_MSG_VERSION;
50
51         if (conn == NULL)                       /* prepping a "stub" reply */
52                 return;
53
54         connreq->racr_devid     = conn->rac_device->rad_id;
55         connreq->racr_srcnid    = lnet_ptlcompat_srcnid(kranal_data.kra_ni->ni_nid,
56                                                         dstnid);
57         connreq->racr_dstnid    = dstnid;
58         connreq->racr_peerstamp = kranal_data.kra_peerstamp;
59         connreq->racr_connstamp = conn->rac_my_connstamp;
60         connreq->racr_timeout   = conn->rac_timeout;
61
62         rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
63         LASSERT(rrc == RAP_SUCCESS);
64 }
65
66 int
67 kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int active)
68 {
69         int         timeout = active ? *kranal_tunables.kra_timeout :
70                                         lnet_acceptor_timeout();
71         int         swab;
72         int         rc;
73
74         /* return 0 on success, -ve on error, +ve to tell the peer I'm "old" */
75
76         rc = libcfs_sock_read(sock, &connreq->racr_magic, 
77                               sizeof(connreq->racr_magic), timeout);
78         if (rc != 0) {
79                 CERROR("Read(magic) failed(1): %d\n", rc);
80                 return -EIO;
81         }
82
83         if (connreq->racr_magic != RANAL_MSG_MAGIC &&
84             connreq->racr_magic != __swab32(RANAL_MSG_MAGIC)) {
85                 /* Unexpected magic! */
86                 if (!active &&
87                     the_lnet.ln_ptlcompat == 0 &&
88                     (connreq->racr_magic == LNET_PROTO_MAGIC ||
89                      connreq->racr_magic == __swab32(LNET_PROTO_MAGIC))) {
90                         /* future protocol version compatibility!
91                          * When LNET unifies protocols over all LNDs, the first
92                          * thing sent will be a version query.  +ve rc means I
93                          * reply with my current magic/version */
94                         return EPROTO;
95                 }
96
97                 if (active ||
98                     the_lnet.ln_ptlcompat == 0) {
99                         CERROR("Unexpected magic %08x (1)\n",
100                                connreq->racr_magic);
101                         return -EPROTO;
102                 }
103
104                 /* When portals compatibility is set, I may be passed a new
105                  * connection "blindly" by the acceptor, and I have to
106                  * determine if my peer has sent an acceptor connection request
107                  * or not.  This isn't a connreq, so I'll get the acceptor to
108                  * look at it... */
109                 rc = lnet_accept(kranal_data.kra_ni, sock, connreq->racr_magic);
110                 if (rc != 0)
111                         return -EPROTO;
112
113                 /* ...and if it's OK I'm back to looking for a connreq... */
114                 rc = libcfs_sock_read(sock, &connreq->racr_magic,
115                                       sizeof(connreq->racr_magic), timeout);
116                 if (rc != 0) {
117                         CERROR("Read(magic) failed(2): %d\n", rc);
118                         return -EIO;
119                 }
120
121                 if (connreq->racr_magic != RANAL_MSG_MAGIC &&
122                     connreq->racr_magic != __swab32(RANAL_MSG_MAGIC)) {
123                         CERROR("Unexpected magic %08x(2)\n",
124                                connreq->racr_magic);
125                         return -EPROTO;
126                 }
127         }
128
129         swab = (connreq->racr_magic == __swab32(RANAL_MSG_MAGIC));
130
131         rc = libcfs_sock_read(sock, &connreq->racr_version,
132                               sizeof(connreq->racr_version), timeout);
133         if (rc != 0) {
134                 CERROR("Read(version) failed: %d\n", rc);
135                 return -EIO;
136         }
137
138         if (swab)
139                 __swab16s(&connreq->racr_version);
140         
141         if (connreq->racr_version != RANAL_MSG_VERSION) {
142                 if (active) {
143                         CERROR("Unexpected version %d\n", connreq->racr_version);
144                         return -EPROTO;
145                 }
146                 /* If this is a future version of the ralnd protocol, and I'm
147                  * passive (accepted the connection), tell my peer I'm "old"
148                  * (+ve rc) */
149                 return EPROTO;
150         }
151
152         rc = libcfs_sock_read(sock, &connreq->racr_devid,
153                               sizeof(connreq->racr_version) -
154                               offsetof(kra_connreq_t, racr_devid),
155                               timeout);
156         if (rc != 0) {
157                 CERROR("Read(body) failed: %d\n", rc);
158                 return -EIO;
159         }
160
161         if (swab) {
162                 __swab32s(&connreq->racr_magic);
163                 __swab16s(&connreq->racr_version);
164                 __swab16s(&connreq->racr_devid);
165                 __swab64s(&connreq->racr_srcnid);
166                 __swab64s(&connreq->racr_dstnid);
167                 __swab64s(&connreq->racr_peerstamp);
168                 __swab64s(&connreq->racr_connstamp);
169                 __swab32s(&connreq->racr_timeout);
170
171                 __swab32s(&connreq->racr_riparams.HostId);
172                 __swab32s(&connreq->racr_riparams.FmaDomainHndl);
173                 __swab32s(&connreq->racr_riparams.PTag);
174                 __swab32s(&connreq->racr_riparams.CompletionCookie);
175         }
176
177         if (connreq->racr_srcnid == LNET_NID_ANY ||
178             connreq->racr_dstnid == LNET_NID_ANY) {
179                 CERROR("Received LNET_NID_ANY\n");
180                 return -EPROTO;
181         }
182
183         if (connreq->racr_timeout < RANAL_MIN_TIMEOUT) {
184                 CERROR("Received timeout %d < MIN %d\n",
185                        connreq->racr_timeout, RANAL_MIN_TIMEOUT);
186                 return -EPROTO;
187         }
188
189         return 0;
190 }
191
192 int
193 kranal_close_stale_conns_locked (kra_peer_t *peer, kra_conn_t *newconn)
194 {
195         kra_conn_t         *conn;
196         struct list_head   *ctmp;
197         struct list_head   *cnxt;
198         int                 loopback;
199         int                 count = 0;
200
201         loopback = peer->rap_nid == kranal_data.kra_ni->ni_nid;
202
203         list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
204                 conn = list_entry(ctmp, kra_conn_t, rac_list);
205
206                 if (conn == newconn)
207                         continue;
208
209                 if (conn->rac_peerstamp != newconn->rac_peerstamp) {
210                         CDEBUG(D_NET, "Closing stale conn nid: %s "
211                                " peerstamp:"LPX64"("LPX64")\n", 
212                                libcfs_nid2str(peer->rap_nid),
213                                conn->rac_peerstamp, newconn->rac_peerstamp);
214                         LASSERT (conn->rac_peerstamp < newconn->rac_peerstamp);
215                         count++;
216                         kranal_close_conn_locked(conn, -ESTALE);
217                         continue;
218                 }
219
220                 if (conn->rac_device != newconn->rac_device)
221                         continue;
222
223                 if (loopback &&
224                     newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
225                     newconn->rac_peer_connstamp == conn->rac_my_connstamp)
226                         continue;
227
228                 LASSERT (conn->rac_peer_connstamp < newconn->rac_peer_connstamp);
229
230                 CDEBUG(D_NET, "Closing stale conn nid: %s"
231                        " connstamp:"LPX64"("LPX64")\n", 
232                        libcfs_nid2str(peer->rap_nid),
233                        conn->rac_peer_connstamp, newconn->rac_peer_connstamp);
234
235                 count++;
236                 kranal_close_conn_locked(conn, -ESTALE);
237         }
238
239         return count;
240 }
241
242 int
243 kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn)
244 {
245         kra_conn_t       *conn;
246         struct list_head *tmp;
247         int               loopback;
248
249         loopback = peer->rap_nid == kranal_data.kra_ni->ni_nid;
250
251         list_for_each(tmp, &peer->rap_conns) {
252                 conn = list_entry(tmp, kra_conn_t, rac_list);
253
254                 /* 'newconn' is from an earlier version of 'peer'!!! */
255                 if (newconn->rac_peerstamp < conn->rac_peerstamp)
256                         return 1;
257
258                 /* 'conn' is from an earlier version of 'peer': it will be
259                  * removed when we cull stale conns later on... */
260                 if (newconn->rac_peerstamp > conn->rac_peerstamp)
261                         continue;
262
263                 /* Different devices are OK */
264                 if (conn->rac_device != newconn->rac_device)
265                         continue;
266
267                 /* It's me connecting to myself */
268                 if (loopback &&
269                     newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
270                     newconn->rac_peer_connstamp == conn->rac_my_connstamp)
271                         continue;
272
273                 /* 'newconn' is an earlier connection from 'peer'!!! */
274                 if (newconn->rac_peer_connstamp < conn->rac_peer_connstamp)
275                         return 2;
276
277                 /* 'conn' is an earlier connection from 'peer': it will be
278                  * removed when we cull stale conns later on... */
279                 if (newconn->rac_peer_connstamp > conn->rac_peer_connstamp)
280                         continue;
281
282                 /* 'newconn' has the SAME connection stamp; 'peer' isn't
283                  * playing the game... */
284                 return 3;
285         }
286
287         return 0;
288 }
289
290 void
291 kranal_set_conn_uniqueness (kra_conn_t *conn)
292 {
293         unsigned long  flags;
294
295         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
296
297         conn->rac_my_connstamp = kranal_data.kra_connstamp++;
298
299         do {    /* allocate a unique cqid */
300                 conn->rac_cqid = kranal_data.kra_next_cqid++;
301         } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL);
302
303         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
304 }
305
306 int
307 kranal_create_conn(kra_conn_t **connp, kra_device_t *dev)
308 {
309         kra_conn_t    *conn;
310         RAP_RETURN     rrc;
311
312         LASSERT (!in_interrupt());
313         LIBCFS_ALLOC(conn, sizeof(*conn));
314
315         if (conn == NULL)
316                 return -ENOMEM;
317
318         memset(conn, 0, sizeof(*conn));
319         atomic_set(&conn->rac_refcount, 1);
320         INIT_LIST_HEAD(&conn->rac_list);
321         INIT_LIST_HEAD(&conn->rac_hashlist);
322         INIT_LIST_HEAD(&conn->rac_schedlist);
323         INIT_LIST_HEAD(&conn->rac_fmaq);
324         INIT_LIST_HEAD(&conn->rac_rdmaq);
325         INIT_LIST_HEAD(&conn->rac_replyq);
326         spin_lock_init(&conn->rac_lock);
327
328         kranal_set_conn_uniqueness(conn);
329
330         conn->rac_device = dev;
331         conn->rac_timeout = MAX(*kranal_tunables.kra_timeout, RANAL_MIN_TIMEOUT);
332         kranal_update_reaper_timeout(conn->rac_timeout);
333
334         rrc = RapkCreateRi(dev->rad_handle, conn->rac_cqid,
335                            &conn->rac_rihandle);
336         if (rrc != RAP_SUCCESS) {
337                 CERROR("RapkCreateRi failed: %d\n", rrc);
338                 LIBCFS_FREE(conn, sizeof(*conn));
339                 return -ENETDOWN;
340         }
341
342         atomic_inc(&kranal_data.kra_nconns);
343         *connp = conn;
344         return 0;
345 }
346
347 void
348 kranal_destroy_conn(kra_conn_t *conn)
349 {
350         RAP_RETURN         rrc;
351
352         LASSERT (!in_interrupt());
353         LASSERT (!conn->rac_scheduled);
354         LASSERT (list_empty(&conn->rac_list));
355         LASSERT (list_empty(&conn->rac_hashlist));
356         LASSERT (list_empty(&conn->rac_schedlist));
357         LASSERT (atomic_read(&conn->rac_refcount) == 0);
358         LASSERT (list_empty(&conn->rac_fmaq));
359         LASSERT (list_empty(&conn->rac_rdmaq));
360         LASSERT (list_empty(&conn->rac_replyq));
361
362         rrc = RapkDestroyRi(conn->rac_device->rad_handle,
363                             conn->rac_rihandle);
364         LASSERT (rrc == RAP_SUCCESS);
365
366         if (conn->rac_peer != NULL)
367                 kranal_peer_decref(conn->rac_peer);
368
369         LIBCFS_FREE(conn, sizeof(*conn));
370         atomic_dec(&kranal_data.kra_nconns);
371 }
372
373 void
374 kranal_terminate_conn_locked (kra_conn_t *conn)
375 {
376         LASSERT (!in_interrupt());
377         LASSERT (conn->rac_state == RANAL_CONN_CLOSING);
378         LASSERT (!list_empty(&conn->rac_hashlist));
379         LASSERT (list_empty(&conn->rac_list));
380
381         /* Remove from conn hash table: no new callbacks */
382         list_del_init(&conn->rac_hashlist);
383         kranal_conn_decref(conn);
384
385         conn->rac_state = RANAL_CONN_CLOSED;
386
387         /* schedule to clear out all uncompleted comms in context of dev's
388          * scheduler */
389         kranal_schedule_conn(conn);
390 }
391
392 void
393 kranal_close_conn_locked (kra_conn_t *conn, int error)
394 {
395         kra_peer_t        *peer = conn->rac_peer;
396
397         CDEBUG(error == 0 ? D_NET : D_NETERROR,
398                "closing conn to %s: error %d\n", 
399                libcfs_nid2str(peer->rap_nid), error);
400
401         LASSERT (!in_interrupt());
402         LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED);
403         LASSERT (!list_empty(&conn->rac_hashlist));
404         LASSERT (!list_empty(&conn->rac_list));
405
406         list_del_init(&conn->rac_list);
407
408         if (list_empty(&peer->rap_conns) &&
409             peer->rap_persistence == 0) {
410                 /* Non-persistent peer with no more conns... */
411                 kranal_unlink_peer_locked(peer);
412         }
413
414         /* Reset RX timeout to ensure we wait for an incoming CLOSE for the
415          * full timeout.  If we get a CLOSE we know the peer has stopped all
416          * RDMA.  Otherwise if we wait for the full timeout we can also be sure
417          * all RDMA has stopped. */
418         conn->rac_last_rx = jiffies;
419         mb();
420
421         conn->rac_state = RANAL_CONN_CLOSING;
422         kranal_schedule_conn(conn);             /* schedule sending CLOSE */
423
424         kranal_conn_decref(conn);               /* lose peer's ref */
425 }
426
427 void
428 kranal_close_conn (kra_conn_t *conn, int error)
429 {
430         unsigned long    flags;
431
432
433         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
434
435         if (conn->rac_state == RANAL_CONN_ESTABLISHED)
436                 kranal_close_conn_locked(conn, error);
437
438         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
439 }
440
441 int
442 kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq,
443                        __u32 peer_ip, int peer_port)
444 {
445         kra_device_t  *dev = conn->rac_device;
446         unsigned long  flags;
447         RAP_RETURN     rrc;
448
449         /* CAVEAT EMPTOR: we're really overloading rac_last_tx + rac_keepalive
450          * to do RapkCompleteSync() timekeeping (see kibnal_scheduler). */
451         conn->rac_last_tx = jiffies;
452         conn->rac_keepalive = 0;
453
454         rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
455         if (rrc != RAP_SUCCESS) {
456                 CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n",
457                        HIPQUAD(peer_ip), peer_port, rrc);
458                 return -ECONNABORTED;
459         }
460
461         /* Schedule conn on rad_new_conns */
462         kranal_conn_addref(conn);
463         spin_lock_irqsave(&dev->rad_lock, flags);
464         list_add_tail(&conn->rac_schedlist, &dev->rad_new_conns);
465         wake_up(&dev->rad_waitq);
466         spin_unlock_irqrestore(&dev->rad_lock, flags);
467
468         rrc = RapkWaitToConnect(conn->rac_rihandle);
469         if (rrc != RAP_SUCCESS) {
470                 CERROR("Error waiting to connect to %u.%u.%u.%u/%d: %d\n",
471                        HIPQUAD(peer_ip), peer_port, rrc);
472                 return -ECONNABORTED;
473         }
474
475         /* Scheduler doesn't touch conn apart from to deschedule and decref it
476          * after RapkCompleteSync() return success, so conn is all mine */
477
478         conn->rac_peerstamp = connreq->racr_peerstamp;
479         conn->rac_peer_connstamp = connreq->racr_connstamp;
480         conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq->racr_timeout);
481         kranal_update_reaper_timeout(conn->rac_keepalive);
482         return 0;
483 }
484
485 int
486 kranal_passive_conn_handshake (struct socket *sock, lnet_nid_t *src_nidp,
487                                lnet_nid_t *dst_nidp, kra_conn_t **connp)
488 {
489         __u32                peer_ip;
490         unsigned int         peer_port;
491         kra_connreq_t        rx_connreq;
492         kra_connreq_t        tx_connreq;
493         kra_conn_t          *conn;
494         kra_device_t        *dev;
495         int                  rc;
496         int                  i;
497
498         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
499         if (rc != 0) {
500                 CERROR("Can't get peer's IP: %d\n", rc);
501                 return rc;
502         }
503
504         rc = kranal_recv_connreq(sock, &rx_connreq, 0);
505
506         if (rc < 0) {
507                 CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
508                        HIPQUAD(peer_ip), peer_port, rc);
509                 return rc;
510         }
511
512         if (rc > 0) {
513                 /* Request from "new" peer: send reply with my MAGIC/VERSION to
514                  * tell her I'm old... */
515                 kranal_pack_connreq(&tx_connreq, NULL, LNET_NID_ANY);
516
517                 rc = libcfs_sock_write(sock, &tx_connreq, sizeof(tx_connreq),
518                                        lnet_acceptor_timeout());
519                 if (rc != 0)
520                         CERROR("Can't tx stub connreq to %u.%u.%u.%u/%d: %d\n",
521                                HIPQUAD(peer_ip), peer_port, rc);
522
523                 return -EPROTO;
524         }
525
526         for (i = 0;;i++) {
527                 if (i == kranal_data.kra_ndevs) {
528                         CERROR("Can't match dev %d from %u.%u.%u.%u/%d\n",
529                                rx_connreq.racr_devid, HIPQUAD(peer_ip), peer_port);
530                         return -ENODEV;
531                 }
532                 dev = &kranal_data.kra_devices[i];
533                 if (dev->rad_id == rx_connreq.racr_devid)
534                         break;
535         }
536
537         rc = kranal_create_conn(&conn, dev);
538         if (rc != 0)
539                 return rc;
540
541         kranal_pack_connreq(&tx_connreq, conn, rx_connreq.racr_srcnid);
542
543         rc = libcfs_sock_write(sock, &tx_connreq, sizeof(tx_connreq),
544                                lnet_acceptor_timeout());
545         if (rc != 0) {
546                 CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
547                        HIPQUAD(peer_ip), peer_port, rc);
548                 kranal_conn_decref(conn);
549                 return rc;
550         }
551
552         rc = kranal_set_conn_params(conn, &rx_connreq, peer_ip, peer_port);
553         if (rc != 0) {
554                 kranal_conn_decref(conn);
555                 return rc;
556         }
557
558         *connp = conn;
559         *src_nidp = rx_connreq.racr_srcnid;
560         *dst_nidp = rx_connreq.racr_dstnid;
561         return 0;
562 }
563
564 int
565 kranal_active_conn_handshake(kra_peer_t *peer,
566                              lnet_nid_t *dst_nidp, kra_conn_t **connp)
567 {
568         kra_connreq_t       connreq;
569         kra_conn_t         *conn;
570         kra_device_t       *dev;
571         struct socket      *sock;
572         int                 rc;
573         unsigned int        idx;
574
575         /* spread connections over all devices using both peer NIDs to ensure
576          * all nids use all devices */
577         idx = peer->rap_nid + kranal_data.kra_ni->ni_nid;
578         dev = &kranal_data.kra_devices[idx % kranal_data.kra_ndevs];
579
580         rc = kranal_create_conn(&conn, dev);
581         if (rc != 0)
582                 return rc;
583
584         kranal_pack_connreq(&connreq, conn, peer->rap_nid);
585
586         if (the_lnet.ln_testprotocompat != 0) {
587                 /* single-shot proto test */
588                 LNET_LOCK();
589                 if ((the_lnet.ln_testprotocompat & 1) != 0) {
590                         connreq.racr_version++;
591                         the_lnet.ln_testprotocompat &= ~1;
592                 }
593                 if ((the_lnet.ln_testprotocompat & 2) != 0) {
594                         connreq.racr_magic = LNET_PROTO_MAGIC;
595                         the_lnet.ln_testprotocompat &= ~2;
596                 }
597                 LNET_UNLOCK();
598         }
599
600         rc = lnet_connect(&sock, peer->rap_nid,
601                          0, peer->rap_ip, peer->rap_port);
602         if (rc != 0)
603                 goto failed_0;
604
605         /* CAVEAT EMPTOR: the passive side receives with a SHORT rx timeout
606          * immediately after accepting a connection, so we connect and then
607          * send immediately. */
608
609         rc = libcfs_sock_write(sock, &connreq, sizeof(connreq),
610                                lnet_acceptor_timeout());
611         if (rc != 0) {
612                 CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
613                        HIPQUAD(peer->rap_ip), peer->rap_port, rc);
614                 goto failed_2;
615         }
616
617         rc = kranal_recv_connreq(sock, &connreq, 1);
618         if (rc != 0) {
619                 CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
620                        HIPQUAD(peer->rap_ip), peer->rap_port, rc);
621                 goto failed_2;
622         }
623
624         libcfs_sock_release(sock);
625         rc = -EPROTO;
626
627         if (connreq.racr_srcnid != peer->rap_nid) {
628                 CERROR("Unexpected srcnid from %u.%u.%u.%u/%d: "
629                        "received %s expected %s\n",
630                        HIPQUAD(peer->rap_ip), peer->rap_port,
631                        libcfs_nid2str(connreq.racr_srcnid), 
632                        libcfs_nid2str(peer->rap_nid));
633                 goto failed_1;
634         }
635
636         if (connreq.racr_devid != dev->rad_id) {
637                 CERROR("Unexpected device id from %u.%u.%u.%u/%d: "
638                        "received %d expected %d\n",
639                        HIPQUAD(peer->rap_ip), peer->rap_port,
640                        connreq.racr_devid, dev->rad_id);
641                 goto failed_1;
642         }
643
644         rc = kranal_set_conn_params(conn, &connreq,
645                                     peer->rap_ip, peer->rap_port);
646         if (rc != 0)
647                 goto failed_1;
648
649         *connp = conn;
650         *dst_nidp = connreq.racr_dstnid;
651         return 0;
652
653  failed_2:
654         libcfs_sock_release(sock);
655  failed_1:
656         lnet_connect_console_error(rc, peer->rap_nid,
657                                   peer->rap_ip, peer->rap_port);
658  failed_0:
659         kranal_conn_decref(conn);
660         return rc;
661 }
662
663 int
664 kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
665 {
666         kra_peer_t        *peer2;
667         kra_tx_t          *tx;
668         lnet_nid_t          peer_nid;
669         lnet_nid_t          dst_nid;
670         unsigned long      flags;
671         kra_conn_t        *conn;
672         int                rc;
673         int                nstale;
674         int                new_peer = 0;
675
676         if (sock == NULL) {
677                 /* active: connd wants to connect to 'peer' */
678                 LASSERT (peer != NULL);
679                 LASSERT (peer->rap_connecting);
680
681                 rc = kranal_active_conn_handshake(peer, &dst_nid, &conn);
682                 if (rc != 0)
683                         return rc;
684
685                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
686
687                 if (!kranal_peer_active(peer)) {
688                         /* raced with peer getting unlinked */
689                         write_unlock_irqrestore(&kranal_data.kra_global_lock,
690                                                 flags);
691                         kranal_conn_decref(conn);
692                         return -ESTALE;
693                 }
694
695                 peer_nid = peer->rap_nid;
696         } else {
697                 /* passive: listener accepted 'sock' */
698                 LASSERT (peer == NULL);
699
700                 rc = kranal_passive_conn_handshake(sock, &peer_nid,
701                                                    &dst_nid, &conn);
702                 if (rc != 0)
703                         return rc;
704
705                 /* assume this is a new peer */
706                 rc = kranal_create_peer(&peer, peer_nid);
707                 if (rc != 0) {
708                         CERROR("Can't create conn for %s\n", 
709                                libcfs_nid2str(peer_nid));
710                         kranal_conn_decref(conn);
711                         return -ENOMEM;
712                 }
713
714                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
715
716                 peer2 = kranal_find_peer_locked(peer_nid);
717                 if (peer2 == NULL) {
718                         new_peer = 1;
719                 } else {
720                         /* peer_nid already in the peer table */
721                         kranal_peer_decref(peer);
722                         peer = peer2;
723                 }
724         }
725
726         LASSERT ((!new_peer) != (!kranal_peer_active(peer)));
727
728         /* Refuse connection if peer thinks we are a different NID.  We check
729          * this while holding the global lock, to synch with connection
730          * destruction on NID change. */
731         if (!lnet_ptlcompat_matchnid(kranal_data.kra_ni->ni_nid, dst_nid)) {
732                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
733
734                 CERROR("Stale/bad connection with %s: dst_nid %s, expected %s\n",
735                        libcfs_nid2str(peer_nid), libcfs_nid2str(dst_nid), 
736                        libcfs_nid2str(kranal_data.kra_ni->ni_nid));
737                 rc = -ESTALE;
738                 goto failed;
739         }
740
741         /* Refuse to duplicate an existing connection (both sides might try to
742          * connect at once).  NB we return success!  We _are_ connected so we
743          * _don't_ have any blocked txs to complete with failure. */
744         rc = kranal_conn_isdup_locked(peer, conn);
745         if (rc != 0) {
746                 LASSERT (!list_empty(&peer->rap_conns));
747                 LASSERT (list_empty(&peer->rap_tx_queue));
748                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
749                 CWARN("Not creating duplicate connection to %s: %d\n",
750                       libcfs_nid2str(peer_nid), rc);
751                 rc = 0;
752                 goto failed;
753         }
754
755         if (new_peer) {
756                 /* peer table takes my ref on the new peer */
757                 list_add_tail(&peer->rap_list,
758                               kranal_nid2peerlist(peer_nid));
759         }
760
761         /* initialise timestamps before reaper looks at them */
762         conn->rac_last_tx = conn->rac_last_rx = jiffies;
763
764         kranal_peer_addref(peer);               /* +1 ref for conn */
765         conn->rac_peer = peer;
766         list_add_tail(&conn->rac_list, &peer->rap_conns);
767
768         kranal_conn_addref(conn);               /* +1 ref for conn table */
769         list_add_tail(&conn->rac_hashlist,
770                       kranal_cqid2connlist(conn->rac_cqid));
771
772         /* Schedule all packets blocking for a connection */
773         while (!list_empty(&peer->rap_tx_queue)) {
774                 tx = list_entry(peer->rap_tx_queue.next,
775                                 kra_tx_t, tx_list);
776
777                 list_del(&tx->tx_list);
778                 kranal_post_fma(conn, tx);
779         }
780
781         nstale = kranal_close_stale_conns_locked(peer, conn);
782
783         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
784
785         /* CAVEAT EMPTOR: passive peer can disappear NOW */
786
787         if (nstale != 0)
788                 CWARN("Closed %d stale conns to %s\n", nstale, 
789                       libcfs_nid2str(peer_nid));
790
791         CWARN("New connection to %s on devid[%d] = %d\n",
792                libcfs_nid2str(peer_nid), 
793                conn->rac_device->rad_idx, conn->rac_device->rad_id);
794
795         /* Ensure conn gets checked.  Transmits may have been queued and an
796          * FMA event may have happened before it got in the cq hash table */
797         kranal_schedule_conn(conn);
798         return 0;
799
800  failed:
801         if (new_peer)
802                 kranal_peer_decref(peer);
803         kranal_conn_decref(conn);
804         return rc;
805 }
806
807 void
808 kranal_connect (kra_peer_t *peer)
809 {
810         kra_tx_t          *tx;
811         unsigned long      flags;
812         struct list_head   zombies;
813         int                rc;
814
815         LASSERT (peer->rap_connecting);
816
817         CDEBUG(D_NET, "About to handshake %s\n", 
818                libcfs_nid2str(peer->rap_nid));
819
820         rc = kranal_conn_handshake(NULL, peer);
821
822         CDEBUG(D_NET, "Done handshake %s:%d \n", 
823                libcfs_nid2str(peer->rap_nid), rc);
824
825         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
826
827         LASSERT (peer->rap_connecting);
828         peer->rap_connecting = 0;
829
830         if (rc == 0) {
831                 /* kranal_conn_handshake() queues blocked txs immediately on
832                  * success to avoid messages jumping the queue */
833                 LASSERT (list_empty(&peer->rap_tx_queue));
834
835                 peer->rap_reconnect_interval = 0; /* OK to reconnect at any time */
836
837                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
838                 return;
839         }
840
841         peer->rap_reconnect_interval *= 2;
842         peer->rap_reconnect_interval =
843                 MAX(peer->rap_reconnect_interval,
844                     *kranal_tunables.kra_min_reconnect_interval);
845         peer->rap_reconnect_interval =
846                 MIN(peer->rap_reconnect_interval,
847                     *kranal_tunables.kra_max_reconnect_interval);
848
849         peer->rap_reconnect_time = jiffies + peer->rap_reconnect_interval * HZ;
850
851         /* Grab all blocked packets while we have the global lock */
852         list_add(&zombies, &peer->rap_tx_queue);
853         list_del_init(&peer->rap_tx_queue);
854
855         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
856
857         if (list_empty(&zombies))
858                 return;
859
860         CDEBUG(D_NETERROR, "Dropping packets for %s: connection failed\n",
861                libcfs_nid2str(peer->rap_nid));
862
863         do {
864                 tx = list_entry(zombies.next, kra_tx_t, tx_list);
865
866                 list_del(&tx->tx_list);
867                 kranal_tx_done(tx, -EHOSTUNREACH);
868
869         } while (!list_empty(&zombies));
870 }
871
872 void
873 kranal_free_acceptsock (kra_acceptsock_t *ras)
874 {
875         libcfs_sock_release(ras->ras_sock);
876         LIBCFS_FREE(ras, sizeof(*ras));
877 }
878
879 int
880 kranal_accept (lnet_ni_t *ni, struct socket *sock)
881 {
882         kra_acceptsock_t  *ras;
883         int                rc;
884         __u32              peer_ip;
885         int                peer_port;
886         unsigned long      flags;
887
888         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
889         LASSERT (rc == 0);                      /* we succeeded before */
890
891         LIBCFS_ALLOC(ras, sizeof(*ras));
892         if (ras == NULL) {
893                 CERROR("ENOMEM allocating connection request from "
894                        "%u.%u.%u.%u\n", HIPQUAD(peer_ip));
895                 return -ENOMEM;
896         }
897
898         ras->ras_sock = sock;
899
900         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
901
902         list_add_tail(&ras->ras_list, &kranal_data.kra_connd_acceptq);
903         wake_up(&kranal_data.kra_connd_waitq);
904
905         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
906         return 0;
907 }
908
909 int
910 kranal_create_peer (kra_peer_t **peerp, lnet_nid_t nid)
911 {
912         kra_peer_t    *peer;
913         unsigned long  flags;
914
915         LASSERT (nid != LNET_NID_ANY);
916
917         LIBCFS_ALLOC(peer, sizeof(*peer));
918         if (peer == NULL)
919                 return -ENOMEM;
920
921         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
922
923         peer->rap_nid = nid;
924         atomic_set(&peer->rap_refcount, 1);     /* 1 ref for caller */
925
926         INIT_LIST_HEAD(&peer->rap_list);
927         INIT_LIST_HEAD(&peer->rap_connd_list);
928         INIT_LIST_HEAD(&peer->rap_conns);
929         INIT_LIST_HEAD(&peer->rap_tx_queue);
930
931         peer->rap_reconnect_interval = 0;       /* OK to connect at any time */
932
933         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
934
935         if (kranal_data.kra_nonewpeers) {
936                 /* shutdown has started already */
937                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
938                 
939                 LIBCFS_FREE(peer, sizeof(*peer));
940                 CERROR("Can't create peer: network shutdown\n");
941                 return -ESHUTDOWN;
942         }
943
944         atomic_inc(&kranal_data.kra_npeers);
945
946         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
947
948         *peerp = peer;
949         return 0;
950 }
951
952 void
953 kranal_destroy_peer (kra_peer_t *peer)
954 {
955         CDEBUG(D_NET, "peer %s %p deleted\n", 
956                libcfs_nid2str(peer->rap_nid), peer);
957
958         LASSERT (atomic_read(&peer->rap_refcount) == 0);
959         LASSERT (peer->rap_persistence == 0);
960         LASSERT (!kranal_peer_active(peer));
961         LASSERT (!peer->rap_connecting);
962         LASSERT (list_empty(&peer->rap_conns));
963         LASSERT (list_empty(&peer->rap_tx_queue));
964         LASSERT (list_empty(&peer->rap_connd_list));
965
966         LIBCFS_FREE(peer, sizeof(*peer));
967
968         /* NB a peer's connections keep a reference on their peer until
969          * they are destroyed, so we can be assured that _all_ state to do
970          * with this peer has been cleaned up when its refcount drops to
971          * zero. */
972         atomic_dec(&kranal_data.kra_npeers);
973 }
974
975 kra_peer_t *
976 kranal_find_peer_locked (lnet_nid_t nid)
977 {
978         struct list_head *peer_list = kranal_nid2peerlist(nid);
979         struct list_head *tmp;
980         kra_peer_t       *peer;
981
982         list_for_each (tmp, peer_list) {
983
984                 peer = list_entry(tmp, kra_peer_t, rap_list);
985
986                 LASSERT (peer->rap_persistence > 0 ||     /* persistent peer */
987                          !list_empty(&peer->rap_conns));  /* active conn */
988
989                 if (peer->rap_nid != nid)
990                         continue;
991
992                 CDEBUG(D_NET, "got peer [%p] -> %s (%d)\n",
993                        peer, libcfs_nid2str(nid), 
994                        atomic_read(&peer->rap_refcount));
995                 return peer;
996         }
997         return NULL;
998 }
999
1000 kra_peer_t *
1001 kranal_find_peer (lnet_nid_t nid)
1002 {
1003         kra_peer_t     *peer;
1004
1005         read_lock(&kranal_data.kra_global_lock);
1006         peer = kranal_find_peer_locked(nid);
1007         if (peer != NULL)                       /* +1 ref for caller? */
1008                 kranal_peer_addref(peer);
1009         read_unlock(&kranal_data.kra_global_lock);
1010
1011         return peer;
1012 }
1013
1014 void
1015 kranal_unlink_peer_locked (kra_peer_t *peer)
1016 {
1017         LASSERT (peer->rap_persistence == 0);
1018         LASSERT (list_empty(&peer->rap_conns));
1019
1020         LASSERT (kranal_peer_active(peer));
1021         list_del_init(&peer->rap_list);
1022
1023         /* lose peerlist's ref */
1024         kranal_peer_decref(peer);
1025 }
1026
1027 int
1028 kranal_get_peer_info (int index, lnet_nid_t *nidp, __u32 *ipp, int *portp,
1029                       int *persistencep)
1030 {
1031         kra_peer_t        *peer;
1032         struct list_head  *ptmp;
1033         int                i;
1034
1035         read_lock(&kranal_data.kra_global_lock);
1036
1037         for (i = 0; i < kranal_data.kra_peer_hash_size; i++) {
1038
1039                 list_for_each(ptmp, &kranal_data.kra_peers[i]) {
1040
1041                         peer = list_entry(ptmp, kra_peer_t, rap_list);
1042                         LASSERT (peer->rap_persistence > 0 ||
1043                                  !list_empty(&peer->rap_conns));
1044
1045                         if (index-- > 0)
1046                                 continue;
1047
1048                         *nidp = peer->rap_nid;
1049                         *ipp = peer->rap_ip;
1050                         *portp = peer->rap_port;
1051                         *persistencep = peer->rap_persistence;
1052
1053                         read_unlock(&kranal_data.kra_global_lock);
1054                         return 0;
1055                 }
1056         }
1057
1058         read_unlock(&kranal_data.kra_global_lock);
1059         return -ENOENT;
1060 }
1061
1062 int
1063 kranal_add_persistent_peer (lnet_nid_t nid, __u32 ip, int port)
1064 {
1065         unsigned long      flags;
1066         kra_peer_t        *peer;
1067         kra_peer_t        *peer2;
1068         int                rc;
1069
1070         if (nid == LNET_NID_ANY)
1071                 return -EINVAL;
1072
1073         rc = kranal_create_peer(&peer, nid);
1074         if (rc != 0)
1075                 return rc;
1076
1077         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1078
1079         peer2 = kranal_find_peer_locked(nid);
1080         if (peer2 != NULL) {
1081                 kranal_peer_decref(peer);
1082                 peer = peer2;
1083         } else {
1084                 /* peer table takes existing ref on peer */
1085                 list_add_tail(&peer->rap_list,
1086                               kranal_nid2peerlist(nid));
1087         }
1088
1089         peer->rap_ip = ip;
1090         peer->rap_port = port;
1091         peer->rap_persistence++;
1092
1093         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1094         return 0;
1095 }
1096
1097 void
1098 kranal_del_peer_locked (kra_peer_t *peer)
1099 {
1100         struct list_head *ctmp;
1101         struct list_head *cnxt;
1102         kra_conn_t       *conn;
1103
1104         peer->rap_persistence = 0;
1105
1106         if (list_empty(&peer->rap_conns)) {
1107                 kranal_unlink_peer_locked(peer);
1108         } else {
1109                 list_for_each_safe(ctmp, cnxt, &peer->rap_conns) {
1110                         conn = list_entry(ctmp, kra_conn_t, rac_list);
1111
1112                         kranal_close_conn_locked(conn, 0);
1113                 }
1114                 /* peer unlinks itself when last conn is closed */
1115         }
1116 }
1117
1118 int
1119 kranal_del_peer (lnet_nid_t nid)
1120 {
1121         unsigned long      flags;
1122         struct list_head  *ptmp;
1123         struct list_head  *pnxt;
1124         kra_peer_t        *peer;
1125         int                lo;
1126         int                hi;
1127         int                i;
1128         int                rc = -ENOENT;
1129
1130         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1131
1132         if (nid != LNET_NID_ANY)
1133                 lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers;
1134         else {
1135                 lo = 0;
1136                 hi = kranal_data.kra_peer_hash_size - 1;
1137         }
1138
1139         for (i = lo; i <= hi; i++) {
1140                 list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) {
1141                         peer = list_entry(ptmp, kra_peer_t, rap_list);
1142                         LASSERT (peer->rap_persistence > 0 ||
1143                                  !list_empty(&peer->rap_conns));
1144
1145                         if (!(nid == LNET_NID_ANY || peer->rap_nid == nid))
1146                                 continue;
1147
1148                         kranal_del_peer_locked(peer);
1149                         rc = 0;         /* matched something */
1150                 }
1151         }
1152
1153         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1154
1155         return rc;
1156 }
1157
1158 kra_conn_t *
1159 kranal_get_conn_by_idx (int index)
1160 {
1161         kra_peer_t        *peer;
1162         struct list_head  *ptmp;
1163         kra_conn_t        *conn;
1164         struct list_head  *ctmp;
1165         int                i;
1166
1167         read_lock (&kranal_data.kra_global_lock);
1168
1169         for (i = 0; i < kranal_data.kra_peer_hash_size; i++) {
1170                 list_for_each (ptmp, &kranal_data.kra_peers[i]) {
1171
1172                         peer = list_entry(ptmp, kra_peer_t, rap_list);
1173                         LASSERT (peer->rap_persistence > 0 ||
1174                                  !list_empty(&peer->rap_conns));
1175
1176                         list_for_each (ctmp, &peer->rap_conns) {
1177                                 if (index-- > 0)
1178                                         continue;
1179
1180                                 conn = list_entry(ctmp, kra_conn_t, rac_list);
1181                                 CDEBUG(D_NET, "++conn[%p] -> %s (%d)\n", conn, 
1182                                        libcfs_nid2str(conn->rac_peer->rap_nid),
1183                                        atomic_read(&conn->rac_refcount));
1184                                 atomic_inc(&conn->rac_refcount);
1185                                 read_unlock(&kranal_data.kra_global_lock);
1186                                 return conn;
1187                         }
1188                 }
1189         }
1190
1191         read_unlock(&kranal_data.kra_global_lock);
1192         return NULL;
1193 }
1194
1195 int
1196 kranal_close_peer_conns_locked (kra_peer_t *peer, int why)
1197 {
1198         kra_conn_t         *conn;
1199         struct list_head   *ctmp;
1200         struct list_head   *cnxt;
1201         int                 count = 0;
1202
1203         list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
1204                 conn = list_entry(ctmp, kra_conn_t, rac_list);
1205
1206                 count++;
1207                 kranal_close_conn_locked(conn, why);
1208         }
1209
1210         return count;
1211 }
1212
1213 int
1214 kranal_close_matching_conns (lnet_nid_t nid)
1215 {
1216         unsigned long       flags;
1217         kra_peer_t         *peer;
1218         struct list_head   *ptmp;
1219         struct list_head   *pnxt;
1220         int                 lo;
1221         int                 hi;
1222         int                 i;
1223         int                 count = 0;
1224
1225         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1226
1227         if (nid != LNET_NID_ANY)
1228                 lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers;
1229         else {
1230                 lo = 0;
1231                 hi = kranal_data.kra_peer_hash_size - 1;
1232         }
1233
1234         for (i = lo; i <= hi; i++) {
1235                 list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) {
1236
1237                         peer = list_entry(ptmp, kra_peer_t, rap_list);
1238                         LASSERT (peer->rap_persistence > 0 ||
1239                                  !list_empty(&peer->rap_conns));
1240
1241                         if (!(nid == LNET_NID_ANY || nid == peer->rap_nid))
1242                                 continue;
1243
1244                         count += kranal_close_peer_conns_locked(peer, 0);
1245                 }
1246         }
1247
1248         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1249
1250         /* wildcards always succeed */
1251         if (nid == LNET_NID_ANY)
1252                 return 0;
1253
1254         return (count == 0) ? -ENOENT : 0;
1255 }
1256
1257 int
1258 kranal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1259 {
1260         struct libcfs_ioctl_data *data = arg;
1261         int                       rc = -EINVAL;
1262
1263         LASSERT (ni == kranal_data.kra_ni);
1264
1265         switch(cmd) {
1266         case IOC_LIBCFS_GET_PEER: {
1267                 lnet_nid_t   nid = 0;
1268                 __u32       ip = 0;
1269                 int         port = 0;
1270                 int         share_count = 0;
1271
1272                 rc = kranal_get_peer_info(data->ioc_count,
1273                                           &nid, &ip, &port, &share_count);
1274                 data->ioc_nid    = nid;
1275                 data->ioc_count  = share_count;
1276                 data->ioc_u32[0] = ip;
1277                 data->ioc_u32[1] = port;
1278                 break;
1279         }
1280         case IOC_LIBCFS_ADD_PEER: {
1281                 rc = kranal_add_persistent_peer(data->ioc_nid,
1282                                                 data->ioc_u32[0], /* IP */
1283                                                 data->ioc_u32[1]); /* port */
1284                 break;
1285         }
1286         case IOC_LIBCFS_DEL_PEER: {
1287                 rc = kranal_del_peer(data->ioc_nid);
1288                 break;
1289         }
1290         case IOC_LIBCFS_GET_CONN: {
1291                 kra_conn_t *conn = kranal_get_conn_by_idx(data->ioc_count);
1292
1293                 if (conn == NULL)
1294                         rc = -ENOENT;
1295                 else {
1296                         rc = 0;
1297                         data->ioc_nid    = conn->rac_peer->rap_nid;
1298                         data->ioc_u32[0] = conn->rac_device->rad_id;
1299                         kranal_conn_decref(conn);
1300                 }
1301                 break;
1302         }
1303         case IOC_LIBCFS_CLOSE_CONNECTION: {
1304                 rc = kranal_close_matching_conns(data->ioc_nid);
1305                 break;
1306         }
1307         case IOC_LIBCFS_REGISTER_MYNID: {
1308                 /* Ignore if this is a noop */
1309                 if (data->ioc_nid == ni->ni_nid) {
1310                         rc = 0;
1311                 } else {
1312                         CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1313                                libcfs_nid2str(data->ioc_nid),
1314                                libcfs_nid2str(ni->ni_nid));
1315                         rc = -EINVAL;
1316                 }
1317                 break;
1318         }
1319         }
1320
1321         return rc;
1322 }
1323
1324 void
1325 kranal_free_txdescs(struct list_head *freelist)
1326 {
1327         kra_tx_t    *tx;
1328
1329         while (!list_empty(freelist)) {
1330                 tx = list_entry(freelist->next, kra_tx_t, tx_list);
1331
1332                 list_del(&tx->tx_list);
1333                 LIBCFS_FREE(tx->tx_phys, LNET_MAX_IOV * sizeof(*tx->tx_phys));
1334                 LIBCFS_FREE(tx, sizeof(*tx));
1335         }
1336 }
1337
1338 int
1339 kranal_alloc_txdescs(struct list_head *freelist, int n)
1340 {
1341         int            i;
1342         kra_tx_t      *tx;
1343
1344         LASSERT (freelist == &kranal_data.kra_idle_txs);
1345         LASSERT (list_empty(freelist));
1346
1347         for (i = 0; i < n; i++) {
1348
1349                 LIBCFS_ALLOC(tx, sizeof(*tx));
1350                 if (tx == NULL) {
1351                         CERROR("Can't allocate tx[%d]\n", i);
1352                         kranal_free_txdescs(freelist);
1353                         return -ENOMEM;
1354                 }
1355
1356                 LIBCFS_ALLOC(tx->tx_phys,
1357                              LNET_MAX_IOV * sizeof(*tx->tx_phys));
1358                 if (tx->tx_phys == NULL) {
1359                         CERROR("Can't allocate tx[%d]->tx_phys\n", i);
1360
1361                         LIBCFS_FREE(tx, sizeof(*tx));
1362                         kranal_free_txdescs(freelist);
1363                         return -ENOMEM;
1364                 }
1365
1366                 tx->tx_buftype = RANAL_BUF_NONE;
1367                 tx->tx_msg.ram_type = RANAL_MSG_NONE;
1368
1369                 list_add(&tx->tx_list, freelist);
1370         }
1371
1372         return 0;
1373 }
1374
1375 int
1376 kranal_device_init(int id, kra_device_t *dev)
1377 {
1378         int               total_ntx = *kranal_tunables.kra_ntx;
1379         RAP_RETURN        rrc;
1380
1381         dev->rad_id = id;
1382         rrc = RapkGetDeviceByIndex(id, kranal_device_callback,
1383                                    &dev->rad_handle);
1384         if (rrc != RAP_SUCCESS) {
1385                 CERROR("Can't get Rapidarray Device %d: %d\n", id, rrc);
1386                 goto failed_0;
1387         }
1388
1389         rrc = RapkReserveRdma(dev->rad_handle, total_ntx);
1390         if (rrc != RAP_SUCCESS) {
1391                 CERROR("Can't reserve %d RDMA descriptors"
1392                        " for device %d: %d\n", total_ntx, id, rrc);
1393                 goto failed_1;
1394         }
1395
1396         rrc = RapkCreateCQ(dev->rad_handle, total_ntx, RAP_CQTYPE_SEND,
1397                            &dev->rad_rdma_cqh);
1398         if (rrc != RAP_SUCCESS) {
1399                 CERROR("Can't create rdma cq size %d for device %d: %d\n",
1400                        total_ntx, id, rrc);
1401                 goto failed_1;
1402         }
1403
1404         rrc = RapkCreateCQ(dev->rad_handle, 
1405                            *kranal_tunables.kra_fma_cq_size, 
1406                            RAP_CQTYPE_RECV, &dev->rad_fma_cqh);
1407         if (rrc != RAP_SUCCESS) {
1408                 CERROR("Can't create fma cq size %d for device %d: %d\n", 
1409                        *kranal_tunables.kra_fma_cq_size, id, rrc);
1410                 goto failed_2;
1411         }
1412
1413         return 0;
1414
1415  failed_2:
1416         RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
1417  failed_1:
1418         RapkReleaseDevice(dev->rad_handle);
1419  failed_0:
1420         return -ENODEV;
1421 }
1422
1423 void
1424 kranal_device_fini(kra_device_t *dev)
1425 {
1426         LASSERT (list_empty(&dev->rad_ready_conns));
1427         LASSERT (list_empty(&dev->rad_new_conns));
1428         LASSERT (dev->rad_nphysmap == 0);
1429         LASSERT (dev->rad_nppphysmap == 0);
1430         LASSERT (dev->rad_nvirtmap == 0);
1431         LASSERT (dev->rad_nobvirtmap == 0);
1432                 
1433         LASSERT(dev->rad_scheduler == NULL);
1434         RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cqh);
1435         RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
1436         RapkReleaseDevice(dev->rad_handle);
1437 }
1438
1439 void
1440 kranal_shutdown (lnet_ni_t *ni)
1441 {
1442         int           i;
1443         unsigned long flags;
1444
1445         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1446                atomic_read(&libcfs_kmemory));
1447
1448         LASSERT (ni == kranal_data.kra_ni);
1449         LASSERT (ni->ni_data == &kranal_data);
1450
1451         switch (kranal_data.kra_init) {
1452         default:
1453                 CERROR("Unexpected state %d\n", kranal_data.kra_init);
1454                 LBUG();
1455
1456         case RANAL_INIT_ALL:
1457                 /* Prevent new peers from being created */
1458                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1459                 kranal_data.kra_nonewpeers = 1;
1460                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1461                 
1462                 /* Remove all existing peers from the peer table */
1463                 kranal_del_peer(LNET_NID_ANY);
1464
1465                 /* Wait for pending conn reqs to be handled */
1466                 i = 2;
1467                 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1468                 while (!list_empty(&kranal_data.kra_connd_acceptq)) {
1469                         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, 
1470                                                flags);
1471                         i++;
1472                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n */
1473                                "waiting for conn reqs to clean up\n");
1474                         cfs_pause(cfs_time_seconds(1));
1475
1476                         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1477                 }
1478                 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1479
1480                 /* Wait for all peers to be freed */
1481                 i = 2;
1482                 while (atomic_read(&kranal_data.kra_npeers) != 0) {
1483                         i++;
1484                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n */
1485                                "waiting for %d peers to close down\n",
1486                                atomic_read(&kranal_data.kra_npeers));
1487                         cfs_pause(cfs_time_seconds(1));
1488                 }
1489                 /* fall through */
1490
1491         case RANAL_INIT_DATA:
1492                 break;
1493         }
1494
1495         /* Peer state all cleaned up BEFORE setting shutdown, so threads don't
1496          * have to worry about shutdown races.  NB connections may be created
1497          * while there are still active connds, but these will be temporary
1498          * since peer creation always fails after the listener has started to
1499          * shut down. */
1500         LASSERT (atomic_read(&kranal_data.kra_npeers) == 0);
1501         
1502         /* Flag threads to terminate */
1503         kranal_data.kra_shutdown = 1;
1504
1505         for (i = 0; i < kranal_data.kra_ndevs; i++) {
1506                 kra_device_t *dev = &kranal_data.kra_devices[i];
1507
1508                 spin_lock_irqsave(&dev->rad_lock, flags);
1509                 wake_up(&dev->rad_waitq);
1510                 spin_unlock_irqrestore(&dev->rad_lock, flags);
1511         }
1512
1513         spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1514         wake_up_all(&kranal_data.kra_reaper_waitq);
1515         spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
1516
1517         LASSERT (list_empty(&kranal_data.kra_connd_peers));
1518         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1519         wake_up_all(&kranal_data.kra_connd_waitq);
1520         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1521
1522         /* Wait for threads to exit */
1523         i = 2;
1524         while (atomic_read(&kranal_data.kra_nthreads) != 0) {
1525                 i++;
1526                 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1527                        "Waiting for %d threads to terminate\n",
1528                        atomic_read(&kranal_data.kra_nthreads));
1529                 cfs_pause(cfs_time_seconds(1));
1530         }
1531
1532         LASSERT (atomic_read(&kranal_data.kra_npeers) == 0);
1533         if (kranal_data.kra_peers != NULL) {
1534                 for (i = 0; i < kranal_data.kra_peer_hash_size; i++)
1535                         LASSERT (list_empty(&kranal_data.kra_peers[i]));
1536
1537                 LIBCFS_FREE(kranal_data.kra_peers,
1538                             sizeof (struct list_head) *
1539                             kranal_data.kra_peer_hash_size);
1540         }
1541
1542         LASSERT (atomic_read(&kranal_data.kra_nconns) == 0);
1543         if (kranal_data.kra_conns != NULL) {
1544                 for (i = 0; i < kranal_data.kra_conn_hash_size; i++)
1545                         LASSERT (list_empty(&kranal_data.kra_conns[i]));
1546
1547                 LIBCFS_FREE(kranal_data.kra_conns,
1548                             sizeof (struct list_head) *
1549                             kranal_data.kra_conn_hash_size);
1550         }
1551
1552         for (i = 0; i < kranal_data.kra_ndevs; i++)
1553                 kranal_device_fini(&kranal_data.kra_devices[i]);
1554
1555         kranal_free_txdescs(&kranal_data.kra_idle_txs);
1556
1557         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1558                atomic_read(&libcfs_kmemory));
1559
1560         kranal_data.kra_init = RANAL_INIT_NOTHING;
1561         PORTAL_MODULE_UNUSE;
1562 }
1563
1564 int
1565 kranal_startup (lnet_ni_t *ni)
1566 {
1567         struct timeval    tv;
1568         int               pkmem = atomic_read(&libcfs_kmemory);
1569         int               rc;
1570         int               i;
1571         kra_device_t     *dev;
1572
1573         LASSERT (ni->ni_lnd == &the_kralnd);
1574
1575         /* Only 1 instance supported */
1576         if (kranal_data.kra_init != RANAL_INIT_NOTHING) {
1577                 CERROR ("Only 1 instance supported\n");
1578                 return -EPERM;
1579         }
1580
1581         if (lnet_set_ip_niaddr(ni) != 0) {
1582                 CERROR ("Can't determine my NID\n");
1583                 return -EPERM;
1584         }
1585
1586         if (*kranal_tunables.kra_credits > *kranal_tunables.kra_ntx) {
1587                 CERROR ("Can't set credits(%d) > ntx(%d)\n",
1588                         *kranal_tunables.kra_credits,
1589                         *kranal_tunables.kra_ntx);
1590                 return -EINVAL;
1591         }
1592         
1593         memset(&kranal_data, 0, sizeof(kranal_data)); /* zero pointers, flags etc */
1594
1595         ni->ni_maxtxcredits = *kranal_tunables.kra_credits;
1596         ni->ni_peertxcredits = *kranal_tunables.kra_peercredits;
1597
1598         ni->ni_data = &kranal_data;
1599         kranal_data.kra_ni = ni;
1600
1601         /* CAVEAT EMPTOR: Every 'Fma' message includes the sender's NID and
1602          * a unique (for all time) connstamp so we can uniquely identify
1603          * the sender.  The connstamp is an incrementing counter
1604          * initialised with seconds + microseconds at startup time.  So we
1605          * rely on NOT creating connections more frequently on average than
1606          * 1MHz to ensure we don't use old connstamps when we reboot. */
1607         do_gettimeofday(&tv);
1608         kranal_data.kra_connstamp =
1609         kranal_data.kra_peerstamp = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1610
1611         rwlock_init(&kranal_data.kra_global_lock);
1612
1613         for (i = 0; i < RANAL_MAXDEVS; i++ ) {
1614                 kra_device_t  *dev = &kranal_data.kra_devices[i];
1615
1616                 dev->rad_idx = i;
1617                 INIT_LIST_HEAD(&dev->rad_ready_conns);
1618                 INIT_LIST_HEAD(&dev->rad_new_conns);
1619                 init_waitqueue_head(&dev->rad_waitq);
1620                 spin_lock_init(&dev->rad_lock);
1621         }
1622
1623         kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
1624         init_waitqueue_head(&kranal_data.kra_reaper_waitq);
1625         spin_lock_init(&kranal_data.kra_reaper_lock);
1626
1627         INIT_LIST_HEAD(&kranal_data.kra_connd_acceptq);
1628         INIT_LIST_HEAD(&kranal_data.kra_connd_peers);
1629         init_waitqueue_head(&kranal_data.kra_connd_waitq);
1630         spin_lock_init(&kranal_data.kra_connd_lock);
1631
1632         INIT_LIST_HEAD(&kranal_data.kra_idle_txs);
1633         spin_lock_init(&kranal_data.kra_tx_lock);
1634
1635         /* OK to call kranal_api_shutdown() to cleanup now */
1636         kranal_data.kra_init = RANAL_INIT_DATA;
1637         PORTAL_MODULE_USE;
1638
1639         kranal_data.kra_peer_hash_size = RANAL_PEER_HASH_SIZE;
1640         LIBCFS_ALLOC(kranal_data.kra_peers,
1641                      sizeof(struct list_head) * kranal_data.kra_peer_hash_size);
1642         if (kranal_data.kra_peers == NULL)
1643                 goto failed;
1644
1645         for (i = 0; i < kranal_data.kra_peer_hash_size; i++)
1646                 INIT_LIST_HEAD(&kranal_data.kra_peers[i]);
1647
1648         kranal_data.kra_conn_hash_size = RANAL_PEER_HASH_SIZE;
1649         LIBCFS_ALLOC(kranal_data.kra_conns,
1650                      sizeof(struct list_head) * kranal_data.kra_conn_hash_size);
1651         if (kranal_data.kra_conns == NULL)
1652                 goto failed;
1653
1654         for (i = 0; i < kranal_data.kra_conn_hash_size; i++)
1655                 INIT_LIST_HEAD(&kranal_data.kra_conns[i]);
1656
1657         rc = kranal_alloc_txdescs(&kranal_data.kra_idle_txs, 
1658                                   *kranal_tunables.kra_ntx);
1659         if (rc != 0)
1660                 goto failed;
1661
1662         rc = kranal_thread_start(kranal_reaper, NULL);
1663         if (rc != 0) {
1664                 CERROR("Can't spawn ranal reaper: %d\n", rc);
1665                 goto failed;
1666         }
1667
1668         for (i = 0; i < *kranal_tunables.kra_n_connd; i++) {
1669                 rc = kranal_thread_start(kranal_connd, (void *)(unsigned long)i);
1670                 if (rc != 0) {
1671                         CERROR("Can't spawn ranal connd[%d]: %d\n",
1672                                i, rc);
1673                         goto failed;
1674                 }
1675         }
1676
1677         LASSERT (kranal_data.kra_ndevs == 0);
1678
1679         /* Use all available RapidArray devices */
1680         for (i = 0; i < RANAL_MAXDEVS; i++) {
1681                 dev = &kranal_data.kra_devices[kranal_data.kra_ndevs];
1682
1683                 rc = kranal_device_init(kranal_devids[i], dev);
1684                 if (rc == 0)
1685                         kranal_data.kra_ndevs++;
1686         }
1687
1688         if (kranal_data.kra_ndevs == 0) {
1689                 CERROR("Can't initialise any RapidArray devices\n");
1690                 goto failed;
1691         }
1692         
1693         for (i = 0; i < kranal_data.kra_ndevs; i++) {
1694                 dev = &kranal_data.kra_devices[i];
1695                 rc = kranal_thread_start(kranal_scheduler, dev);
1696                 if (rc != 0) {
1697                         CERROR("Can't spawn ranal scheduler[%d]: %d\n",
1698                                i, rc);
1699                         goto failed;
1700                 }
1701         }
1702
1703         /* flag everything initialised */
1704         kranal_data.kra_init = RANAL_INIT_ALL;
1705         /*****************************************************/
1706
1707         CDEBUG(D_MALLOC, "initial kmem %d\n", pkmem);
1708         return 0;
1709
1710  failed:
1711         kranal_shutdown(ni);
1712         return -ENETDOWN;
1713 }
1714
1715 void __exit
1716 kranal_module_fini (void)
1717 {
1718         lnet_unregister_lnd(&the_kralnd);
1719         kranal_tunables_fini();
1720 }
1721
1722 int __init
1723 kranal_module_init (void)
1724 {
1725         int    rc;
1726
1727         rc = kranal_tunables_init();
1728         if (rc != 0)
1729                 return rc;
1730
1731         lnet_register_lnd(&the_kralnd);
1732
1733         return 0;
1734 }
1735
1736 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1737 MODULE_DESCRIPTION("Kernel RapidArray LND v0.01");
1738 MODULE_LICENSE("GPL");
1739
1740 module_init(kranal_module_init);
1741 module_exit(kranal_module_fini);