Whamcloud - gitweb
LU-1346 libcfs: cleanup libcfs primitive (linux-prim.h)
[fs/lustre-release.git] / lnet / klnds / ralnd / ralnd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/ralnd/ralnd.c
37  *
38  * Author: Eric Barton <eric@bartonsoftware.com>
39  */
40 #include "ralnd.h"
41
42 static int        kranal_devids[RANAL_MAXDEVS] = {RAPK_MAIN_DEVICE_ID,
43                                                   RAPK_EXPANSION_DEVICE_ID};
44
45 lnd_t the_kralnd = {
46         .lnd_type       = RALND,
47         .lnd_startup    = kranal_startup,
48         .lnd_shutdown   = kranal_shutdown,
49         .lnd_ctl        = kranal_ctl,
50         .lnd_send       = kranal_send,
51         .lnd_recv       = kranal_recv,
52         .lnd_eager_recv = kranal_eager_recv,
53         .lnd_accept     = kranal_accept,
54 };
55
56 kra_data_t              kranal_data;
57
58 void
59 kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn, lnet_nid_t dstnid)
60 {
61         RAP_RETURN   rrc;
62
63         memset(connreq, 0, sizeof(*connreq));
64
65         connreq->racr_magic     = RANAL_MSG_MAGIC;
66         connreq->racr_version   = RANAL_MSG_VERSION;
67
68         if (conn == NULL)                       /* prepping a "stub" reply */
69                 return;
70
71         connreq->racr_devid     = conn->rac_device->rad_id;
72         connreq->racr_srcnid    = kranal_data.kra_ni->ni_nid;
73         connreq->racr_dstnid    = dstnid;
74         connreq->racr_peerstamp = kranal_data.kra_peerstamp;
75         connreq->racr_connstamp = conn->rac_my_connstamp;
76         connreq->racr_timeout   = conn->rac_timeout;
77
78         rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
79         LASSERT(rrc == RAP_SUCCESS);
80 }
81
82 int
83 kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int active)
84 {
85         int         timeout = active ? *kranal_tunables.kra_timeout :
86                                         lnet_acceptor_timeout();
87         int         swab;
88         int         rc;
89
90         /* return 0 on success, -ve on error, +ve to tell the peer I'm "old" */
91
92         rc = libcfs_sock_read(sock, &connreq->racr_magic, 
93                               sizeof(connreq->racr_magic), timeout);
94         if (rc != 0) {
95                 CERROR("Read(magic) failed(1): %d\n", rc);
96                 return -EIO;
97         }
98
99         if (connreq->racr_magic != RANAL_MSG_MAGIC &&
100             connreq->racr_magic != __swab32(RANAL_MSG_MAGIC)) {
101                 /* Unexpected magic! */
102                 if (!active &&
103                     (connreq->racr_magic == LNET_PROTO_MAGIC ||
104                      connreq->racr_magic == __swab32(LNET_PROTO_MAGIC))) {
105                         /* future protocol version compatibility!
106                          * When LNET unifies protocols over all LNDs, the first
107                          * thing sent will be a version query.  +ve rc means I
108                          * reply with my current magic/version */
109                         return EPROTO;
110                 }
111
112                 CERROR("Unexpected magic %08x (%s)\n",
113                        connreq->racr_magic, active ? "active" : "passive");
114                 return -EPROTO;
115         }
116
117         swab = (connreq->racr_magic == __swab32(RANAL_MSG_MAGIC));
118
119         rc = libcfs_sock_read(sock, &connreq->racr_version,
120                               sizeof(connreq->racr_version), timeout);
121         if (rc != 0) {
122                 CERROR("Read(version) failed: %d\n", rc);
123                 return -EIO;
124         }
125
126         if (swab)
127                 __swab16s(&connreq->racr_version);
128         
129         if (connreq->racr_version != RANAL_MSG_VERSION) {
130                 if (active) {
131                         CERROR("Unexpected version %d\n", connreq->racr_version);
132                         return -EPROTO;
133                 }
134                 /* If this is a future version of the ralnd protocol, and I'm
135                  * passive (accepted the connection), tell my peer I'm "old"
136                  * (+ve rc) */
137                 return EPROTO;
138         }
139
140         rc = libcfs_sock_read(sock, &connreq->racr_devid,
141                               sizeof(connreq->racr_version) -
142                               offsetof(kra_connreq_t, racr_devid),
143                               timeout);
144         if (rc != 0) {
145                 CERROR("Read(body) failed: %d\n", rc);
146                 return -EIO;
147         }
148
149         if (swab) {
150                 __swab32s(&connreq->racr_magic);
151                 __swab16s(&connreq->racr_version);
152                 __swab16s(&connreq->racr_devid);
153                 __swab64s(&connreq->racr_srcnid);
154                 __swab64s(&connreq->racr_dstnid);
155                 __swab64s(&connreq->racr_peerstamp);
156                 __swab64s(&connreq->racr_connstamp);
157                 __swab32s(&connreq->racr_timeout);
158
159                 __swab32s(&connreq->racr_riparams.HostId);
160                 __swab32s(&connreq->racr_riparams.FmaDomainHndl);
161                 __swab32s(&connreq->racr_riparams.PTag);
162                 __swab32s(&connreq->racr_riparams.CompletionCookie);
163         }
164
165         if (connreq->racr_srcnid == LNET_NID_ANY ||
166             connreq->racr_dstnid == LNET_NID_ANY) {
167                 CERROR("Received LNET_NID_ANY\n");
168                 return -EPROTO;
169         }
170
171         if (connreq->racr_timeout < RANAL_MIN_TIMEOUT) {
172                 CERROR("Received timeout %d < MIN %d\n",
173                        connreq->racr_timeout, RANAL_MIN_TIMEOUT);
174                 return -EPROTO;
175         }
176
177         return 0;
178 }
179
180 int
181 kranal_close_stale_conns_locked (kra_peer_t *peer, kra_conn_t *newconn)
182 {
183         kra_conn_t         *conn;
184         cfs_list_t         *ctmp;
185         cfs_list_t         *cnxt;
186         int                 loopback;
187         int                 count = 0;
188
189         loopback = peer->rap_nid == kranal_data.kra_ni->ni_nid;
190
191         cfs_list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
192                 conn = cfs_list_entry(ctmp, kra_conn_t, rac_list);
193
194                 if (conn == newconn)
195                         continue;
196
197                 if (conn->rac_peerstamp != newconn->rac_peerstamp) {
198                         CDEBUG(D_NET, "Closing stale conn nid: %s "
199                                " peerstamp:"LPX64"("LPX64")\n", 
200                                libcfs_nid2str(peer->rap_nid),
201                                conn->rac_peerstamp, newconn->rac_peerstamp);
202                         LASSERT (conn->rac_peerstamp < newconn->rac_peerstamp);
203                         count++;
204                         kranal_close_conn_locked(conn, -ESTALE);
205                         continue;
206                 }
207
208                 if (conn->rac_device != newconn->rac_device)
209                         continue;
210
211                 if (loopback &&
212                     newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
213                     newconn->rac_peer_connstamp == conn->rac_my_connstamp)
214                         continue;
215
216                 LASSERT (conn->rac_peer_connstamp < newconn->rac_peer_connstamp);
217
218                 CDEBUG(D_NET, "Closing stale conn nid: %s"
219                        " connstamp:"LPX64"("LPX64")\n", 
220                        libcfs_nid2str(peer->rap_nid),
221                        conn->rac_peer_connstamp, newconn->rac_peer_connstamp);
222
223                 count++;
224                 kranal_close_conn_locked(conn, -ESTALE);
225         }
226
227         return count;
228 }
229
230 int
231 kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn)
232 {
233         kra_conn_t       *conn;
234         cfs_list_t       *tmp;
235         int               loopback;
236
237         loopback = peer->rap_nid == kranal_data.kra_ni->ni_nid;
238
239         cfs_list_for_each(tmp, &peer->rap_conns) {
240                 conn = cfs_list_entry(tmp, kra_conn_t, rac_list);
241
242                 /* 'newconn' is from an earlier version of 'peer'!!! */
243                 if (newconn->rac_peerstamp < conn->rac_peerstamp)
244                         return 1;
245
246                 /* 'conn' is from an earlier version of 'peer': it will be
247                  * removed when we cull stale conns later on... */
248                 if (newconn->rac_peerstamp > conn->rac_peerstamp)
249                         continue;
250
251                 /* Different devices are OK */
252                 if (conn->rac_device != newconn->rac_device)
253                         continue;
254
255                 /* It's me connecting to myself */
256                 if (loopback &&
257                     newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
258                     newconn->rac_peer_connstamp == conn->rac_my_connstamp)
259                         continue;
260
261                 /* 'newconn' is an earlier connection from 'peer'!!! */
262                 if (newconn->rac_peer_connstamp < conn->rac_peer_connstamp)
263                         return 2;
264
265                 /* 'conn' is an earlier connection from 'peer': it will be
266                  * removed when we cull stale conns later on... */
267                 if (newconn->rac_peer_connstamp > conn->rac_peer_connstamp)
268                         continue;
269
270                 /* 'newconn' has the SAME connection stamp; 'peer' isn't
271                  * playing the game... */
272                 return 3;
273         }
274
275         return 0;
276 }
277
278 void
279 kranal_set_conn_uniqueness (kra_conn_t *conn)
280 {
281         unsigned long  flags;
282
283         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
284
285         conn->rac_my_connstamp = kranal_data.kra_connstamp++;
286
287         do {    /* allocate a unique cqid */
288                 conn->rac_cqid = kranal_data.kra_next_cqid++;
289         } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL);
290
291         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
292 }
293
294 int
295 kranal_create_conn(kra_conn_t **connp, kra_device_t *dev)
296 {
297         kra_conn_t    *conn;
298         RAP_RETURN     rrc;
299
300         LASSERT (!in_interrupt());
301         LIBCFS_ALLOC(conn, sizeof(*conn));
302
303         if (conn == NULL)
304                 return -ENOMEM;
305
306         memset(conn, 0, sizeof(*conn));
307         cfs_atomic_set(&conn->rac_refcount, 1);
308         CFS_INIT_LIST_HEAD(&conn->rac_list);
309         CFS_INIT_LIST_HEAD(&conn->rac_hashlist);
310         CFS_INIT_LIST_HEAD(&conn->rac_schedlist);
311         CFS_INIT_LIST_HEAD(&conn->rac_fmaq);
312         CFS_INIT_LIST_HEAD(&conn->rac_rdmaq);
313         CFS_INIT_LIST_HEAD(&conn->rac_replyq);
314         spin_lock_init(&conn->rac_lock);
315
316         kranal_set_conn_uniqueness(conn);
317
318         conn->rac_device = dev;
319         conn->rac_timeout = MAX(*kranal_tunables.kra_timeout, RANAL_MIN_TIMEOUT);
320         kranal_update_reaper_timeout(conn->rac_timeout);
321
322         rrc = RapkCreateRi(dev->rad_handle, conn->rac_cqid,
323                            &conn->rac_rihandle);
324         if (rrc != RAP_SUCCESS) {
325                 CERROR("RapkCreateRi failed: %d\n", rrc);
326                 LIBCFS_FREE(conn, sizeof(*conn));
327                 return -ENETDOWN;
328         }
329
330         cfs_atomic_inc(&kranal_data.kra_nconns);
331         *connp = conn;
332         return 0;
333 }
334
335 void
336 kranal_destroy_conn(kra_conn_t *conn)
337 {
338         RAP_RETURN         rrc;
339
340         LASSERT (!in_interrupt());
341         LASSERT (!conn->rac_scheduled);
342         LASSERT (cfs_list_empty(&conn->rac_list));
343         LASSERT (cfs_list_empty(&conn->rac_hashlist));
344         LASSERT (cfs_list_empty(&conn->rac_schedlist));
345         LASSERT (cfs_atomic_read(&conn->rac_refcount) == 0);
346         LASSERT (cfs_list_empty(&conn->rac_fmaq));
347         LASSERT (cfs_list_empty(&conn->rac_rdmaq));
348         LASSERT (cfs_list_empty(&conn->rac_replyq));
349
350         rrc = RapkDestroyRi(conn->rac_device->rad_handle,
351                             conn->rac_rihandle);
352         LASSERT (rrc == RAP_SUCCESS);
353
354         if (conn->rac_peer != NULL)
355                 kranal_peer_decref(conn->rac_peer);
356
357         LIBCFS_FREE(conn, sizeof(*conn));
358         cfs_atomic_dec(&kranal_data.kra_nconns);
359 }
360
361 void
362 kranal_terminate_conn_locked (kra_conn_t *conn)
363 {
364         LASSERT (!in_interrupt());
365         LASSERT (conn->rac_state == RANAL_CONN_CLOSING);
366         LASSERT (!cfs_list_empty(&conn->rac_hashlist));
367         LASSERT (cfs_list_empty(&conn->rac_list));
368
369         /* Remove from conn hash table: no new callbacks */
370         cfs_list_del_init(&conn->rac_hashlist);
371         kranal_conn_decref(conn);
372
373         conn->rac_state = RANAL_CONN_CLOSED;
374
375         /* schedule to clear out all uncompleted comms in context of dev's
376          * scheduler */
377         kranal_schedule_conn(conn);
378 }
379
380 void
381 kranal_close_conn_locked (kra_conn_t *conn, int error)
382 {
383         kra_peer_t        *peer = conn->rac_peer;
384
385         CDEBUG_LIMIT(error == 0 ? D_NET : D_NETERROR,
386                      "closing conn to %s: error %d\n",
387                      libcfs_nid2str(peer->rap_nid), error);
388
389         LASSERT (!in_interrupt());
390         LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED);
391         LASSERT (!cfs_list_empty(&conn->rac_hashlist));
392         LASSERT (!cfs_list_empty(&conn->rac_list));
393
394         cfs_list_del_init(&conn->rac_list);
395
396         if (cfs_list_empty(&peer->rap_conns) &&
397             peer->rap_persistence == 0) {
398                 /* Non-persistent peer with no more conns... */
399                 kranal_unlink_peer_locked(peer);
400         }
401
402         /* Reset RX timeout to ensure we wait for an incoming CLOSE for the
403          * full timeout.  If we get a CLOSE we know the peer has stopped all
404          * RDMA.  Otherwise if we wait for the full timeout we can also be sure
405          * all RDMA has stopped. */
406         conn->rac_last_rx = jiffies;
407         smp_mb();
408
409         conn->rac_state = RANAL_CONN_CLOSING;
410         kranal_schedule_conn(conn);             /* schedule sending CLOSE */
411
412         kranal_conn_decref(conn);               /* lose peer's ref */
413 }
414
415 void
416 kranal_close_conn (kra_conn_t *conn, int error)
417 {
418         unsigned long    flags;
419
420
421         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
422
423         if (conn->rac_state == RANAL_CONN_ESTABLISHED)
424                 kranal_close_conn_locked(conn, error);
425
426         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
427 }
428
429 int
430 kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq,
431                        __u32 peer_ip, int peer_port)
432 {
433         kra_device_t  *dev = conn->rac_device;
434         unsigned long  flags;
435         RAP_RETURN     rrc;
436
437         /* CAVEAT EMPTOR: we're really overloading rac_last_tx + rac_keepalive
438          * to do RapkCompleteSync() timekeeping (see kibnal_scheduler). */
439         conn->rac_last_tx = jiffies;
440         conn->rac_keepalive = 0;
441
442         rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
443         if (rrc != RAP_SUCCESS) {
444                 CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n",
445                        HIPQUAD(peer_ip), peer_port, rrc);
446                 return -ECONNABORTED;
447         }
448
449         /* Schedule conn on rad_new_conns */
450         kranal_conn_addref(conn);
451         spin_lock_irqsave(&dev->rad_lock, flags);
452         cfs_list_add_tail(&conn->rac_schedlist, &dev->rad_new_conns);
453         wake_up(&dev->rad_waitq);
454         spin_unlock_irqrestore(&dev->rad_lock, flags);
455
456         rrc = RapkWaitToConnect(conn->rac_rihandle);
457         if (rrc != RAP_SUCCESS) {
458                 CERROR("Error waiting to connect to %u.%u.%u.%u/%d: %d\n",
459                        HIPQUAD(peer_ip), peer_port, rrc);
460                 return -ECONNABORTED;
461         }
462
463         /* Scheduler doesn't touch conn apart from to deschedule and decref it
464          * after RapkCompleteSync() return success, so conn is all mine */
465
466         conn->rac_peerstamp = connreq->racr_peerstamp;
467         conn->rac_peer_connstamp = connreq->racr_connstamp;
468         conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq->racr_timeout);
469         kranal_update_reaper_timeout(conn->rac_keepalive);
470         return 0;
471 }
472
473 int
474 kranal_passive_conn_handshake (struct socket *sock, lnet_nid_t *src_nidp,
475                                lnet_nid_t *dst_nidp, kra_conn_t **connp)
476 {
477         __u32                peer_ip;
478         unsigned int         peer_port;
479         kra_connreq_t        rx_connreq;
480         kra_connreq_t        tx_connreq;
481         kra_conn_t          *conn;
482         kra_device_t        *dev;
483         int                  rc;
484         int                  i;
485
486         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
487         if (rc != 0) {
488                 CERROR("Can't get peer's IP: %d\n", rc);
489                 return rc;
490         }
491
492         rc = kranal_recv_connreq(sock, &rx_connreq, 0);
493
494         if (rc < 0) {
495                 CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
496                        HIPQUAD(peer_ip), peer_port, rc);
497                 return rc;
498         }
499
500         if (rc > 0) {
501                 /* Request from "new" peer: send reply with my MAGIC/VERSION to
502                  * tell her I'm old... */
503                 kranal_pack_connreq(&tx_connreq, NULL, LNET_NID_ANY);
504
505                 rc = libcfs_sock_write(sock, &tx_connreq, sizeof(tx_connreq),
506                                        lnet_acceptor_timeout());
507                 if (rc != 0)
508                         CERROR("Can't tx stub connreq to %u.%u.%u.%u/%d: %d\n",
509                                HIPQUAD(peer_ip), peer_port, rc);
510
511                 return -EPROTO;
512         }
513
514         for (i = 0;;i++) {
515                 if (i == kranal_data.kra_ndevs) {
516                         CERROR("Can't match dev %d from %u.%u.%u.%u/%d\n",
517                                rx_connreq.racr_devid, HIPQUAD(peer_ip), peer_port);
518                         return -ENODEV;
519                 }
520                 dev = &kranal_data.kra_devices[i];
521                 if (dev->rad_id == rx_connreq.racr_devid)
522                         break;
523         }
524
525         rc = kranal_create_conn(&conn, dev);
526         if (rc != 0)
527                 return rc;
528
529         kranal_pack_connreq(&tx_connreq, conn, rx_connreq.racr_srcnid);
530
531         rc = libcfs_sock_write(sock, &tx_connreq, sizeof(tx_connreq),
532                                lnet_acceptor_timeout());
533         if (rc != 0) {
534                 CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
535                        HIPQUAD(peer_ip), peer_port, rc);
536                 kranal_conn_decref(conn);
537                 return rc;
538         }
539
540         rc = kranal_set_conn_params(conn, &rx_connreq, peer_ip, peer_port);
541         if (rc != 0) {
542                 kranal_conn_decref(conn);
543                 return rc;
544         }
545
546         *connp = conn;
547         *src_nidp = rx_connreq.racr_srcnid;
548         *dst_nidp = rx_connreq.racr_dstnid;
549         return 0;
550 }
551
552 int
553 kranal_active_conn_handshake(kra_peer_t *peer,
554                              lnet_nid_t *dst_nidp, kra_conn_t **connp)
555 {
556         kra_connreq_t       connreq;
557         kra_conn_t         *conn;
558         kra_device_t       *dev;
559         struct socket      *sock;
560         int                 rc;
561         unsigned int        idx;
562
563         /* spread connections over all devices using both peer NIDs to ensure
564          * all nids use all devices */
565         idx = peer->rap_nid + kranal_data.kra_ni->ni_nid;
566         dev = &kranal_data.kra_devices[idx % kranal_data.kra_ndevs];
567
568         rc = kranal_create_conn(&conn, dev);
569         if (rc != 0)
570                 return rc;
571
572         kranal_pack_connreq(&connreq, conn, peer->rap_nid);
573
574         if (the_lnet.ln_testprotocompat != 0) {
575                 /* single-shot proto test */
576                 LNET_LOCK();
577                 if ((the_lnet.ln_testprotocompat & 1) != 0) {
578                         connreq.racr_version++;
579                         the_lnet.ln_testprotocompat &= ~1;
580                 }
581                 if ((the_lnet.ln_testprotocompat & 2) != 0) {
582                         connreq.racr_magic = LNET_PROTO_MAGIC;
583                         the_lnet.ln_testprotocompat &= ~2;
584                 }
585                 LNET_UNLOCK();
586         }
587
588         rc = lnet_connect(&sock, peer->rap_nid,
589                          0, peer->rap_ip, peer->rap_port);
590         if (rc != 0)
591                 goto failed_0;
592
593         /* CAVEAT EMPTOR: the passive side receives with a SHORT rx timeout
594          * immediately after accepting a connection, so we connect and then
595          * send immediately. */
596
597         rc = libcfs_sock_write(sock, &connreq, sizeof(connreq),
598                                lnet_acceptor_timeout());
599         if (rc != 0) {
600                 CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
601                        HIPQUAD(peer->rap_ip), peer->rap_port, rc);
602                 goto failed_2;
603         }
604
605         rc = kranal_recv_connreq(sock, &connreq, 1);
606         if (rc != 0) {
607                 CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
608                        HIPQUAD(peer->rap_ip), peer->rap_port, rc);
609                 goto failed_2;
610         }
611
612         libcfs_sock_release(sock);
613         rc = -EPROTO;
614
615         if (connreq.racr_srcnid != peer->rap_nid) {
616                 CERROR("Unexpected srcnid from %u.%u.%u.%u/%d: "
617                        "received %s expected %s\n",
618                        HIPQUAD(peer->rap_ip), peer->rap_port,
619                        libcfs_nid2str(connreq.racr_srcnid), 
620                        libcfs_nid2str(peer->rap_nid));
621                 goto failed_1;
622         }
623
624         if (connreq.racr_devid != dev->rad_id) {
625                 CERROR("Unexpected device id from %u.%u.%u.%u/%d: "
626                        "received %d expected %d\n",
627                        HIPQUAD(peer->rap_ip), peer->rap_port,
628                        connreq.racr_devid, dev->rad_id);
629                 goto failed_1;
630         }
631
632         rc = kranal_set_conn_params(conn, &connreq,
633                                     peer->rap_ip, peer->rap_port);
634         if (rc != 0)
635                 goto failed_1;
636
637         *connp = conn;
638         *dst_nidp = connreq.racr_dstnid;
639         return 0;
640
641  failed_2:
642         libcfs_sock_release(sock);
643  failed_1:
644         lnet_connect_console_error(rc, peer->rap_nid,
645                                   peer->rap_ip, peer->rap_port);
646  failed_0:
647         kranal_conn_decref(conn);
648         return rc;
649 }
650
651 int
652 kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
653 {
654         kra_peer_t        *peer2;
655         kra_tx_t          *tx;
656         lnet_nid_t         peer_nid;
657         lnet_nid_t         dst_nid;
658         unsigned long      flags;
659         kra_conn_t        *conn;
660         int                rc;
661         int                nstale;
662         int                new_peer = 0;
663
664         if (sock == NULL) {
665                 /* active: connd wants to connect to 'peer' */
666                 LASSERT (peer != NULL);
667                 LASSERT (peer->rap_connecting);
668
669                 rc = kranal_active_conn_handshake(peer, &dst_nid, &conn);
670                 if (rc != 0)
671                         return rc;
672
673                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
674
675                 if (!kranal_peer_active(peer)) {
676                         /* raced with peer getting unlinked */
677                         write_unlock_irqrestore(&kranal_data. \
678                                                     kra_global_lock,
679                                                     flags);
680                         kranal_conn_decref(conn);
681                         return -ESTALE;
682                 }
683
684                 peer_nid = peer->rap_nid;
685         } else {
686                 /* passive: listener accepted 'sock' */
687                 LASSERT (peer == NULL);
688
689                 rc = kranal_passive_conn_handshake(sock, &peer_nid,
690                                                    &dst_nid, &conn);
691                 if (rc != 0)
692                         return rc;
693
694                 /* assume this is a new peer */
695                 rc = kranal_create_peer(&peer, peer_nid);
696                 if (rc != 0) {
697                         CERROR("Can't create conn for %s\n", 
698                                libcfs_nid2str(peer_nid));
699                         kranal_conn_decref(conn);
700                         return -ENOMEM;
701                 }
702
703                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
704
705                 peer2 = kranal_find_peer_locked(peer_nid);
706                 if (peer2 == NULL) {
707                         new_peer = 1;
708                 } else {
709                         /* peer_nid already in the peer table */
710                         kranal_peer_decref(peer);
711                         peer = peer2;
712                 }
713         }
714
715         LASSERT ((!new_peer) != (!kranal_peer_active(peer)));
716
717         /* Refuse connection if peer thinks we are a different NID.  We check
718          * this while holding the global lock, to synch with connection
719          * destruction on NID change. */
720         if (kranal_data.kra_ni->ni_nid != dst_nid) {
721                 write_unlock_irqrestore(&kranal_data.kra_global_lock,
722                                             flags);
723
724                 CERROR("Stale/bad connection with %s: dst_nid %s, expected %s\n",
725                        libcfs_nid2str(peer_nid), libcfs_nid2str(dst_nid), 
726                        libcfs_nid2str(kranal_data.kra_ni->ni_nid));
727                 rc = -ESTALE;
728                 goto failed;
729         }
730
731         /* Refuse to duplicate an existing connection (both sides might try to
732          * connect at once).  NB we return success!  We _are_ connected so we
733          * _don't_ have any blocked txs to complete with failure. */
734         rc = kranal_conn_isdup_locked(peer, conn);
735         if (rc != 0) {
736                 LASSERT (!cfs_list_empty(&peer->rap_conns));
737                 LASSERT (cfs_list_empty(&peer->rap_tx_queue));
738                 write_unlock_irqrestore(&kranal_data.kra_global_lock,
739                                             flags);
740                 CWARN("Not creating duplicate connection to %s: %d\n",
741                       libcfs_nid2str(peer_nid), rc);
742                 rc = 0;
743                 goto failed;
744         }
745
746         if (new_peer) {
747                 /* peer table takes my ref on the new peer */
748                 cfs_list_add_tail(&peer->rap_list,
749                                   kranal_nid2peerlist(peer_nid));
750         }
751
752         /* initialise timestamps before reaper looks at them */
753         conn->rac_last_tx = conn->rac_last_rx = jiffies;
754
755         kranal_peer_addref(peer);               /* +1 ref for conn */
756         conn->rac_peer = peer;
757         cfs_list_add_tail(&conn->rac_list, &peer->rap_conns);
758
759         kranal_conn_addref(conn);               /* +1 ref for conn table */
760         cfs_list_add_tail(&conn->rac_hashlist,
761                           kranal_cqid2connlist(conn->rac_cqid));
762
763         /* Schedule all packets blocking for a connection */
764         while (!cfs_list_empty(&peer->rap_tx_queue)) {
765                 tx = cfs_list_entry(peer->rap_tx_queue.next,
766                                     kra_tx_t, tx_list);
767
768                 cfs_list_del(&tx->tx_list);
769                 kranal_post_fma(conn, tx);
770         }
771
772         nstale = kranal_close_stale_conns_locked(peer, conn);
773
774         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
775
776         /* CAVEAT EMPTOR: passive peer can disappear NOW */
777
778         if (nstale != 0)
779                 CWARN("Closed %d stale conns to %s\n", nstale, 
780                       libcfs_nid2str(peer_nid));
781
782         CWARN("New connection to %s on devid[%d] = %d\n",
783                libcfs_nid2str(peer_nid), 
784                conn->rac_device->rad_idx, conn->rac_device->rad_id);
785
786         /* Ensure conn gets checked.  Transmits may have been queued and an
787          * FMA event may have happened before it got in the cq hash table */
788         kranal_schedule_conn(conn);
789         return 0;
790
791  failed:
792         if (new_peer)
793                 kranal_peer_decref(peer);
794         kranal_conn_decref(conn);
795         return rc;
796 }
797
798 void
799 kranal_connect (kra_peer_t *peer)
800 {
801         kra_tx_t          *tx;
802         unsigned long      flags;
803         cfs_list_t         zombies;
804         int                rc;
805
806         LASSERT (peer->rap_connecting);
807
808         CDEBUG(D_NET, "About to handshake %s\n", 
809                libcfs_nid2str(peer->rap_nid));
810
811         rc = kranal_conn_handshake(NULL, peer);
812
813         CDEBUG(D_NET, "Done handshake %s:%d \n", 
814                libcfs_nid2str(peer->rap_nid), rc);
815
816         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
817
818         LASSERT (peer->rap_connecting);
819         peer->rap_connecting = 0;
820
821         if (rc == 0) {
822                 /* kranal_conn_handshake() queues blocked txs immediately on
823                  * success to avoid messages jumping the queue */
824                 LASSERT (cfs_list_empty(&peer->rap_tx_queue));
825
826                 peer->rap_reconnect_interval = 0; /* OK to reconnect at any time */
827
828                 write_unlock_irqrestore(&kranal_data.kra_global_lock,
829                                             flags);
830                 return;
831         }
832
833         peer->rap_reconnect_interval *= 2;
834         peer->rap_reconnect_interval =
835                 MAX(peer->rap_reconnect_interval,
836                     *kranal_tunables.kra_min_reconnect_interval);
837         peer->rap_reconnect_interval =
838                 MIN(peer->rap_reconnect_interval,
839                     *kranal_tunables.kra_max_reconnect_interval);
840
841         peer->rap_reconnect_time = jiffies + peer->rap_reconnect_interval * HZ;
842
843         /* Grab all blocked packets while we have the global lock */
844         cfs_list_add(&zombies, &peer->rap_tx_queue);
845         cfs_list_del_init(&peer->rap_tx_queue);
846
847         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
848
849         if (cfs_list_empty(&zombies))
850                 return;
851
852         CNETERR("Dropping packets for %s: connection failed\n",
853                 libcfs_nid2str(peer->rap_nid));
854
855         do {
856                 tx = cfs_list_entry(zombies.next, kra_tx_t, tx_list);
857
858                 cfs_list_del(&tx->tx_list);
859                 kranal_tx_done(tx, -EHOSTUNREACH);
860
861         } while (!cfs_list_empty(&zombies));
862 }
863
864 void
865 kranal_free_acceptsock (kra_acceptsock_t *ras)
866 {
867         libcfs_sock_release(ras->ras_sock);
868         LIBCFS_FREE(ras, sizeof(*ras));
869 }
870
871 int
872 kranal_accept (lnet_ni_t *ni, struct socket *sock)
873 {
874         kra_acceptsock_t  *ras;
875         int                rc;
876         __u32              peer_ip;
877         int                peer_port;
878         unsigned long      flags;
879
880         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
881         LASSERT (rc == 0);                      /* we succeeded before */
882
883         LIBCFS_ALLOC(ras, sizeof(*ras));
884         if (ras == NULL) {
885                 CERROR("ENOMEM allocating connection request from "
886                        "%u.%u.%u.%u\n", HIPQUAD(peer_ip));
887                 return -ENOMEM;
888         }
889
890         ras->ras_sock = sock;
891
892         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
893
894         cfs_list_add_tail(&ras->ras_list, &kranal_data.kra_connd_acceptq);
895         wake_up(&kranal_data.kra_connd_waitq);
896
897         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
898         return 0;
899 }
900
901 int
902 kranal_create_peer (kra_peer_t **peerp, lnet_nid_t nid)
903 {
904         kra_peer_t    *peer;
905         unsigned long  flags;
906
907         LASSERT (nid != LNET_NID_ANY);
908
909         LIBCFS_ALLOC(peer, sizeof(*peer));
910         if (peer == NULL)
911                 return -ENOMEM;
912
913         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
914
915         peer->rap_nid = nid;
916         cfs_atomic_set(&peer->rap_refcount, 1);     /* 1 ref for caller */
917
918         CFS_INIT_LIST_HEAD(&peer->rap_list);
919         CFS_INIT_LIST_HEAD(&peer->rap_connd_list);
920         CFS_INIT_LIST_HEAD(&peer->rap_conns);
921         CFS_INIT_LIST_HEAD(&peer->rap_tx_queue);
922
923         peer->rap_reconnect_interval = 0;       /* OK to connect at any time */
924
925         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
926
927         if (kranal_data.kra_nonewpeers) {
928                 /* shutdown has started already */
929                 write_unlock_irqrestore(&kranal_data.kra_global_lock,
930                                             flags);
931
932                 LIBCFS_FREE(peer, sizeof(*peer));
933                 CERROR("Can't create peer: network shutdown\n");
934                 return -ESHUTDOWN;
935         }
936
937         cfs_atomic_inc(&kranal_data.kra_npeers);
938
939         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
940
941         *peerp = peer;
942         return 0;
943 }
944
945 void
946 kranal_destroy_peer (kra_peer_t *peer)
947 {
948         CDEBUG(D_NET, "peer %s %p deleted\n", 
949                libcfs_nid2str(peer->rap_nid), peer);
950
951         LASSERT (cfs_atomic_read(&peer->rap_refcount) == 0);
952         LASSERT (peer->rap_persistence == 0);
953         LASSERT (!kranal_peer_active(peer));
954         LASSERT (!peer->rap_connecting);
955         LASSERT (cfs_list_empty(&peer->rap_conns));
956         LASSERT (cfs_list_empty(&peer->rap_tx_queue));
957         LASSERT (cfs_list_empty(&peer->rap_connd_list));
958
959         LIBCFS_FREE(peer, sizeof(*peer));
960
961         /* NB a peer's connections keep a reference on their peer until
962          * they are destroyed, so we can be assured that _all_ state to do
963          * with this peer has been cleaned up when its refcount drops to
964          * zero. */
965         cfs_atomic_dec(&kranal_data.kra_npeers);
966 }
967
968 kra_peer_t *
969 kranal_find_peer_locked (lnet_nid_t nid)
970 {
971         cfs_list_t       *peer_list = kranal_nid2peerlist(nid);
972         cfs_list_t       *tmp;
973         kra_peer_t       *peer;
974
975         cfs_list_for_each (tmp, peer_list) {
976
977                 peer = cfs_list_entry(tmp, kra_peer_t, rap_list);
978
979                 LASSERT (peer->rap_persistence > 0 ||     /* persistent peer */
980                          !cfs_list_empty(&peer->rap_conns));  /* active conn */
981
982                 if (peer->rap_nid != nid)
983                         continue;
984
985                 CDEBUG(D_NET, "got peer [%p] -> %s (%d)\n",
986                        peer, libcfs_nid2str(nid), 
987                        cfs_atomic_read(&peer->rap_refcount));
988                 return peer;
989         }
990         return NULL;
991 }
992
993 kra_peer_t *
994 kranal_find_peer (lnet_nid_t nid)
995 {
996         kra_peer_t     *peer;
997
998         read_lock(&kranal_data.kra_global_lock);
999         peer = kranal_find_peer_locked(nid);
1000         if (peer != NULL)                       /* +1 ref for caller? */
1001                 kranal_peer_addref(peer);
1002         read_unlock(&kranal_data.kra_global_lock);
1003
1004         return peer;
1005 }
1006
1007 void
1008 kranal_unlink_peer_locked (kra_peer_t *peer)
1009 {
1010         LASSERT (peer->rap_persistence == 0);
1011         LASSERT (cfs_list_empty(&peer->rap_conns));
1012
1013         LASSERT (kranal_peer_active(peer));
1014         cfs_list_del_init(&peer->rap_list);
1015
1016         /* lose peerlist's ref */
1017         kranal_peer_decref(peer);
1018 }
1019
1020 int
1021 kranal_get_peer_info (int index, lnet_nid_t *nidp, __u32 *ipp, int *portp,
1022                       int *persistencep)
1023 {
1024         kra_peer_t        *peer;
1025         cfs_list_t        *ptmp;
1026         int                i;
1027
1028         read_lock(&kranal_data.kra_global_lock);
1029
1030         for (i = 0; i < kranal_data.kra_peer_hash_size; i++) {
1031
1032                 cfs_list_for_each(ptmp, &kranal_data.kra_peers[i]) {
1033
1034                         peer = cfs_list_entry(ptmp, kra_peer_t, rap_list);
1035                         LASSERT (peer->rap_persistence > 0 ||
1036                                  !cfs_list_empty(&peer->rap_conns));
1037
1038                         if (index-- > 0)
1039                                 continue;
1040
1041                         *nidp = peer->rap_nid;
1042                         *ipp = peer->rap_ip;
1043                         *portp = peer->rap_port;
1044                         *persistencep = peer->rap_persistence;
1045
1046                         read_unlock(&kranal_data.kra_global_lock);
1047                         return 0;
1048                 }
1049         }
1050
1051         read_unlock(&kranal_data.kra_global_lock);
1052         return -ENOENT;
1053 }
1054
1055 int
1056 kranal_add_persistent_peer (lnet_nid_t nid, __u32 ip, int port)
1057 {
1058         unsigned long      flags;
1059         kra_peer_t        *peer;
1060         kra_peer_t        *peer2;
1061         int                rc;
1062
1063         if (nid == LNET_NID_ANY)
1064                 return -EINVAL;
1065
1066         rc = kranal_create_peer(&peer, nid);
1067         if (rc != 0)
1068                 return rc;
1069
1070         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1071
1072         peer2 = kranal_find_peer_locked(nid);
1073         if (peer2 != NULL) {
1074                 kranal_peer_decref(peer);
1075                 peer = peer2;
1076         } else {
1077                 /* peer table takes existing ref on peer */
1078                 cfs_list_add_tail(&peer->rap_list,
1079                               kranal_nid2peerlist(nid));
1080         }
1081
1082         peer->rap_ip = ip;
1083         peer->rap_port = port;
1084         peer->rap_persistence++;
1085
1086         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1087         return 0;
1088 }
1089
1090 void
1091 kranal_del_peer_locked (kra_peer_t *peer)
1092 {
1093         cfs_list_t       *ctmp;
1094         cfs_list_t       *cnxt;
1095         kra_conn_t       *conn;
1096
1097         peer->rap_persistence = 0;
1098
1099         if (cfs_list_empty(&peer->rap_conns)) {
1100                 kranal_unlink_peer_locked(peer);
1101         } else {
1102                 cfs_list_for_each_safe(ctmp, cnxt, &peer->rap_conns) {
1103                         conn = cfs_list_entry(ctmp, kra_conn_t, rac_list);
1104
1105                         kranal_close_conn_locked(conn, 0);
1106                 }
1107                 /* peer unlinks itself when last conn is closed */
1108         }
1109 }
1110
1111 int
1112 kranal_del_peer (lnet_nid_t nid)
1113 {
1114         unsigned long      flags;
1115         cfs_list_t        *ptmp;
1116         cfs_list_t        *pnxt;
1117         kra_peer_t        *peer;
1118         int                lo;
1119         int                hi;
1120         int                i;
1121         int                rc = -ENOENT;
1122
1123         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1124
1125         if (nid != LNET_NID_ANY)
1126                 lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers;
1127         else {
1128                 lo = 0;
1129                 hi = kranal_data.kra_peer_hash_size - 1;
1130         }
1131
1132         for (i = lo; i <= hi; i++) {
1133                 cfs_list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) {
1134                         peer = cfs_list_entry(ptmp, kra_peer_t, rap_list);
1135                         LASSERT (peer->rap_persistence > 0 ||
1136                                  !cfs_list_empty(&peer->rap_conns));
1137
1138                         if (!(nid == LNET_NID_ANY || peer->rap_nid == nid))
1139                                 continue;
1140
1141                         kranal_del_peer_locked(peer);
1142                         rc = 0;         /* matched something */
1143                 }
1144         }
1145
1146         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1147
1148         return rc;
1149 }
1150
1151 kra_conn_t *
1152 kranal_get_conn_by_idx (int index)
1153 {
1154         kra_peer_t        *peer;
1155         cfs_list_t        *ptmp;
1156         kra_conn_t        *conn;
1157         cfs_list_t        *ctmp;
1158         int                i;
1159
1160         read_lock(&kranal_data.kra_global_lock);
1161
1162         for (i = 0; i < kranal_data.kra_peer_hash_size; i++) {
1163                 cfs_list_for_each (ptmp, &kranal_data.kra_peers[i]) {
1164
1165                         peer = cfs_list_entry(ptmp, kra_peer_t, rap_list);
1166                         LASSERT (peer->rap_persistence > 0 ||
1167                                  !cfs_list_empty(&peer->rap_conns));
1168
1169                         cfs_list_for_each (ctmp, &peer->rap_conns) {
1170                                 if (index-- > 0)
1171                                         continue;
1172
1173                                 conn = cfs_list_entry(ctmp, kra_conn_t,
1174                                                       rac_list);
1175                                 CDEBUG(D_NET, "++conn[%p] -> %s (%d)\n", conn,
1176                                        libcfs_nid2str(conn->rac_peer->rap_nid),
1177                                        cfs_atomic_read(&conn->rac_refcount));
1178                                 cfs_atomic_inc(&conn->rac_refcount);
1179                                 read_unlock(&kranal_data.kra_global_lock);
1180                                 return conn;
1181                         }
1182                 }
1183         }
1184
1185         read_unlock(&kranal_data.kra_global_lock);
1186         return NULL;
1187 }
1188
1189 int
1190 kranal_close_peer_conns_locked (kra_peer_t *peer, int why)
1191 {
1192         kra_conn_t         *conn;
1193         cfs_list_t         *ctmp;
1194         cfs_list_t         *cnxt;
1195         int                 count = 0;
1196
1197         cfs_list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
1198                 conn = cfs_list_entry(ctmp, kra_conn_t, rac_list);
1199
1200                 count++;
1201                 kranal_close_conn_locked(conn, why);
1202         }
1203
1204         return count;
1205 }
1206
1207 int
1208 kranal_close_matching_conns (lnet_nid_t nid)
1209 {
1210         unsigned long       flags;
1211         kra_peer_t         *peer;
1212         cfs_list_t         *ptmp;
1213         cfs_list_t         *pnxt;
1214         int                 lo;
1215         int                 hi;
1216         int                 i;
1217         int                 count = 0;
1218
1219         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1220
1221         if (nid != LNET_NID_ANY)
1222                 lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers;
1223         else {
1224                 lo = 0;
1225                 hi = kranal_data.kra_peer_hash_size - 1;
1226         }
1227
1228         for (i = lo; i <= hi; i++) {
1229                 cfs_list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) {
1230
1231                         peer = cfs_list_entry(ptmp, kra_peer_t, rap_list);
1232                         LASSERT (peer->rap_persistence > 0 ||
1233                                  !cfs_list_empty(&peer->rap_conns));
1234
1235                         if (!(nid == LNET_NID_ANY || nid == peer->rap_nid))
1236                                 continue;
1237
1238                         count += kranal_close_peer_conns_locked(peer, 0);
1239                 }
1240         }
1241
1242         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1243
1244         /* wildcards always succeed */
1245         if (nid == LNET_NID_ANY)
1246                 return 0;
1247
1248         return (count == 0) ? -ENOENT : 0;
1249 }
1250
1251 int
1252 kranal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1253 {
1254         struct libcfs_ioctl_data *data = arg;
1255         int                       rc = -EINVAL;
1256
1257         LASSERT (ni == kranal_data.kra_ni);
1258
1259         switch(cmd) {
1260         case IOC_LIBCFS_GET_PEER: {
1261                 lnet_nid_t   nid = 0;
1262                 __u32       ip = 0;
1263                 int         port = 0;
1264                 int         share_count = 0;
1265
1266                 rc = kranal_get_peer_info(data->ioc_count,
1267                                           &nid, &ip, &port, &share_count);
1268                 data->ioc_nid    = nid;
1269                 data->ioc_count  = share_count;
1270                 data->ioc_u32[0] = ip;
1271                 data->ioc_u32[1] = port;
1272                 break;
1273         }
1274         case IOC_LIBCFS_ADD_PEER: {
1275                 rc = kranal_add_persistent_peer(data->ioc_nid,
1276                                                 data->ioc_u32[0], /* IP */
1277                                                 data->ioc_u32[1]); /* port */
1278                 break;
1279         }
1280         case IOC_LIBCFS_DEL_PEER: {
1281                 rc = kranal_del_peer(data->ioc_nid);
1282                 break;
1283         }
1284         case IOC_LIBCFS_GET_CONN: {
1285                 kra_conn_t *conn = kranal_get_conn_by_idx(data->ioc_count);
1286
1287                 if (conn == NULL)
1288                         rc = -ENOENT;
1289                 else {
1290                         rc = 0;
1291                         data->ioc_nid    = conn->rac_peer->rap_nid;
1292                         data->ioc_u32[0] = conn->rac_device->rad_id;
1293                         kranal_conn_decref(conn);
1294                 }
1295                 break;
1296         }
1297         case IOC_LIBCFS_CLOSE_CONNECTION: {
1298                 rc = kranal_close_matching_conns(data->ioc_nid);
1299                 break;
1300         }
1301         case IOC_LIBCFS_REGISTER_MYNID: {
1302                 /* Ignore if this is a noop */
1303                 if (data->ioc_nid == ni->ni_nid) {
1304                         rc = 0;
1305                 } else {
1306                         CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1307                                libcfs_nid2str(data->ioc_nid),
1308                                libcfs_nid2str(ni->ni_nid));
1309                         rc = -EINVAL;
1310                 }
1311                 break;
1312         }
1313         }
1314
1315         return rc;
1316 }
1317
1318 void
1319 kranal_free_txdescs(cfs_list_t *freelist)
1320 {
1321         kra_tx_t    *tx;
1322
1323         while (!cfs_list_empty(freelist)) {
1324                 tx = cfs_list_entry(freelist->next, kra_tx_t, tx_list);
1325
1326                 cfs_list_del(&tx->tx_list);
1327                 LIBCFS_FREE(tx->tx_phys, LNET_MAX_IOV * sizeof(*tx->tx_phys));
1328                 LIBCFS_FREE(tx, sizeof(*tx));
1329         }
1330 }
1331
1332 int
1333 kranal_alloc_txdescs(cfs_list_t *freelist, int n)
1334 {
1335         int            i;
1336         kra_tx_t      *tx;
1337
1338         LASSERT (freelist == &kranal_data.kra_idle_txs);
1339         LASSERT (cfs_list_empty(freelist));
1340
1341         for (i = 0; i < n; i++) {
1342
1343                 LIBCFS_ALLOC(tx, sizeof(*tx));
1344                 if (tx == NULL) {
1345                         CERROR("Can't allocate tx[%d]\n", i);
1346                         kranal_free_txdescs(freelist);
1347                         return -ENOMEM;
1348                 }
1349
1350                 LIBCFS_ALLOC(tx->tx_phys,
1351                              LNET_MAX_IOV * sizeof(*tx->tx_phys));
1352                 if (tx->tx_phys == NULL) {
1353                         CERROR("Can't allocate tx[%d]->tx_phys\n", i);
1354
1355                         LIBCFS_FREE(tx, sizeof(*tx));
1356                         kranal_free_txdescs(freelist);
1357                         return -ENOMEM;
1358                 }
1359
1360                 tx->tx_buftype = RANAL_BUF_NONE;
1361                 tx->tx_msg.ram_type = RANAL_MSG_NONE;
1362
1363                 cfs_list_add(&tx->tx_list, freelist);
1364         }
1365
1366         return 0;
1367 }
1368
1369 int
1370 kranal_device_init(int id, kra_device_t *dev)
1371 {
1372         int               total_ntx = *kranal_tunables.kra_ntx;
1373         RAP_RETURN        rrc;
1374
1375         dev->rad_id = id;
1376         rrc = RapkGetDeviceByIndex(id, kranal_device_callback,
1377                                    &dev->rad_handle);
1378         if (rrc != RAP_SUCCESS) {
1379                 CERROR("Can't get Rapidarray Device %d: %d\n", id, rrc);
1380                 goto failed_0;
1381         }
1382
1383         rrc = RapkReserveRdma(dev->rad_handle, total_ntx);
1384         if (rrc != RAP_SUCCESS) {
1385                 CERROR("Can't reserve %d RDMA descriptors"
1386                        " for device %d: %d\n", total_ntx, id, rrc);
1387                 goto failed_1;
1388         }
1389
1390         rrc = RapkCreateCQ(dev->rad_handle, total_ntx, RAP_CQTYPE_SEND,
1391                            &dev->rad_rdma_cqh);
1392         if (rrc != RAP_SUCCESS) {
1393                 CERROR("Can't create rdma cq size %d for device %d: %d\n",
1394                        total_ntx, id, rrc);
1395                 goto failed_1;
1396         }
1397
1398         rrc = RapkCreateCQ(dev->rad_handle, 
1399                            *kranal_tunables.kra_fma_cq_size, 
1400                            RAP_CQTYPE_RECV, &dev->rad_fma_cqh);
1401         if (rrc != RAP_SUCCESS) {
1402                 CERROR("Can't create fma cq size %d for device %d: %d\n", 
1403                        *kranal_tunables.kra_fma_cq_size, id, rrc);
1404                 goto failed_2;
1405         }
1406
1407         return 0;
1408
1409  failed_2:
1410         RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
1411  failed_1:
1412         RapkReleaseDevice(dev->rad_handle);
1413  failed_0:
1414         return -ENODEV;
1415 }
1416
1417 void
1418 kranal_device_fini(kra_device_t *dev)
1419 {
1420         LASSERT (cfs_list_empty(&dev->rad_ready_conns));
1421         LASSERT (cfs_list_empty(&dev->rad_new_conns));
1422         LASSERT (dev->rad_nphysmap == 0);
1423         LASSERT (dev->rad_nppphysmap == 0);
1424         LASSERT (dev->rad_nvirtmap == 0);
1425         LASSERT (dev->rad_nobvirtmap == 0);
1426
1427         LASSERT(dev->rad_scheduler == NULL);
1428         RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cqh);
1429         RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
1430         RapkReleaseDevice(dev->rad_handle);
1431 }
1432
1433 void
1434 kranal_shutdown (lnet_ni_t *ni)
1435 {
1436         int           i;
1437         unsigned long flags;
1438
1439         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1440                cfs_atomic_read(&libcfs_kmemory));
1441
1442         LASSERT (ni == kranal_data.kra_ni);
1443         LASSERT (ni->ni_data == &kranal_data);
1444
1445         switch (kranal_data.kra_init) {
1446         default:
1447                 CERROR("Unexpected state %d\n", kranal_data.kra_init);
1448                 LBUG();
1449
1450         case RANAL_INIT_ALL:
1451                 /* Prevent new peers from being created */
1452                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1453                 kranal_data.kra_nonewpeers = 1;
1454                 write_unlock_irqrestore(&kranal_data.kra_global_lock,
1455                                             flags);
1456
1457                 /* Remove all existing peers from the peer table */
1458                 kranal_del_peer(LNET_NID_ANY);
1459
1460                 /* Wait for pending conn reqs to be handled */
1461                 i = 2;
1462                 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1463                 while (!cfs_list_empty(&kranal_data.kra_connd_acceptq)) {
1464                         spin_unlock_irqrestore(&kranal_data.kra_connd_lock,
1465                                                    flags);
1466                         i++;
1467                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n */
1468                                "waiting for conn reqs to clean up\n");
1469                         cfs_pause(cfs_time_seconds(1));
1470
1471                         spin_lock_irqsave(&kranal_data.kra_connd_lock,
1472                                               flags);
1473                 }
1474                 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1475
1476                 /* Wait for all peers to be freed */
1477                 i = 2;
1478                 while (cfs_atomic_read(&kranal_data.kra_npeers) != 0) {
1479                         i++;
1480                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n */
1481                                "waiting for %d peers to close down\n",
1482                                cfs_atomic_read(&kranal_data.kra_npeers));
1483                         cfs_pause(cfs_time_seconds(1));
1484                 }
1485                 /* fall through */
1486
1487         case RANAL_INIT_DATA:
1488                 break;
1489         }
1490
1491         /* Peer state all cleaned up BEFORE setting shutdown, so threads don't
1492          * have to worry about shutdown races.  NB connections may be created
1493          * while there are still active connds, but these will be temporary
1494          * since peer creation always fails after the listener has started to
1495          * shut down. */
1496         LASSERT (cfs_atomic_read(&kranal_data.kra_npeers) == 0);
1497         
1498         /* Flag threads to terminate */
1499         kranal_data.kra_shutdown = 1;
1500
1501         for (i = 0; i < kranal_data.kra_ndevs; i++) {
1502                 kra_device_t *dev = &kranal_data.kra_devices[i];
1503
1504                 spin_lock_irqsave(&dev->rad_lock, flags);
1505                 wake_up(&dev->rad_waitq);
1506                 spin_unlock_irqrestore(&dev->rad_lock, flags);
1507         }
1508
1509         spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1510         wake_up_all(&kranal_data.kra_reaper_waitq);
1511         spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
1512
1513         LASSERT (cfs_list_empty(&kranal_data.kra_connd_peers));
1514         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1515         wake_up_all(&kranal_data.kra_connd_waitq);
1516         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1517
1518         /* Wait for threads to exit */
1519         i = 2;
1520         while (cfs_atomic_read(&kranal_data.kra_nthreads) != 0) {
1521                 i++;
1522                 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1523                        "Waiting for %d threads to terminate\n",
1524                        cfs_atomic_read(&kranal_data.kra_nthreads));
1525                 cfs_pause(cfs_time_seconds(1));
1526         }
1527
1528         LASSERT (cfs_atomic_read(&kranal_data.kra_npeers) == 0);
1529         if (kranal_data.kra_peers != NULL) {
1530                 for (i = 0; i < kranal_data.kra_peer_hash_size; i++)
1531                         LASSERT (cfs_list_empty(&kranal_data.kra_peers[i]));
1532
1533                 LIBCFS_FREE(kranal_data.kra_peers,
1534                             sizeof (cfs_list_t) *
1535                             kranal_data.kra_peer_hash_size);
1536         }
1537
1538         LASSERT (cfs_atomic_read(&kranal_data.kra_nconns) == 0);
1539         if (kranal_data.kra_conns != NULL) {
1540                 for (i = 0; i < kranal_data.kra_conn_hash_size; i++)
1541                         LASSERT (cfs_list_empty(&kranal_data.kra_conns[i]));
1542
1543                 LIBCFS_FREE(kranal_data.kra_conns,
1544                             sizeof (cfs_list_t) *
1545                             kranal_data.kra_conn_hash_size);
1546         }
1547
1548         for (i = 0; i < kranal_data.kra_ndevs; i++)
1549                 kranal_device_fini(&kranal_data.kra_devices[i]);
1550
1551         kranal_free_txdescs(&kranal_data.kra_idle_txs);
1552
1553         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1554                cfs_atomic_read(&libcfs_kmemory));
1555
1556         kranal_data.kra_init = RANAL_INIT_NOTHING;
1557         module_put(THIS_MODULE);
1558 }
1559
1560 int
1561 kranal_startup (lnet_ni_t *ni)
1562 {
1563         struct timeval    tv;
1564         int               pkmem = cfs_atomic_read(&libcfs_kmemory);
1565         int               rc;
1566         int               i;
1567         kra_device_t     *dev;
1568         char              name[16];
1569
1570         LASSERT (ni->ni_lnd == &the_kralnd);
1571
1572         /* Only 1 instance supported */
1573         if (kranal_data.kra_init != RANAL_INIT_NOTHING) {
1574                 CERROR ("Only 1 instance supported\n");
1575                 return -EPERM;
1576         }
1577
1578         if (lnet_set_ip_niaddr(ni) != 0) {
1579                 CERROR ("Can't determine my NID\n");
1580                 return -EPERM;
1581         }
1582
1583         if (*kranal_tunables.kra_credits > *kranal_tunables.kra_ntx) {
1584                 CERROR ("Can't set credits(%d) > ntx(%d)\n",
1585                         *kranal_tunables.kra_credits,
1586                         *kranal_tunables.kra_ntx);
1587                 return -EINVAL;
1588         }
1589         
1590         memset(&kranal_data, 0, sizeof(kranal_data)); /* zero pointers, flags etc */
1591
1592         ni->ni_maxtxcredits = *kranal_tunables.kra_credits;
1593         ni->ni_peertxcredits = *kranal_tunables.kra_peercredits;
1594
1595         ni->ni_data = &kranal_data;
1596         kranal_data.kra_ni = ni;
1597
1598         /* CAVEAT EMPTOR: Every 'Fma' message includes the sender's NID and
1599          * a unique (for all time) connstamp so we can uniquely identify
1600          * the sender.  The connstamp is an incrementing counter
1601          * initialised with seconds + microseconds at startup time.  So we
1602          * rely on NOT creating connections more frequently on average than
1603          * 1MHz to ensure we don't use old connstamps when we reboot. */
1604         do_gettimeofday(&tv);
1605         kranal_data.kra_connstamp =
1606         kranal_data.kra_peerstamp = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1607
1608         rwlock_init(&kranal_data.kra_global_lock);
1609
1610         for (i = 0; i < RANAL_MAXDEVS; i++ ) {
1611                 kra_device_t  *dev = &kranal_data.kra_devices[i];
1612
1613                 dev->rad_idx = i;
1614                 CFS_INIT_LIST_HEAD(&dev->rad_ready_conns);
1615                 CFS_INIT_LIST_HEAD(&dev->rad_new_conns);
1616                 init_waitqueue_head(&dev->rad_waitq);
1617                 spin_lock_init(&dev->rad_lock);
1618         }
1619
1620         kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
1621         init_waitqueue_head(&kranal_data.kra_reaper_waitq);
1622         spin_lock_init(&kranal_data.kra_reaper_lock);
1623
1624         CFS_INIT_LIST_HEAD(&kranal_data.kra_connd_acceptq);
1625         CFS_INIT_LIST_HEAD(&kranal_data.kra_connd_peers);
1626         init_waitqueue_head(&kranal_data.kra_connd_waitq);
1627         spin_lock_init(&kranal_data.kra_connd_lock);
1628
1629         CFS_INIT_LIST_HEAD(&kranal_data.kra_idle_txs);
1630         spin_lock_init(&kranal_data.kra_tx_lock);
1631
1632         /* OK to call kranal_api_shutdown() to cleanup now */
1633         kranal_data.kra_init = RANAL_INIT_DATA;
1634         try_module_get(THIS_MODULE);
1635
1636         kranal_data.kra_peer_hash_size = RANAL_PEER_HASH_SIZE;
1637         LIBCFS_ALLOC(kranal_data.kra_peers,
1638                      sizeof(cfs_list_t) *
1639                             kranal_data.kra_peer_hash_size);
1640         if (kranal_data.kra_peers == NULL)
1641                 goto failed;
1642
1643         for (i = 0; i < kranal_data.kra_peer_hash_size; i++)
1644                 CFS_INIT_LIST_HEAD(&kranal_data.kra_peers[i]);
1645
1646         kranal_data.kra_conn_hash_size = RANAL_PEER_HASH_SIZE;
1647         LIBCFS_ALLOC(kranal_data.kra_conns,
1648                      sizeof(cfs_list_t) *
1649                             kranal_data.kra_conn_hash_size);
1650         if (kranal_data.kra_conns == NULL)
1651                 goto failed;
1652
1653         for (i = 0; i < kranal_data.kra_conn_hash_size; i++)
1654                 CFS_INIT_LIST_HEAD(&kranal_data.kra_conns[i]);
1655
1656         rc = kranal_alloc_txdescs(&kranal_data.kra_idle_txs, 
1657                                   *kranal_tunables.kra_ntx);
1658         if (rc != 0)
1659                 goto failed;
1660
1661         rc = kranal_thread_start(kranal_reaper, NULL, "kranal_reaper");
1662         if (rc != 0) {
1663                 CERROR("Can't spawn ranal reaper: %d\n", rc);
1664                 goto failed;
1665         }
1666
1667         for (i = 0; i < *kranal_tunables.kra_n_connd; i++) {
1668                 snprintf(name, sizeof(name), "kranal_connd_%02ld", i);
1669                 rc = kranal_thread_start(kranal_connd,
1670                                          (void *)(unsigned long)i, name);
1671                 if (rc != 0) {
1672                         CERROR("Can't spawn ranal connd[%d]: %d\n",
1673                                i, rc);
1674                         goto failed;
1675                 }
1676         }
1677
1678         LASSERT (kranal_data.kra_ndevs == 0);
1679
1680         /* Use all available RapidArray devices */
1681         for (i = 0; i < RANAL_MAXDEVS; i++) {
1682                 dev = &kranal_data.kra_devices[kranal_data.kra_ndevs];
1683
1684                 rc = kranal_device_init(kranal_devids[i], dev);
1685                 if (rc == 0)
1686                         kranal_data.kra_ndevs++;
1687         }
1688
1689         if (kranal_data.kra_ndevs == 0) {
1690                 CERROR("Can't initialise any RapidArray devices\n");
1691                 goto failed;
1692         }
1693         
1694         for (i = 0; i < kranal_data.kra_ndevs; i++) {
1695                 dev = &kranal_data.kra_devices[i];
1696                 snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
1697                 rc = kranal_thread_start(kranal_scheduler, dev, name);
1698                 if (rc != 0) {
1699                         CERROR("Can't spawn ranal scheduler[%d]: %d\n",
1700                                i, rc);
1701                         goto failed;
1702                 }
1703         }
1704
1705         /* flag everything initialised */
1706         kranal_data.kra_init = RANAL_INIT_ALL;
1707         /*****************************************************/
1708
1709         CDEBUG(D_MALLOC, "initial kmem %d\n", pkmem);
1710         return 0;
1711
1712  failed:
1713         kranal_shutdown(ni);
1714         return -ENETDOWN;
1715 }
1716
1717 void __exit
1718 kranal_module_fini (void)
1719 {
1720         lnet_unregister_lnd(&the_kralnd);
1721         kranal_tunables_fini();
1722 }
1723
1724 int __init
1725 kranal_module_init (void)
1726 {
1727         int    rc;
1728
1729         rc = kranal_tunables_init();
1730         if (rc != 0)
1731                 return rc;
1732
1733         lnet_register_lnd(&the_kralnd);
1734
1735         return 0;
1736 }
1737
1738 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
1739 MODULE_DESCRIPTION("Kernel RapidArray LND v0.01");
1740 MODULE_LICENSE("GPL");
1741
1742 module_init(kranal_module_init);
1743 module_exit(kranal_module_fini);