Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lnet / klnds / ralnd / ralnd.c
1 /*
2  * -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
3  * vim:expandtab:shiftwidth=8:tabstop=8:
4  *
5  * GPL HEADER START
6  *
7  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8  *
9  * This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License version 2 only,
11  * as published by the Free Software Foundation.
12  *
13  * This program is distributed in the hope that it will be useful, but
14  * WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * General Public License version 2 for more details (a copy is included
17  * in the LICENSE file that accompanied this code).
18  *
19  * You should have received a copy of the GNU General Public License
20  * version 2 along with this program; If not, see [sun.com URL with a
21  * copy of GPLv2].
22  *
23  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
24  * CA 95054 USA or visit www.sun.com if you need additional information or
25  * have any questions.
26  *
27  * GPL HEADER END
28  */
29 /*
30  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
31  * Use is subject to license terms.
32  */
33 /*
34  * This file is part of Lustre, http://www.lustre.org/
35  * Lustre is a trademark of Sun Microsystems, Inc.
36  *
37  * lnet/klnds/ralnd/ralnd.c
38  *
39  * Author: Eric Barton <eric@bartonsoftware.com>
40  */
41 #include "ralnd.h"
42
43 static int        kranal_devids[RANAL_MAXDEVS] = {RAPK_MAIN_DEVICE_ID,
44                                                   RAPK_EXPANSION_DEVICE_ID};
45
46 lnd_t the_kralnd = {
47         .lnd_type       = RALND,
48         .lnd_startup    = kranal_startup,
49         .lnd_shutdown   = kranal_shutdown,
50         .lnd_ctl        = kranal_ctl,
51         .lnd_send       = kranal_send,
52         .lnd_recv       = kranal_recv,
53         .lnd_eager_recv = kranal_eager_recv,
54         .lnd_accept     = kranal_accept,
55 };
56
57 kra_data_t              kranal_data;
58
59 void
60 kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn, lnet_nid_t dstnid)
61 {
62         RAP_RETURN   rrc;
63
64         memset(connreq, 0, sizeof(*connreq));
65
66         connreq->racr_magic     = RANAL_MSG_MAGIC;
67         connreq->racr_version   = RANAL_MSG_VERSION;
68
69         if (conn == NULL)                       /* prepping a "stub" reply */
70                 return;
71
72         connreq->racr_devid     = conn->rac_device->rad_id;
73         connreq->racr_srcnid    = lnet_ptlcompat_srcnid(kranal_data.kra_ni->ni_nid,
74                                                         dstnid);
75         connreq->racr_dstnid    = dstnid;
76         connreq->racr_peerstamp = kranal_data.kra_peerstamp;
77         connreq->racr_connstamp = conn->rac_my_connstamp;
78         connreq->racr_timeout   = conn->rac_timeout;
79
80         rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
81         LASSERT(rrc == RAP_SUCCESS);
82 }
83
84 int
85 kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int active)
86 {
87         int         timeout = active ? *kranal_tunables.kra_timeout :
88                                         lnet_acceptor_timeout();
89         int         swab;
90         int         rc;
91
92         /* return 0 on success, -ve on error, +ve to tell the peer I'm "old" */
93
94         rc = libcfs_sock_read(sock, &connreq->racr_magic, 
95                               sizeof(connreq->racr_magic), timeout);
96         if (rc != 0) {
97                 CERROR("Read(magic) failed(1): %d\n", rc);
98                 return -EIO;
99         }
100
101         if (connreq->racr_magic != RANAL_MSG_MAGIC &&
102             connreq->racr_magic != __swab32(RANAL_MSG_MAGIC)) {
103                 /* Unexpected magic! */
104                 if (!active &&
105                     the_lnet.ln_ptlcompat == 0 &&
106                     (connreq->racr_magic == LNET_PROTO_MAGIC ||
107                      connreq->racr_magic == __swab32(LNET_PROTO_MAGIC))) {
108                         /* future protocol version compatibility!
109                          * When LNET unifies protocols over all LNDs, the first
110                          * thing sent will be a version query.  +ve rc means I
111                          * reply with my current magic/version */
112                         return EPROTO;
113                 }
114
115                 if (active ||
116                     the_lnet.ln_ptlcompat == 0) {
117                         CERROR("Unexpected magic %08x (1)\n",
118                                connreq->racr_magic);
119                         return -EPROTO;
120                 }
121
122                 /* When portals compatibility is set, I may be passed a new
123                  * connection "blindly" by the acceptor, and I have to
124                  * determine if my peer has sent an acceptor connection request
125                  * or not.  This isn't a connreq, so I'll get the acceptor to
126                  * look at it... */
127                 rc = lnet_accept(kranal_data.kra_ni, sock, connreq->racr_magic);
128                 if (rc != 0)
129                         return -EPROTO;
130
131                 /* ...and if it's OK I'm back to looking for a connreq... */
132                 rc = libcfs_sock_read(sock, &connreq->racr_magic,
133                                       sizeof(connreq->racr_magic), timeout);
134                 if (rc != 0) {
135                         CERROR("Read(magic) failed(2): %d\n", rc);
136                         return -EIO;
137                 }
138
139                 if (connreq->racr_magic != RANAL_MSG_MAGIC &&
140                     connreq->racr_magic != __swab32(RANAL_MSG_MAGIC)) {
141                         CERROR("Unexpected magic %08x(2)\n",
142                                connreq->racr_magic);
143                         return -EPROTO;
144                 }
145         }
146
147         swab = (connreq->racr_magic == __swab32(RANAL_MSG_MAGIC));
148
149         rc = libcfs_sock_read(sock, &connreq->racr_version,
150                               sizeof(connreq->racr_version), timeout);
151         if (rc != 0) {
152                 CERROR("Read(version) failed: %d\n", rc);
153                 return -EIO;
154         }
155
156         if (swab)
157                 __swab16s(&connreq->racr_version);
158         
159         if (connreq->racr_version != RANAL_MSG_VERSION) {
160                 if (active) {
161                         CERROR("Unexpected version %d\n", connreq->racr_version);
162                         return -EPROTO;
163                 }
164                 /* If this is a future version of the ralnd protocol, and I'm
165                  * passive (accepted the connection), tell my peer I'm "old"
166                  * (+ve rc) */
167                 return EPROTO;
168         }
169
170         rc = libcfs_sock_read(sock, &connreq->racr_devid,
171                               sizeof(connreq->racr_version) -
172                               offsetof(kra_connreq_t, racr_devid),
173                               timeout);
174         if (rc != 0) {
175                 CERROR("Read(body) failed: %d\n", rc);
176                 return -EIO;
177         }
178
179         if (swab) {
180                 __swab32s(&connreq->racr_magic);
181                 __swab16s(&connreq->racr_version);
182                 __swab16s(&connreq->racr_devid);
183                 __swab64s(&connreq->racr_srcnid);
184                 __swab64s(&connreq->racr_dstnid);
185                 __swab64s(&connreq->racr_peerstamp);
186                 __swab64s(&connreq->racr_connstamp);
187                 __swab32s(&connreq->racr_timeout);
188
189                 __swab32s(&connreq->racr_riparams.HostId);
190                 __swab32s(&connreq->racr_riparams.FmaDomainHndl);
191                 __swab32s(&connreq->racr_riparams.PTag);
192                 __swab32s(&connreq->racr_riparams.CompletionCookie);
193         }
194
195         if (connreq->racr_srcnid == LNET_NID_ANY ||
196             connreq->racr_dstnid == LNET_NID_ANY) {
197                 CERROR("Received LNET_NID_ANY\n");
198                 return -EPROTO;
199         }
200
201         if (connreq->racr_timeout < RANAL_MIN_TIMEOUT) {
202                 CERROR("Received timeout %d < MIN %d\n",
203                        connreq->racr_timeout, RANAL_MIN_TIMEOUT);
204                 return -EPROTO;
205         }
206
207         return 0;
208 }
209
210 int
211 kranal_close_stale_conns_locked (kra_peer_t *peer, kra_conn_t *newconn)
212 {
213         kra_conn_t         *conn;
214         struct list_head   *ctmp;
215         struct list_head   *cnxt;
216         int                 loopback;
217         int                 count = 0;
218
219         loopback = peer->rap_nid == kranal_data.kra_ni->ni_nid;
220
221         list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
222                 conn = list_entry(ctmp, kra_conn_t, rac_list);
223
224                 if (conn == newconn)
225                         continue;
226
227                 if (conn->rac_peerstamp != newconn->rac_peerstamp) {
228                         CDEBUG(D_NET, "Closing stale conn nid: %s "
229                                " peerstamp:"LPX64"("LPX64")\n", 
230                                libcfs_nid2str(peer->rap_nid),
231                                conn->rac_peerstamp, newconn->rac_peerstamp);
232                         LASSERT (conn->rac_peerstamp < newconn->rac_peerstamp);
233                         count++;
234                         kranal_close_conn_locked(conn, -ESTALE);
235                         continue;
236                 }
237
238                 if (conn->rac_device != newconn->rac_device)
239                         continue;
240
241                 if (loopback &&
242                     newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
243                     newconn->rac_peer_connstamp == conn->rac_my_connstamp)
244                         continue;
245
246                 LASSERT (conn->rac_peer_connstamp < newconn->rac_peer_connstamp);
247
248                 CDEBUG(D_NET, "Closing stale conn nid: %s"
249                        " connstamp:"LPX64"("LPX64")\n", 
250                        libcfs_nid2str(peer->rap_nid),
251                        conn->rac_peer_connstamp, newconn->rac_peer_connstamp);
252
253                 count++;
254                 kranal_close_conn_locked(conn, -ESTALE);
255         }
256
257         return count;
258 }
259
260 int
261 kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn)
262 {
263         kra_conn_t       *conn;
264         struct list_head *tmp;
265         int               loopback;
266
267         loopback = peer->rap_nid == kranal_data.kra_ni->ni_nid;
268
269         list_for_each(tmp, &peer->rap_conns) {
270                 conn = list_entry(tmp, kra_conn_t, rac_list);
271
272                 /* 'newconn' is from an earlier version of 'peer'!!! */
273                 if (newconn->rac_peerstamp < conn->rac_peerstamp)
274                         return 1;
275
276                 /* 'conn' is from an earlier version of 'peer': it will be
277                  * removed when we cull stale conns later on... */
278                 if (newconn->rac_peerstamp > conn->rac_peerstamp)
279                         continue;
280
281                 /* Different devices are OK */
282                 if (conn->rac_device != newconn->rac_device)
283                         continue;
284
285                 /* It's me connecting to myself */
286                 if (loopback &&
287                     newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
288                     newconn->rac_peer_connstamp == conn->rac_my_connstamp)
289                         continue;
290
291                 /* 'newconn' is an earlier connection from 'peer'!!! */
292                 if (newconn->rac_peer_connstamp < conn->rac_peer_connstamp)
293                         return 2;
294
295                 /* 'conn' is an earlier connection from 'peer': it will be
296                  * removed when we cull stale conns later on... */
297                 if (newconn->rac_peer_connstamp > conn->rac_peer_connstamp)
298                         continue;
299
300                 /* 'newconn' has the SAME connection stamp; 'peer' isn't
301                  * playing the game... */
302                 return 3;
303         }
304
305         return 0;
306 }
307
308 void
309 kranal_set_conn_uniqueness (kra_conn_t *conn)
310 {
311         unsigned long  flags;
312
313         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
314
315         conn->rac_my_connstamp = kranal_data.kra_connstamp++;
316
317         do {    /* allocate a unique cqid */
318                 conn->rac_cqid = kranal_data.kra_next_cqid++;
319         } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL);
320
321         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
322 }
323
324 int
325 kranal_create_conn(kra_conn_t **connp, kra_device_t *dev)
326 {
327         kra_conn_t    *conn;
328         RAP_RETURN     rrc;
329
330         LASSERT (!in_interrupt());
331         LIBCFS_ALLOC(conn, sizeof(*conn));
332
333         if (conn == NULL)
334                 return -ENOMEM;
335
336         memset(conn, 0, sizeof(*conn));
337         atomic_set(&conn->rac_refcount, 1);
338         INIT_LIST_HEAD(&conn->rac_list);
339         INIT_LIST_HEAD(&conn->rac_hashlist);
340         INIT_LIST_HEAD(&conn->rac_schedlist);
341         INIT_LIST_HEAD(&conn->rac_fmaq);
342         INIT_LIST_HEAD(&conn->rac_rdmaq);
343         INIT_LIST_HEAD(&conn->rac_replyq);
344         spin_lock_init(&conn->rac_lock);
345
346         kranal_set_conn_uniqueness(conn);
347
348         conn->rac_device = dev;
349         conn->rac_timeout = MAX(*kranal_tunables.kra_timeout, RANAL_MIN_TIMEOUT);
350         kranal_update_reaper_timeout(conn->rac_timeout);
351
352         rrc = RapkCreateRi(dev->rad_handle, conn->rac_cqid,
353                            &conn->rac_rihandle);
354         if (rrc != RAP_SUCCESS) {
355                 CERROR("RapkCreateRi failed: %d\n", rrc);
356                 LIBCFS_FREE(conn, sizeof(*conn));
357                 return -ENETDOWN;
358         }
359
360         atomic_inc(&kranal_data.kra_nconns);
361         *connp = conn;
362         return 0;
363 }
364
365 void
366 kranal_destroy_conn(kra_conn_t *conn)
367 {
368         RAP_RETURN         rrc;
369
370         LASSERT (!in_interrupt());
371         LASSERT (!conn->rac_scheduled);
372         LASSERT (list_empty(&conn->rac_list));
373         LASSERT (list_empty(&conn->rac_hashlist));
374         LASSERT (list_empty(&conn->rac_schedlist));
375         LASSERT (atomic_read(&conn->rac_refcount) == 0);
376         LASSERT (list_empty(&conn->rac_fmaq));
377         LASSERT (list_empty(&conn->rac_rdmaq));
378         LASSERT (list_empty(&conn->rac_replyq));
379
380         rrc = RapkDestroyRi(conn->rac_device->rad_handle,
381                             conn->rac_rihandle);
382         LASSERT (rrc == RAP_SUCCESS);
383
384         if (conn->rac_peer != NULL)
385                 kranal_peer_decref(conn->rac_peer);
386
387         LIBCFS_FREE(conn, sizeof(*conn));
388         atomic_dec(&kranal_data.kra_nconns);
389 }
390
391 void
392 kranal_terminate_conn_locked (kra_conn_t *conn)
393 {
394         LASSERT (!in_interrupt());
395         LASSERT (conn->rac_state == RANAL_CONN_CLOSING);
396         LASSERT (!list_empty(&conn->rac_hashlist));
397         LASSERT (list_empty(&conn->rac_list));
398
399         /* Remove from conn hash table: no new callbacks */
400         list_del_init(&conn->rac_hashlist);
401         kranal_conn_decref(conn);
402
403         conn->rac_state = RANAL_CONN_CLOSED;
404
405         /* schedule to clear out all uncompleted comms in context of dev's
406          * scheduler */
407         kranal_schedule_conn(conn);
408 }
409
410 void
411 kranal_close_conn_locked (kra_conn_t *conn, int error)
412 {
413         kra_peer_t        *peer = conn->rac_peer;
414
415         CDEBUG(error == 0 ? D_NET : D_NETERROR,
416                "closing conn to %s: error %d\n", 
417                libcfs_nid2str(peer->rap_nid), error);
418
419         LASSERT (!in_interrupt());
420         LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED);
421         LASSERT (!list_empty(&conn->rac_hashlist));
422         LASSERT (!list_empty(&conn->rac_list));
423
424         list_del_init(&conn->rac_list);
425
426         if (list_empty(&peer->rap_conns) &&
427             peer->rap_persistence == 0) {
428                 /* Non-persistent peer with no more conns... */
429                 kranal_unlink_peer_locked(peer);
430         }
431
432         /* Reset RX timeout to ensure we wait for an incoming CLOSE for the
433          * full timeout.  If we get a CLOSE we know the peer has stopped all
434          * RDMA.  Otherwise if we wait for the full timeout we can also be sure
435          * all RDMA has stopped. */
436         conn->rac_last_rx = jiffies;
437         mb();
438
439         conn->rac_state = RANAL_CONN_CLOSING;
440         kranal_schedule_conn(conn);             /* schedule sending CLOSE */
441
442         kranal_conn_decref(conn);               /* lose peer's ref */
443 }
444
445 void
446 kranal_close_conn (kra_conn_t *conn, int error)
447 {
448         unsigned long    flags;
449
450
451         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
452
453         if (conn->rac_state == RANAL_CONN_ESTABLISHED)
454                 kranal_close_conn_locked(conn, error);
455
456         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
457 }
458
459 int
460 kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq,
461                        __u32 peer_ip, int peer_port)
462 {
463         kra_device_t  *dev = conn->rac_device;
464         unsigned long  flags;
465         RAP_RETURN     rrc;
466
467         /* CAVEAT EMPTOR: we're really overloading rac_last_tx + rac_keepalive
468          * to do RapkCompleteSync() timekeeping (see kibnal_scheduler). */
469         conn->rac_last_tx = jiffies;
470         conn->rac_keepalive = 0;
471
472         rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
473         if (rrc != RAP_SUCCESS) {
474                 CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n",
475                        HIPQUAD(peer_ip), peer_port, rrc);
476                 return -ECONNABORTED;
477         }
478
479         /* Schedule conn on rad_new_conns */
480         kranal_conn_addref(conn);
481         spin_lock_irqsave(&dev->rad_lock, flags);
482         list_add_tail(&conn->rac_schedlist, &dev->rad_new_conns);
483         wake_up(&dev->rad_waitq);
484         spin_unlock_irqrestore(&dev->rad_lock, flags);
485
486         rrc = RapkWaitToConnect(conn->rac_rihandle);
487         if (rrc != RAP_SUCCESS) {
488                 CERROR("Error waiting to connect to %u.%u.%u.%u/%d: %d\n",
489                        HIPQUAD(peer_ip), peer_port, rrc);
490                 return -ECONNABORTED;
491         }
492
493         /* Scheduler doesn't touch conn apart from to deschedule and decref it
494          * after RapkCompleteSync() return success, so conn is all mine */
495
496         conn->rac_peerstamp = connreq->racr_peerstamp;
497         conn->rac_peer_connstamp = connreq->racr_connstamp;
498         conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq->racr_timeout);
499         kranal_update_reaper_timeout(conn->rac_keepalive);
500         return 0;
501 }
502
503 int
504 kranal_passive_conn_handshake (struct socket *sock, lnet_nid_t *src_nidp,
505                                lnet_nid_t *dst_nidp, kra_conn_t **connp)
506 {
507         __u32                peer_ip;
508         unsigned int         peer_port;
509         kra_connreq_t        rx_connreq;
510         kra_connreq_t        tx_connreq;
511         kra_conn_t          *conn;
512         kra_device_t        *dev;
513         int                  rc;
514         int                  i;
515
516         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
517         if (rc != 0) {
518                 CERROR("Can't get peer's IP: %d\n", rc);
519                 return rc;
520         }
521
522         rc = kranal_recv_connreq(sock, &rx_connreq, 0);
523
524         if (rc < 0) {
525                 CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
526                        HIPQUAD(peer_ip), peer_port, rc);
527                 return rc;
528         }
529
530         if (rc > 0) {
531                 /* Request from "new" peer: send reply with my MAGIC/VERSION to
532                  * tell her I'm old... */
533                 kranal_pack_connreq(&tx_connreq, NULL, LNET_NID_ANY);
534
535                 rc = libcfs_sock_write(sock, &tx_connreq, sizeof(tx_connreq),
536                                        lnet_acceptor_timeout());
537                 if (rc != 0)
538                         CERROR("Can't tx stub connreq to %u.%u.%u.%u/%d: %d\n",
539                                HIPQUAD(peer_ip), peer_port, rc);
540
541                 return -EPROTO;
542         }
543
544         for (i = 0;;i++) {
545                 if (i == kranal_data.kra_ndevs) {
546                         CERROR("Can't match dev %d from %u.%u.%u.%u/%d\n",
547                                rx_connreq.racr_devid, HIPQUAD(peer_ip), peer_port);
548                         return -ENODEV;
549                 }
550                 dev = &kranal_data.kra_devices[i];
551                 if (dev->rad_id == rx_connreq.racr_devid)
552                         break;
553         }
554
555         rc = kranal_create_conn(&conn, dev);
556         if (rc != 0)
557                 return rc;
558
559         kranal_pack_connreq(&tx_connreq, conn, rx_connreq.racr_srcnid);
560
561         rc = libcfs_sock_write(sock, &tx_connreq, sizeof(tx_connreq),
562                                lnet_acceptor_timeout());
563         if (rc != 0) {
564                 CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
565                        HIPQUAD(peer_ip), peer_port, rc);
566                 kranal_conn_decref(conn);
567                 return rc;
568         }
569
570         rc = kranal_set_conn_params(conn, &rx_connreq, peer_ip, peer_port);
571         if (rc != 0) {
572                 kranal_conn_decref(conn);
573                 return rc;
574         }
575
576         *connp = conn;
577         *src_nidp = rx_connreq.racr_srcnid;
578         *dst_nidp = rx_connreq.racr_dstnid;
579         return 0;
580 }
581
582 int
583 kranal_active_conn_handshake(kra_peer_t *peer,
584                              lnet_nid_t *dst_nidp, kra_conn_t **connp)
585 {
586         kra_connreq_t       connreq;
587         kra_conn_t         *conn;
588         kra_device_t       *dev;
589         struct socket      *sock;
590         int                 rc;
591         unsigned int        idx;
592
593         /* spread connections over all devices using both peer NIDs to ensure
594          * all nids use all devices */
595         idx = peer->rap_nid + kranal_data.kra_ni->ni_nid;
596         dev = &kranal_data.kra_devices[idx % kranal_data.kra_ndevs];
597
598         rc = kranal_create_conn(&conn, dev);
599         if (rc != 0)
600                 return rc;
601
602         kranal_pack_connreq(&connreq, conn, peer->rap_nid);
603
604         if (the_lnet.ln_testprotocompat != 0) {
605                 /* single-shot proto test */
606                 LNET_LOCK();
607                 if ((the_lnet.ln_testprotocompat & 1) != 0) {
608                         connreq.racr_version++;
609                         the_lnet.ln_testprotocompat &= ~1;
610                 }
611                 if ((the_lnet.ln_testprotocompat & 2) != 0) {
612                         connreq.racr_magic = LNET_PROTO_MAGIC;
613                         the_lnet.ln_testprotocompat &= ~2;
614                 }
615                 LNET_UNLOCK();
616         }
617
618         rc = lnet_connect(&sock, peer->rap_nid,
619                          0, peer->rap_ip, peer->rap_port);
620         if (rc != 0)
621                 goto failed_0;
622
623         /* CAVEAT EMPTOR: the passive side receives with a SHORT rx timeout
624          * immediately after accepting a connection, so we connect and then
625          * send immediately. */
626
627         rc = libcfs_sock_write(sock, &connreq, sizeof(connreq),
628                                lnet_acceptor_timeout());
629         if (rc != 0) {
630                 CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
631                        HIPQUAD(peer->rap_ip), peer->rap_port, rc);
632                 goto failed_2;
633         }
634
635         rc = kranal_recv_connreq(sock, &connreq, 1);
636         if (rc != 0) {
637                 CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
638                        HIPQUAD(peer->rap_ip), peer->rap_port, rc);
639                 goto failed_2;
640         }
641
642         libcfs_sock_release(sock);
643         rc = -EPROTO;
644
645         if (connreq.racr_srcnid != peer->rap_nid) {
646                 CERROR("Unexpected srcnid from %u.%u.%u.%u/%d: "
647                        "received %s expected %s\n",
648                        HIPQUAD(peer->rap_ip), peer->rap_port,
649                        libcfs_nid2str(connreq.racr_srcnid), 
650                        libcfs_nid2str(peer->rap_nid));
651                 goto failed_1;
652         }
653
654         if (connreq.racr_devid != dev->rad_id) {
655                 CERROR("Unexpected device id from %u.%u.%u.%u/%d: "
656                        "received %d expected %d\n",
657                        HIPQUAD(peer->rap_ip), peer->rap_port,
658                        connreq.racr_devid, dev->rad_id);
659                 goto failed_1;
660         }
661
662         rc = kranal_set_conn_params(conn, &connreq,
663                                     peer->rap_ip, peer->rap_port);
664         if (rc != 0)
665                 goto failed_1;
666
667         *connp = conn;
668         *dst_nidp = connreq.racr_dstnid;
669         return 0;
670
671  failed_2:
672         libcfs_sock_release(sock);
673  failed_1:
674         lnet_connect_console_error(rc, peer->rap_nid,
675                                   peer->rap_ip, peer->rap_port);
676  failed_0:
677         kranal_conn_decref(conn);
678         return rc;
679 }
680
681 int
682 kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
683 {
684         kra_peer_t        *peer2;
685         kra_tx_t          *tx;
686         lnet_nid_t          peer_nid;
687         lnet_nid_t          dst_nid;
688         unsigned long      flags;
689         kra_conn_t        *conn;
690         int                rc;
691         int                nstale;
692         int                new_peer = 0;
693
694         if (sock == NULL) {
695                 /* active: connd wants to connect to 'peer' */
696                 LASSERT (peer != NULL);
697                 LASSERT (peer->rap_connecting);
698
699                 rc = kranal_active_conn_handshake(peer, &dst_nid, &conn);
700                 if (rc != 0)
701                         return rc;
702
703                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
704
705                 if (!kranal_peer_active(peer)) {
706                         /* raced with peer getting unlinked */
707                         write_unlock_irqrestore(&kranal_data.kra_global_lock,
708                                                 flags);
709                         kranal_conn_decref(conn);
710                         return -ESTALE;
711                 }
712
713                 peer_nid = peer->rap_nid;
714         } else {
715                 /* passive: listener accepted 'sock' */
716                 LASSERT (peer == NULL);
717
718                 rc = kranal_passive_conn_handshake(sock, &peer_nid,
719                                                    &dst_nid, &conn);
720                 if (rc != 0)
721                         return rc;
722
723                 /* assume this is a new peer */
724                 rc = kranal_create_peer(&peer, peer_nid);
725                 if (rc != 0) {
726                         CERROR("Can't create conn for %s\n", 
727                                libcfs_nid2str(peer_nid));
728                         kranal_conn_decref(conn);
729                         return -ENOMEM;
730                 }
731
732                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
733
734                 peer2 = kranal_find_peer_locked(peer_nid);
735                 if (peer2 == NULL) {
736                         new_peer = 1;
737                 } else {
738                         /* peer_nid already in the peer table */
739                         kranal_peer_decref(peer);
740                         peer = peer2;
741                 }
742         }
743
744         LASSERT ((!new_peer) != (!kranal_peer_active(peer)));
745
746         /* Refuse connection if peer thinks we are a different NID.  We check
747          * this while holding the global lock, to synch with connection
748          * destruction on NID change. */
749         if (!lnet_ptlcompat_matchnid(kranal_data.kra_ni->ni_nid, dst_nid)) {
750                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
751
752                 CERROR("Stale/bad connection with %s: dst_nid %s, expected %s\n",
753                        libcfs_nid2str(peer_nid), libcfs_nid2str(dst_nid), 
754                        libcfs_nid2str(kranal_data.kra_ni->ni_nid));
755                 rc = -ESTALE;
756                 goto failed;
757         }
758
759         /* Refuse to duplicate an existing connection (both sides might try to
760          * connect at once).  NB we return success!  We _are_ connected so we
761          * _don't_ have any blocked txs to complete with failure. */
762         rc = kranal_conn_isdup_locked(peer, conn);
763         if (rc != 0) {
764                 LASSERT (!list_empty(&peer->rap_conns));
765                 LASSERT (list_empty(&peer->rap_tx_queue));
766                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
767                 CWARN("Not creating duplicate connection to %s: %d\n",
768                       libcfs_nid2str(peer_nid), rc);
769                 rc = 0;
770                 goto failed;
771         }
772
773         if (new_peer) {
774                 /* peer table takes my ref on the new peer */
775                 list_add_tail(&peer->rap_list,
776                               kranal_nid2peerlist(peer_nid));
777         }
778
779         /* initialise timestamps before reaper looks at them */
780         conn->rac_last_tx = conn->rac_last_rx = jiffies;
781
782         kranal_peer_addref(peer);               /* +1 ref for conn */
783         conn->rac_peer = peer;
784         list_add_tail(&conn->rac_list, &peer->rap_conns);
785
786         kranal_conn_addref(conn);               /* +1 ref for conn table */
787         list_add_tail(&conn->rac_hashlist,
788                       kranal_cqid2connlist(conn->rac_cqid));
789
790         /* Schedule all packets blocking for a connection */
791         while (!list_empty(&peer->rap_tx_queue)) {
792                 tx = list_entry(peer->rap_tx_queue.next,
793                                 kra_tx_t, tx_list);
794
795                 list_del(&tx->tx_list);
796                 kranal_post_fma(conn, tx);
797         }
798
799         nstale = kranal_close_stale_conns_locked(peer, conn);
800
801         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
802
803         /* CAVEAT EMPTOR: passive peer can disappear NOW */
804
805         if (nstale != 0)
806                 CWARN("Closed %d stale conns to %s\n", nstale, 
807                       libcfs_nid2str(peer_nid));
808
809         CWARN("New connection to %s on devid[%d] = %d\n",
810                libcfs_nid2str(peer_nid), 
811                conn->rac_device->rad_idx, conn->rac_device->rad_id);
812
813         /* Ensure conn gets checked.  Transmits may have been queued and an
814          * FMA event may have happened before it got in the cq hash table */
815         kranal_schedule_conn(conn);
816         return 0;
817
818  failed:
819         if (new_peer)
820                 kranal_peer_decref(peer);
821         kranal_conn_decref(conn);
822         return rc;
823 }
824
825 void
826 kranal_connect (kra_peer_t *peer)
827 {
828         kra_tx_t          *tx;
829         unsigned long      flags;
830         struct list_head   zombies;
831         int                rc;
832
833         LASSERT (peer->rap_connecting);
834
835         CDEBUG(D_NET, "About to handshake %s\n", 
836                libcfs_nid2str(peer->rap_nid));
837
838         rc = kranal_conn_handshake(NULL, peer);
839
840         CDEBUG(D_NET, "Done handshake %s:%d \n", 
841                libcfs_nid2str(peer->rap_nid), rc);
842
843         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
844
845         LASSERT (peer->rap_connecting);
846         peer->rap_connecting = 0;
847
848         if (rc == 0) {
849                 /* kranal_conn_handshake() queues blocked txs immediately on
850                  * success to avoid messages jumping the queue */
851                 LASSERT (list_empty(&peer->rap_tx_queue));
852
853                 peer->rap_reconnect_interval = 0; /* OK to reconnect at any time */
854
855                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
856                 return;
857         }
858
859         peer->rap_reconnect_interval *= 2;
860         peer->rap_reconnect_interval =
861                 MAX(peer->rap_reconnect_interval,
862                     *kranal_tunables.kra_min_reconnect_interval);
863         peer->rap_reconnect_interval =
864                 MIN(peer->rap_reconnect_interval,
865                     *kranal_tunables.kra_max_reconnect_interval);
866
867         peer->rap_reconnect_time = jiffies + peer->rap_reconnect_interval * HZ;
868
869         /* Grab all blocked packets while we have the global lock */
870         list_add(&zombies, &peer->rap_tx_queue);
871         list_del_init(&peer->rap_tx_queue);
872
873         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
874
875         if (list_empty(&zombies))
876                 return;
877
878         CDEBUG(D_NETERROR, "Dropping packets for %s: connection failed\n",
879                libcfs_nid2str(peer->rap_nid));
880
881         do {
882                 tx = list_entry(zombies.next, kra_tx_t, tx_list);
883
884                 list_del(&tx->tx_list);
885                 kranal_tx_done(tx, -EHOSTUNREACH);
886
887         } while (!list_empty(&zombies));
888 }
889
890 void
891 kranal_free_acceptsock (kra_acceptsock_t *ras)
892 {
893         libcfs_sock_release(ras->ras_sock);
894         LIBCFS_FREE(ras, sizeof(*ras));
895 }
896
897 int
898 kranal_accept (lnet_ni_t *ni, struct socket *sock)
899 {
900         kra_acceptsock_t  *ras;
901         int                rc;
902         __u32              peer_ip;
903         int                peer_port;
904         unsigned long      flags;
905
906         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
907         LASSERT (rc == 0);                      /* we succeeded before */
908
909         LIBCFS_ALLOC(ras, sizeof(*ras));
910         if (ras == NULL) {
911                 CERROR("ENOMEM allocating connection request from "
912                        "%u.%u.%u.%u\n", HIPQUAD(peer_ip));
913                 return -ENOMEM;
914         }
915
916         ras->ras_sock = sock;
917
918         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
919
920         list_add_tail(&ras->ras_list, &kranal_data.kra_connd_acceptq);
921         wake_up(&kranal_data.kra_connd_waitq);
922
923         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
924         return 0;
925 }
926
927 int
928 kranal_create_peer (kra_peer_t **peerp, lnet_nid_t nid)
929 {
930         kra_peer_t    *peer;
931         unsigned long  flags;
932
933         LASSERT (nid != LNET_NID_ANY);
934
935         LIBCFS_ALLOC(peer, sizeof(*peer));
936         if (peer == NULL)
937                 return -ENOMEM;
938
939         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
940
941         peer->rap_nid = nid;
942         atomic_set(&peer->rap_refcount, 1);     /* 1 ref for caller */
943
944         INIT_LIST_HEAD(&peer->rap_list);
945         INIT_LIST_HEAD(&peer->rap_connd_list);
946         INIT_LIST_HEAD(&peer->rap_conns);
947         INIT_LIST_HEAD(&peer->rap_tx_queue);
948
949         peer->rap_reconnect_interval = 0;       /* OK to connect at any time */
950
951         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
952
953         if (kranal_data.kra_nonewpeers) {
954                 /* shutdown has started already */
955                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
956                 
957                 LIBCFS_FREE(peer, sizeof(*peer));
958                 CERROR("Can't create peer: network shutdown\n");
959                 return -ESHUTDOWN;
960         }
961
962         atomic_inc(&kranal_data.kra_npeers);
963
964         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
965
966         *peerp = peer;
967         return 0;
968 }
969
970 void
971 kranal_destroy_peer (kra_peer_t *peer)
972 {
973         CDEBUG(D_NET, "peer %s %p deleted\n", 
974                libcfs_nid2str(peer->rap_nid), peer);
975
976         LASSERT (atomic_read(&peer->rap_refcount) == 0);
977         LASSERT (peer->rap_persistence == 0);
978         LASSERT (!kranal_peer_active(peer));
979         LASSERT (!peer->rap_connecting);
980         LASSERT (list_empty(&peer->rap_conns));
981         LASSERT (list_empty(&peer->rap_tx_queue));
982         LASSERT (list_empty(&peer->rap_connd_list));
983
984         LIBCFS_FREE(peer, sizeof(*peer));
985
986         /* NB a peer's connections keep a reference on their peer until
987          * they are destroyed, so we can be assured that _all_ state to do
988          * with this peer has been cleaned up when its refcount drops to
989          * zero. */
990         atomic_dec(&kranal_data.kra_npeers);
991 }
992
993 kra_peer_t *
994 kranal_find_peer_locked (lnet_nid_t nid)
995 {
996         struct list_head *peer_list = kranal_nid2peerlist(nid);
997         struct list_head *tmp;
998         kra_peer_t       *peer;
999
1000         list_for_each (tmp, peer_list) {
1001
1002                 peer = list_entry(tmp, kra_peer_t, rap_list);
1003
1004                 LASSERT (peer->rap_persistence > 0 ||     /* persistent peer */
1005                          !list_empty(&peer->rap_conns));  /* active conn */
1006
1007                 if (peer->rap_nid != nid)
1008                         continue;
1009
1010                 CDEBUG(D_NET, "got peer [%p] -> %s (%d)\n",
1011                        peer, libcfs_nid2str(nid), 
1012                        atomic_read(&peer->rap_refcount));
1013                 return peer;
1014         }
1015         return NULL;
1016 }
1017
1018 kra_peer_t *
1019 kranal_find_peer (lnet_nid_t nid)
1020 {
1021         kra_peer_t     *peer;
1022
1023         read_lock(&kranal_data.kra_global_lock);
1024         peer = kranal_find_peer_locked(nid);
1025         if (peer != NULL)                       /* +1 ref for caller? */
1026                 kranal_peer_addref(peer);
1027         read_unlock(&kranal_data.kra_global_lock);
1028
1029         return peer;
1030 }
1031
1032 void
1033 kranal_unlink_peer_locked (kra_peer_t *peer)
1034 {
1035         LASSERT (peer->rap_persistence == 0);
1036         LASSERT (list_empty(&peer->rap_conns));
1037
1038         LASSERT (kranal_peer_active(peer));
1039         list_del_init(&peer->rap_list);
1040
1041         /* lose peerlist's ref */
1042         kranal_peer_decref(peer);
1043 }
1044
1045 int
1046 kranal_get_peer_info (int index, lnet_nid_t *nidp, __u32 *ipp, int *portp,
1047                       int *persistencep)
1048 {
1049         kra_peer_t        *peer;
1050         struct list_head  *ptmp;
1051         int                i;
1052
1053         read_lock(&kranal_data.kra_global_lock);
1054
1055         for (i = 0; i < kranal_data.kra_peer_hash_size; i++) {
1056
1057                 list_for_each(ptmp, &kranal_data.kra_peers[i]) {
1058
1059                         peer = list_entry(ptmp, kra_peer_t, rap_list);
1060                         LASSERT (peer->rap_persistence > 0 ||
1061                                  !list_empty(&peer->rap_conns));
1062
1063                         if (index-- > 0)
1064                                 continue;
1065
1066                         *nidp = peer->rap_nid;
1067                         *ipp = peer->rap_ip;
1068                         *portp = peer->rap_port;
1069                         *persistencep = peer->rap_persistence;
1070
1071                         read_unlock(&kranal_data.kra_global_lock);
1072                         return 0;
1073                 }
1074         }
1075
1076         read_unlock(&kranal_data.kra_global_lock);
1077         return -ENOENT;
1078 }
1079
1080 int
1081 kranal_add_persistent_peer (lnet_nid_t nid, __u32 ip, int port)
1082 {
1083         unsigned long      flags;
1084         kra_peer_t        *peer;
1085         kra_peer_t        *peer2;
1086         int                rc;
1087
1088         if (nid == LNET_NID_ANY)
1089                 return -EINVAL;
1090
1091         rc = kranal_create_peer(&peer, nid);
1092         if (rc != 0)
1093                 return rc;
1094
1095         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1096
1097         peer2 = kranal_find_peer_locked(nid);
1098         if (peer2 != NULL) {
1099                 kranal_peer_decref(peer);
1100                 peer = peer2;
1101         } else {
1102                 /* peer table takes existing ref on peer */
1103                 list_add_tail(&peer->rap_list,
1104                               kranal_nid2peerlist(nid));
1105         }
1106
1107         peer->rap_ip = ip;
1108         peer->rap_port = port;
1109         peer->rap_persistence++;
1110
1111         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1112         return 0;
1113 }
1114
1115 void
1116 kranal_del_peer_locked (kra_peer_t *peer)
1117 {
1118         struct list_head *ctmp;
1119         struct list_head *cnxt;
1120         kra_conn_t       *conn;
1121
1122         peer->rap_persistence = 0;
1123
1124         if (list_empty(&peer->rap_conns)) {
1125                 kranal_unlink_peer_locked(peer);
1126         } else {
1127                 list_for_each_safe(ctmp, cnxt, &peer->rap_conns) {
1128                         conn = list_entry(ctmp, kra_conn_t, rac_list);
1129
1130                         kranal_close_conn_locked(conn, 0);
1131                 }
1132                 /* peer unlinks itself when last conn is closed */
1133         }
1134 }
1135
1136 int
1137 kranal_del_peer (lnet_nid_t nid)
1138 {
1139         unsigned long      flags;
1140         struct list_head  *ptmp;
1141         struct list_head  *pnxt;
1142         kra_peer_t        *peer;
1143         int                lo;
1144         int                hi;
1145         int                i;
1146         int                rc = -ENOENT;
1147
1148         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1149
1150         if (nid != LNET_NID_ANY)
1151                 lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers;
1152         else {
1153                 lo = 0;
1154                 hi = kranal_data.kra_peer_hash_size - 1;
1155         }
1156
1157         for (i = lo; i <= hi; i++) {
1158                 list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) {
1159                         peer = list_entry(ptmp, kra_peer_t, rap_list);
1160                         LASSERT (peer->rap_persistence > 0 ||
1161                                  !list_empty(&peer->rap_conns));
1162
1163                         if (!(nid == LNET_NID_ANY || peer->rap_nid == nid))
1164                                 continue;
1165
1166                         kranal_del_peer_locked(peer);
1167                         rc = 0;         /* matched something */
1168                 }
1169         }
1170
1171         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1172
1173         return rc;
1174 }
1175
1176 kra_conn_t *
1177 kranal_get_conn_by_idx (int index)
1178 {
1179         kra_peer_t        *peer;
1180         struct list_head  *ptmp;
1181         kra_conn_t        *conn;
1182         struct list_head  *ctmp;
1183         int                i;
1184
1185         read_lock (&kranal_data.kra_global_lock);
1186
1187         for (i = 0; i < kranal_data.kra_peer_hash_size; i++) {
1188                 list_for_each (ptmp, &kranal_data.kra_peers[i]) {
1189
1190                         peer = list_entry(ptmp, kra_peer_t, rap_list);
1191                         LASSERT (peer->rap_persistence > 0 ||
1192                                  !list_empty(&peer->rap_conns));
1193
1194                         list_for_each (ctmp, &peer->rap_conns) {
1195                                 if (index-- > 0)
1196                                         continue;
1197
1198                                 conn = list_entry(ctmp, kra_conn_t, rac_list);
1199                                 CDEBUG(D_NET, "++conn[%p] -> %s (%d)\n", conn, 
1200                                        libcfs_nid2str(conn->rac_peer->rap_nid),
1201                                        atomic_read(&conn->rac_refcount));
1202                                 atomic_inc(&conn->rac_refcount);
1203                                 read_unlock(&kranal_data.kra_global_lock);
1204                                 return conn;
1205                         }
1206                 }
1207         }
1208
1209         read_unlock(&kranal_data.kra_global_lock);
1210         return NULL;
1211 }
1212
1213 int
1214 kranal_close_peer_conns_locked (kra_peer_t *peer, int why)
1215 {
1216         kra_conn_t         *conn;
1217         struct list_head   *ctmp;
1218         struct list_head   *cnxt;
1219         int                 count = 0;
1220
1221         list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
1222                 conn = list_entry(ctmp, kra_conn_t, rac_list);
1223
1224                 count++;
1225                 kranal_close_conn_locked(conn, why);
1226         }
1227
1228         return count;
1229 }
1230
1231 int
1232 kranal_close_matching_conns (lnet_nid_t nid)
1233 {
1234         unsigned long       flags;
1235         kra_peer_t         *peer;
1236         struct list_head   *ptmp;
1237         struct list_head   *pnxt;
1238         int                 lo;
1239         int                 hi;
1240         int                 i;
1241         int                 count = 0;
1242
1243         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1244
1245         if (nid != LNET_NID_ANY)
1246                 lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers;
1247         else {
1248                 lo = 0;
1249                 hi = kranal_data.kra_peer_hash_size - 1;
1250         }
1251
1252         for (i = lo; i <= hi; i++) {
1253                 list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) {
1254
1255                         peer = list_entry(ptmp, kra_peer_t, rap_list);
1256                         LASSERT (peer->rap_persistence > 0 ||
1257                                  !list_empty(&peer->rap_conns));
1258
1259                         if (!(nid == LNET_NID_ANY || nid == peer->rap_nid))
1260                                 continue;
1261
1262                         count += kranal_close_peer_conns_locked(peer, 0);
1263                 }
1264         }
1265
1266         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1267
1268         /* wildcards always succeed */
1269         if (nid == LNET_NID_ANY)
1270                 return 0;
1271
1272         return (count == 0) ? -ENOENT : 0;
1273 }
1274
1275 int
1276 kranal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1277 {
1278         struct libcfs_ioctl_data *data = arg;
1279         int                       rc = -EINVAL;
1280
1281         LASSERT (ni == kranal_data.kra_ni);
1282
1283         switch(cmd) {
1284         case IOC_LIBCFS_GET_PEER: {
1285                 lnet_nid_t   nid = 0;
1286                 __u32       ip = 0;
1287                 int         port = 0;
1288                 int         share_count = 0;
1289
1290                 rc = kranal_get_peer_info(data->ioc_count,
1291                                           &nid, &ip, &port, &share_count);
1292                 data->ioc_nid    = nid;
1293                 data->ioc_count  = share_count;
1294                 data->ioc_u32[0] = ip;
1295                 data->ioc_u32[1] = port;
1296                 break;
1297         }
1298         case IOC_LIBCFS_ADD_PEER: {
1299                 rc = kranal_add_persistent_peer(data->ioc_nid,
1300                                                 data->ioc_u32[0], /* IP */
1301                                                 data->ioc_u32[1]); /* port */
1302                 break;
1303         }
1304         case IOC_LIBCFS_DEL_PEER: {
1305                 rc = kranal_del_peer(data->ioc_nid);
1306                 break;
1307         }
1308         case IOC_LIBCFS_GET_CONN: {
1309                 kra_conn_t *conn = kranal_get_conn_by_idx(data->ioc_count);
1310
1311                 if (conn == NULL)
1312                         rc = -ENOENT;
1313                 else {
1314                         rc = 0;
1315                         data->ioc_nid    = conn->rac_peer->rap_nid;
1316                         data->ioc_u32[0] = conn->rac_device->rad_id;
1317                         kranal_conn_decref(conn);
1318                 }
1319                 break;
1320         }
1321         case IOC_LIBCFS_CLOSE_CONNECTION: {
1322                 rc = kranal_close_matching_conns(data->ioc_nid);
1323                 break;
1324         }
1325         case IOC_LIBCFS_REGISTER_MYNID: {
1326                 /* Ignore if this is a noop */
1327                 if (data->ioc_nid == ni->ni_nid) {
1328                         rc = 0;
1329                 } else {
1330                         CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1331                                libcfs_nid2str(data->ioc_nid),
1332                                libcfs_nid2str(ni->ni_nid));
1333                         rc = -EINVAL;
1334                 }
1335                 break;
1336         }
1337         }
1338
1339         return rc;
1340 }
1341
1342 void
1343 kranal_free_txdescs(struct list_head *freelist)
1344 {
1345         kra_tx_t    *tx;
1346
1347         while (!list_empty(freelist)) {
1348                 tx = list_entry(freelist->next, kra_tx_t, tx_list);
1349
1350                 list_del(&tx->tx_list);
1351                 LIBCFS_FREE(tx->tx_phys, LNET_MAX_IOV * sizeof(*tx->tx_phys));
1352                 LIBCFS_FREE(tx, sizeof(*tx));
1353         }
1354 }
1355
1356 int
1357 kranal_alloc_txdescs(struct list_head *freelist, int n)
1358 {
1359         int            i;
1360         kra_tx_t      *tx;
1361
1362         LASSERT (freelist == &kranal_data.kra_idle_txs);
1363         LASSERT (list_empty(freelist));
1364
1365         for (i = 0; i < n; i++) {
1366
1367                 LIBCFS_ALLOC(tx, sizeof(*tx));
1368                 if (tx == NULL) {
1369                         CERROR("Can't allocate tx[%d]\n", i);
1370                         kranal_free_txdescs(freelist);
1371                         return -ENOMEM;
1372                 }
1373
1374                 LIBCFS_ALLOC(tx->tx_phys,
1375                              LNET_MAX_IOV * sizeof(*tx->tx_phys));
1376                 if (tx->tx_phys == NULL) {
1377                         CERROR("Can't allocate tx[%d]->tx_phys\n", i);
1378
1379                         LIBCFS_FREE(tx, sizeof(*tx));
1380                         kranal_free_txdescs(freelist);
1381                         return -ENOMEM;
1382                 }
1383
1384                 tx->tx_buftype = RANAL_BUF_NONE;
1385                 tx->tx_msg.ram_type = RANAL_MSG_NONE;
1386
1387                 list_add(&tx->tx_list, freelist);
1388         }
1389
1390         return 0;
1391 }
1392
1393 int
1394 kranal_device_init(int id, kra_device_t *dev)
1395 {
1396         int               total_ntx = *kranal_tunables.kra_ntx;
1397         RAP_RETURN        rrc;
1398
1399         dev->rad_id = id;
1400         rrc = RapkGetDeviceByIndex(id, kranal_device_callback,
1401                                    &dev->rad_handle);
1402         if (rrc != RAP_SUCCESS) {
1403                 CERROR("Can't get Rapidarray Device %d: %d\n", id, rrc);
1404                 goto failed_0;
1405         }
1406
1407         rrc = RapkReserveRdma(dev->rad_handle, total_ntx);
1408         if (rrc != RAP_SUCCESS) {
1409                 CERROR("Can't reserve %d RDMA descriptors"
1410                        " for device %d: %d\n", total_ntx, id, rrc);
1411                 goto failed_1;
1412         }
1413
1414         rrc = RapkCreateCQ(dev->rad_handle, total_ntx, RAP_CQTYPE_SEND,
1415                            &dev->rad_rdma_cqh);
1416         if (rrc != RAP_SUCCESS) {
1417                 CERROR("Can't create rdma cq size %d for device %d: %d\n",
1418                        total_ntx, id, rrc);
1419                 goto failed_1;
1420         }
1421
1422         rrc = RapkCreateCQ(dev->rad_handle, 
1423                            *kranal_tunables.kra_fma_cq_size, 
1424                            RAP_CQTYPE_RECV, &dev->rad_fma_cqh);
1425         if (rrc != RAP_SUCCESS) {
1426                 CERROR("Can't create fma cq size %d for device %d: %d\n", 
1427                        *kranal_tunables.kra_fma_cq_size, id, rrc);
1428                 goto failed_2;
1429         }
1430
1431         return 0;
1432
1433  failed_2:
1434         RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
1435  failed_1:
1436         RapkReleaseDevice(dev->rad_handle);
1437  failed_0:
1438         return -ENODEV;
1439 }
1440
1441 void
1442 kranal_device_fini(kra_device_t *dev)
1443 {
1444         LASSERT (list_empty(&dev->rad_ready_conns));
1445         LASSERT (list_empty(&dev->rad_new_conns));
1446         LASSERT (dev->rad_nphysmap == 0);
1447         LASSERT (dev->rad_nppphysmap == 0);
1448         LASSERT (dev->rad_nvirtmap == 0);
1449         LASSERT (dev->rad_nobvirtmap == 0);
1450                 
1451         LASSERT(dev->rad_scheduler == NULL);
1452         RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cqh);
1453         RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
1454         RapkReleaseDevice(dev->rad_handle);
1455 }
1456
1457 void
1458 kranal_shutdown (lnet_ni_t *ni)
1459 {
1460         int           i;
1461         unsigned long flags;
1462
1463         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1464                atomic_read(&libcfs_kmemory));
1465
1466         LASSERT (ni == kranal_data.kra_ni);
1467         LASSERT (ni->ni_data == &kranal_data);
1468
1469         switch (kranal_data.kra_init) {
1470         default:
1471                 CERROR("Unexpected state %d\n", kranal_data.kra_init);
1472                 LBUG();
1473
1474         case RANAL_INIT_ALL:
1475                 /* Prevent new peers from being created */
1476                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1477                 kranal_data.kra_nonewpeers = 1;
1478                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1479                 
1480                 /* Remove all existing peers from the peer table */
1481                 kranal_del_peer(LNET_NID_ANY);
1482
1483                 /* Wait for pending conn reqs to be handled */
1484                 i = 2;
1485                 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1486                 while (!list_empty(&kranal_data.kra_connd_acceptq)) {
1487                         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, 
1488                                                flags);
1489                         i++;
1490                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n */
1491                                "waiting for conn reqs to clean up\n");
1492                         cfs_pause(cfs_time_seconds(1));
1493
1494                         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1495                 }
1496                 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1497
1498                 /* Wait for all peers to be freed */
1499                 i = 2;
1500                 while (atomic_read(&kranal_data.kra_npeers) != 0) {
1501                         i++;
1502                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n */
1503                                "waiting for %d peers to close down\n",
1504                                atomic_read(&kranal_data.kra_npeers));
1505                         cfs_pause(cfs_time_seconds(1));
1506                 }
1507                 /* fall through */
1508
1509         case RANAL_INIT_DATA:
1510                 break;
1511         }
1512
1513         /* Peer state all cleaned up BEFORE setting shutdown, so threads don't
1514          * have to worry about shutdown races.  NB connections may be created
1515          * while there are still active connds, but these will be temporary
1516          * since peer creation always fails after the listener has started to
1517          * shut down. */
1518         LASSERT (atomic_read(&kranal_data.kra_npeers) == 0);
1519         
1520         /* Flag threads to terminate */
1521         kranal_data.kra_shutdown = 1;
1522
1523         for (i = 0; i < kranal_data.kra_ndevs; i++) {
1524                 kra_device_t *dev = &kranal_data.kra_devices[i];
1525
1526                 spin_lock_irqsave(&dev->rad_lock, flags);
1527                 wake_up(&dev->rad_waitq);
1528                 spin_unlock_irqrestore(&dev->rad_lock, flags);
1529         }
1530
1531         spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1532         wake_up_all(&kranal_data.kra_reaper_waitq);
1533         spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
1534
1535         LASSERT (list_empty(&kranal_data.kra_connd_peers));
1536         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1537         wake_up_all(&kranal_data.kra_connd_waitq);
1538         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1539
1540         /* Wait for threads to exit */
1541         i = 2;
1542         while (atomic_read(&kranal_data.kra_nthreads) != 0) {
1543                 i++;
1544                 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1545                        "Waiting for %d threads to terminate\n",
1546                        atomic_read(&kranal_data.kra_nthreads));
1547                 cfs_pause(cfs_time_seconds(1));
1548         }
1549
1550         LASSERT (atomic_read(&kranal_data.kra_npeers) == 0);
1551         if (kranal_data.kra_peers != NULL) {
1552                 for (i = 0; i < kranal_data.kra_peer_hash_size; i++)
1553                         LASSERT (list_empty(&kranal_data.kra_peers[i]));
1554
1555                 LIBCFS_FREE(kranal_data.kra_peers,
1556                             sizeof (struct list_head) *
1557                             kranal_data.kra_peer_hash_size);
1558         }
1559
1560         LASSERT (atomic_read(&kranal_data.kra_nconns) == 0);
1561         if (kranal_data.kra_conns != NULL) {
1562                 for (i = 0; i < kranal_data.kra_conn_hash_size; i++)
1563                         LASSERT (list_empty(&kranal_data.kra_conns[i]));
1564
1565                 LIBCFS_FREE(kranal_data.kra_conns,
1566                             sizeof (struct list_head) *
1567                             kranal_data.kra_conn_hash_size);
1568         }
1569
1570         for (i = 0; i < kranal_data.kra_ndevs; i++)
1571                 kranal_device_fini(&kranal_data.kra_devices[i]);
1572
1573         kranal_free_txdescs(&kranal_data.kra_idle_txs);
1574
1575         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1576                atomic_read(&libcfs_kmemory));
1577
1578         kranal_data.kra_init = RANAL_INIT_NOTHING;
1579         PORTAL_MODULE_UNUSE;
1580 }
1581
1582 int
1583 kranal_startup (lnet_ni_t *ni)
1584 {
1585         struct timeval    tv;
1586         int               pkmem = atomic_read(&libcfs_kmemory);
1587         int               rc;
1588         int               i;
1589         kra_device_t     *dev;
1590
1591         LASSERT (ni->ni_lnd == &the_kralnd);
1592
1593         /* Only 1 instance supported */
1594         if (kranal_data.kra_init != RANAL_INIT_NOTHING) {
1595                 CERROR ("Only 1 instance supported\n");
1596                 return -EPERM;
1597         }
1598
1599         if (lnet_set_ip_niaddr(ni) != 0) {
1600                 CERROR ("Can't determine my NID\n");
1601                 return -EPERM;
1602         }
1603
1604         if (*kranal_tunables.kra_credits > *kranal_tunables.kra_ntx) {
1605                 CERROR ("Can't set credits(%d) > ntx(%d)\n",
1606                         *kranal_tunables.kra_credits,
1607                         *kranal_tunables.kra_ntx);
1608                 return -EINVAL;
1609         }
1610         
1611         memset(&kranal_data, 0, sizeof(kranal_data)); /* zero pointers, flags etc */
1612
1613         ni->ni_maxtxcredits = *kranal_tunables.kra_credits;
1614         ni->ni_peertxcredits = *kranal_tunables.kra_peercredits;
1615
1616         ni->ni_data = &kranal_data;
1617         kranal_data.kra_ni = ni;
1618
1619         /* CAVEAT EMPTOR: Every 'Fma' message includes the sender's NID and
1620          * a unique (for all time) connstamp so we can uniquely identify
1621          * the sender.  The connstamp is an incrementing counter
1622          * initialised with seconds + microseconds at startup time.  So we
1623          * rely on NOT creating connections more frequently on average than
1624          * 1MHz to ensure we don't use old connstamps when we reboot. */
1625         do_gettimeofday(&tv);
1626         kranal_data.kra_connstamp =
1627         kranal_data.kra_peerstamp = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1628
1629         rwlock_init(&kranal_data.kra_global_lock);
1630
1631         for (i = 0; i < RANAL_MAXDEVS; i++ ) {
1632                 kra_device_t  *dev = &kranal_data.kra_devices[i];
1633
1634                 dev->rad_idx = i;
1635                 INIT_LIST_HEAD(&dev->rad_ready_conns);
1636                 INIT_LIST_HEAD(&dev->rad_new_conns);
1637                 init_waitqueue_head(&dev->rad_waitq);
1638                 spin_lock_init(&dev->rad_lock);
1639         }
1640
1641         kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
1642         init_waitqueue_head(&kranal_data.kra_reaper_waitq);
1643         spin_lock_init(&kranal_data.kra_reaper_lock);
1644
1645         INIT_LIST_HEAD(&kranal_data.kra_connd_acceptq);
1646         INIT_LIST_HEAD(&kranal_data.kra_connd_peers);
1647         init_waitqueue_head(&kranal_data.kra_connd_waitq);
1648         spin_lock_init(&kranal_data.kra_connd_lock);
1649
1650         INIT_LIST_HEAD(&kranal_data.kra_idle_txs);
1651         spin_lock_init(&kranal_data.kra_tx_lock);
1652
1653         /* OK to call kranal_api_shutdown() to cleanup now */
1654         kranal_data.kra_init = RANAL_INIT_DATA;
1655         PORTAL_MODULE_USE;
1656
1657         kranal_data.kra_peer_hash_size = RANAL_PEER_HASH_SIZE;
1658         LIBCFS_ALLOC(kranal_data.kra_peers,
1659                      sizeof(struct list_head) * kranal_data.kra_peer_hash_size);
1660         if (kranal_data.kra_peers == NULL)
1661                 goto failed;
1662
1663         for (i = 0; i < kranal_data.kra_peer_hash_size; i++)
1664                 INIT_LIST_HEAD(&kranal_data.kra_peers[i]);
1665
1666         kranal_data.kra_conn_hash_size = RANAL_PEER_HASH_SIZE;
1667         LIBCFS_ALLOC(kranal_data.kra_conns,
1668                      sizeof(struct list_head) * kranal_data.kra_conn_hash_size);
1669         if (kranal_data.kra_conns == NULL)
1670                 goto failed;
1671
1672         for (i = 0; i < kranal_data.kra_conn_hash_size; i++)
1673                 INIT_LIST_HEAD(&kranal_data.kra_conns[i]);
1674
1675         rc = kranal_alloc_txdescs(&kranal_data.kra_idle_txs, 
1676                                   *kranal_tunables.kra_ntx);
1677         if (rc != 0)
1678                 goto failed;
1679
1680         rc = kranal_thread_start(kranal_reaper, NULL);
1681         if (rc != 0) {
1682                 CERROR("Can't spawn ranal reaper: %d\n", rc);
1683                 goto failed;
1684         }
1685
1686         for (i = 0; i < *kranal_tunables.kra_n_connd; i++) {
1687                 rc = kranal_thread_start(kranal_connd, (void *)(unsigned long)i);
1688                 if (rc != 0) {
1689                         CERROR("Can't spawn ranal connd[%d]: %d\n",
1690                                i, rc);
1691                         goto failed;
1692                 }
1693         }
1694
1695         LASSERT (kranal_data.kra_ndevs == 0);
1696
1697         /* Use all available RapidArray devices */
1698         for (i = 0; i < RANAL_MAXDEVS; i++) {
1699                 dev = &kranal_data.kra_devices[kranal_data.kra_ndevs];
1700
1701                 rc = kranal_device_init(kranal_devids[i], dev);
1702                 if (rc == 0)
1703                         kranal_data.kra_ndevs++;
1704         }
1705
1706         if (kranal_data.kra_ndevs == 0) {
1707                 CERROR("Can't initialise any RapidArray devices\n");
1708                 goto failed;
1709         }
1710         
1711         for (i = 0; i < kranal_data.kra_ndevs; i++) {
1712                 dev = &kranal_data.kra_devices[i];
1713                 rc = kranal_thread_start(kranal_scheduler, dev);
1714                 if (rc != 0) {
1715                         CERROR("Can't spawn ranal scheduler[%d]: %d\n",
1716                                i, rc);
1717                         goto failed;
1718                 }
1719         }
1720
1721         /* flag everything initialised */
1722         kranal_data.kra_init = RANAL_INIT_ALL;
1723         /*****************************************************/
1724
1725         CDEBUG(D_MALLOC, "initial kmem %d\n", pkmem);
1726         return 0;
1727
1728  failed:
1729         kranal_shutdown(ni);
1730         return -ENETDOWN;
1731 }
1732
1733 void __exit
1734 kranal_module_fini (void)
1735 {
1736         lnet_unregister_lnd(&the_kralnd);
1737         kranal_tunables_fini();
1738 }
1739
1740 int __init
1741 kranal_module_init (void)
1742 {
1743         int    rc;
1744
1745         rc = kranal_tunables_init();
1746         if (rc != 0)
1747                 return rc;
1748
1749         lnet_register_lnd(&the_kralnd);
1750
1751         return 0;
1752 }
1753
1754 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
1755 MODULE_DESCRIPTION("Kernel RapidArray LND v0.01");
1756 MODULE_LICENSE("GPL");
1757
1758 module_init(kranal_module_init);
1759 module_exit(kranal_module_fini);