Whamcloud - gitweb
022432c8497edbe88f6f051ef91105b9a9375c89
[fs/lustre-release.git] / lnet / klnds / ralnd / ralnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/ralnd/ralnd.c
37  *
38  * Author: Eric Barton <eric@bartonsoftware.com>
39  */
40 #include "ralnd.h"
41
42 static int        kranal_devids[RANAL_MAXDEVS] = {RAPK_MAIN_DEVICE_ID,
43                                                   RAPK_EXPANSION_DEVICE_ID};
44
45 lnd_t the_kralnd = {
46         .lnd_type       = RALND,
47         .lnd_startup    = kranal_startup,
48         .lnd_shutdown   = kranal_shutdown,
49         .lnd_ctl        = kranal_ctl,
50         .lnd_send       = kranal_send,
51         .lnd_recv       = kranal_recv,
52         .lnd_eager_recv = kranal_eager_recv,
53         .lnd_accept     = kranal_accept,
54 };
55
56 kra_data_t              kranal_data;
57
58 void
59 kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn, lnet_nid_t dstnid)
60 {
61         RAP_RETURN   rrc;
62
63         memset(connreq, 0, sizeof(*connreq));
64
65         connreq->racr_magic     = RANAL_MSG_MAGIC;
66         connreq->racr_version   = RANAL_MSG_VERSION;
67
68         if (conn == NULL)                       /* prepping a "stub" reply */
69                 return;
70
71         connreq->racr_devid     = conn->rac_device->rad_id;
72         connreq->racr_srcnid    = lnet_ptlcompat_srcnid(kranal_data.kra_ni->ni_nid,
73                                                         dstnid);
74         connreq->racr_dstnid    = dstnid;
75         connreq->racr_peerstamp = kranal_data.kra_peerstamp;
76         connreq->racr_connstamp = conn->rac_my_connstamp;
77         connreq->racr_timeout   = conn->rac_timeout;
78
79         rrc = RapkGetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
80         LASSERT(rrc == RAP_SUCCESS);
81 }
82
83 int
84 kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int active)
85 {
86         int         timeout = active ? *kranal_tunables.kra_timeout :
87                                         lnet_acceptor_timeout();
88         int         swab;
89         int         rc;
90
91         /* return 0 on success, -ve on error, +ve to tell the peer I'm "old" */
92
93         rc = libcfs_sock_read(sock, &connreq->racr_magic, 
94                               sizeof(connreq->racr_magic), timeout);
95         if (rc != 0) {
96                 CERROR("Read(magic) failed(1): %d\n", rc);
97                 return -EIO;
98         }
99
100         if (connreq->racr_magic != RANAL_MSG_MAGIC &&
101             connreq->racr_magic != __swab32(RANAL_MSG_MAGIC)) {
102                 /* Unexpected magic! */
103                 if (!active &&
104                     the_lnet.ln_ptlcompat == 0 &&
105                     (connreq->racr_magic == LNET_PROTO_MAGIC ||
106                      connreq->racr_magic == __swab32(LNET_PROTO_MAGIC))) {
107                         /* future protocol version compatibility!
108                          * When LNET unifies protocols over all LNDs, the first
109                          * thing sent will be a version query.  +ve rc means I
110                          * reply with my current magic/version */
111                         return EPROTO;
112                 }
113
114                 if (active ||
115                     the_lnet.ln_ptlcompat == 0) {
116                         CERROR("Unexpected magic %08x (1)\n",
117                                connreq->racr_magic);
118                         return -EPROTO;
119                 }
120
121                 /* When portals compatibility is set, I may be passed a new
122                  * connection "blindly" by the acceptor, and I have to
123                  * determine if my peer has sent an acceptor connection request
124                  * or not.  This isn't a connreq, so I'll get the acceptor to
125                  * look at it... */
126                 rc = lnet_accept(kranal_data.kra_ni, sock, connreq->racr_magic);
127                 if (rc != 0)
128                         return -EPROTO;
129
130                 /* ...and if it's OK I'm back to looking for a connreq... */
131                 rc = libcfs_sock_read(sock, &connreq->racr_magic,
132                                       sizeof(connreq->racr_magic), timeout);
133                 if (rc != 0) {
134                         CERROR("Read(magic) failed(2): %d\n", rc);
135                         return -EIO;
136                 }
137
138                 if (connreq->racr_magic != RANAL_MSG_MAGIC &&
139                     connreq->racr_magic != __swab32(RANAL_MSG_MAGIC)) {
140                         CERROR("Unexpected magic %08x(2)\n",
141                                connreq->racr_magic);
142                         return -EPROTO;
143                 }
144         }
145
146         swab = (connreq->racr_magic == __swab32(RANAL_MSG_MAGIC));
147
148         rc = libcfs_sock_read(sock, &connreq->racr_version,
149                               sizeof(connreq->racr_version), timeout);
150         if (rc != 0) {
151                 CERROR("Read(version) failed: %d\n", rc);
152                 return -EIO;
153         }
154
155         if (swab)
156                 __swab16s(&connreq->racr_version);
157         
158         if (connreq->racr_version != RANAL_MSG_VERSION) {
159                 if (active) {
160                         CERROR("Unexpected version %d\n", connreq->racr_version);
161                         return -EPROTO;
162                 }
163                 /* If this is a future version of the ralnd protocol, and I'm
164                  * passive (accepted the connection), tell my peer I'm "old"
165                  * (+ve rc) */
166                 return EPROTO;
167         }
168
169         rc = libcfs_sock_read(sock, &connreq->racr_devid,
170                               sizeof(connreq->racr_version) -
171                               offsetof(kra_connreq_t, racr_devid),
172                               timeout);
173         if (rc != 0) {
174                 CERROR("Read(body) failed: %d\n", rc);
175                 return -EIO;
176         }
177
178         if (swab) {
179                 __swab32s(&connreq->racr_magic);
180                 __swab16s(&connreq->racr_version);
181                 __swab16s(&connreq->racr_devid);
182                 __swab64s(&connreq->racr_srcnid);
183                 __swab64s(&connreq->racr_dstnid);
184                 __swab64s(&connreq->racr_peerstamp);
185                 __swab64s(&connreq->racr_connstamp);
186                 __swab32s(&connreq->racr_timeout);
187
188                 __swab32s(&connreq->racr_riparams.HostId);
189                 __swab32s(&connreq->racr_riparams.FmaDomainHndl);
190                 __swab32s(&connreq->racr_riparams.PTag);
191                 __swab32s(&connreq->racr_riparams.CompletionCookie);
192         }
193
194         if (connreq->racr_srcnid == LNET_NID_ANY ||
195             connreq->racr_dstnid == LNET_NID_ANY) {
196                 CERROR("Received LNET_NID_ANY\n");
197                 return -EPROTO;
198         }
199
200         if (connreq->racr_timeout < RANAL_MIN_TIMEOUT) {
201                 CERROR("Received timeout %d < MIN %d\n",
202                        connreq->racr_timeout, RANAL_MIN_TIMEOUT);
203                 return -EPROTO;
204         }
205
206         return 0;
207 }
208
209 int
210 kranal_close_stale_conns_locked (kra_peer_t *peer, kra_conn_t *newconn)
211 {
212         kra_conn_t         *conn;
213         struct list_head   *ctmp;
214         struct list_head   *cnxt;
215         int                 loopback;
216         int                 count = 0;
217
218         loopback = peer->rap_nid == kranal_data.kra_ni->ni_nid;
219
220         list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
221                 conn = list_entry(ctmp, kra_conn_t, rac_list);
222
223                 if (conn == newconn)
224                         continue;
225
226                 if (conn->rac_peerstamp != newconn->rac_peerstamp) {
227                         CDEBUG(D_NET, "Closing stale conn nid: %s "
228                                " peerstamp:"LPX64"("LPX64")\n", 
229                                libcfs_nid2str(peer->rap_nid),
230                                conn->rac_peerstamp, newconn->rac_peerstamp);
231                         LASSERT (conn->rac_peerstamp < newconn->rac_peerstamp);
232                         count++;
233                         kranal_close_conn_locked(conn, -ESTALE);
234                         continue;
235                 }
236
237                 if (conn->rac_device != newconn->rac_device)
238                         continue;
239
240                 if (loopback &&
241                     newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
242                     newconn->rac_peer_connstamp == conn->rac_my_connstamp)
243                         continue;
244
245                 LASSERT (conn->rac_peer_connstamp < newconn->rac_peer_connstamp);
246
247                 CDEBUG(D_NET, "Closing stale conn nid: %s"
248                        " connstamp:"LPX64"("LPX64")\n", 
249                        libcfs_nid2str(peer->rap_nid),
250                        conn->rac_peer_connstamp, newconn->rac_peer_connstamp);
251
252                 count++;
253                 kranal_close_conn_locked(conn, -ESTALE);
254         }
255
256         return count;
257 }
258
259 int
260 kranal_conn_isdup_locked(kra_peer_t *peer, kra_conn_t *newconn)
261 {
262         kra_conn_t       *conn;
263         struct list_head *tmp;
264         int               loopback;
265
266         loopback = peer->rap_nid == kranal_data.kra_ni->ni_nid;
267
268         list_for_each(tmp, &peer->rap_conns) {
269                 conn = list_entry(tmp, kra_conn_t, rac_list);
270
271                 /* 'newconn' is from an earlier version of 'peer'!!! */
272                 if (newconn->rac_peerstamp < conn->rac_peerstamp)
273                         return 1;
274
275                 /* 'conn' is from an earlier version of 'peer': it will be
276                  * removed when we cull stale conns later on... */
277                 if (newconn->rac_peerstamp > conn->rac_peerstamp)
278                         continue;
279
280                 /* Different devices are OK */
281                 if (conn->rac_device != newconn->rac_device)
282                         continue;
283
284                 /* It's me connecting to myself */
285                 if (loopback &&
286                     newconn->rac_my_connstamp == conn->rac_peer_connstamp &&
287                     newconn->rac_peer_connstamp == conn->rac_my_connstamp)
288                         continue;
289
290                 /* 'newconn' is an earlier connection from 'peer'!!! */
291                 if (newconn->rac_peer_connstamp < conn->rac_peer_connstamp)
292                         return 2;
293
294                 /* 'conn' is an earlier connection from 'peer': it will be
295                  * removed when we cull stale conns later on... */
296                 if (newconn->rac_peer_connstamp > conn->rac_peer_connstamp)
297                         continue;
298
299                 /* 'newconn' has the SAME connection stamp; 'peer' isn't
300                  * playing the game... */
301                 return 3;
302         }
303
304         return 0;
305 }
306
307 void
308 kranal_set_conn_uniqueness (kra_conn_t *conn)
309 {
310         unsigned long  flags;
311
312         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
313
314         conn->rac_my_connstamp = kranal_data.kra_connstamp++;
315
316         do {    /* allocate a unique cqid */
317                 conn->rac_cqid = kranal_data.kra_next_cqid++;
318         } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL);
319
320         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
321 }
322
323 int
324 kranal_create_conn(kra_conn_t **connp, kra_device_t *dev)
325 {
326         kra_conn_t    *conn;
327         RAP_RETURN     rrc;
328
329         LASSERT (!in_interrupt());
330         LIBCFS_ALLOC(conn, sizeof(*conn));
331
332         if (conn == NULL)
333                 return -ENOMEM;
334
335         memset(conn, 0, sizeof(*conn));
336         atomic_set(&conn->rac_refcount, 1);
337         INIT_LIST_HEAD(&conn->rac_list);
338         INIT_LIST_HEAD(&conn->rac_hashlist);
339         INIT_LIST_HEAD(&conn->rac_schedlist);
340         INIT_LIST_HEAD(&conn->rac_fmaq);
341         INIT_LIST_HEAD(&conn->rac_rdmaq);
342         INIT_LIST_HEAD(&conn->rac_replyq);
343         spin_lock_init(&conn->rac_lock);
344
345         kranal_set_conn_uniqueness(conn);
346
347         conn->rac_device = dev;
348         conn->rac_timeout = MAX(*kranal_tunables.kra_timeout, RANAL_MIN_TIMEOUT);
349         kranal_update_reaper_timeout(conn->rac_timeout);
350
351         rrc = RapkCreateRi(dev->rad_handle, conn->rac_cqid,
352                            &conn->rac_rihandle);
353         if (rrc != RAP_SUCCESS) {
354                 CERROR("RapkCreateRi failed: %d\n", rrc);
355                 LIBCFS_FREE(conn, sizeof(*conn));
356                 return -ENETDOWN;
357         }
358
359         atomic_inc(&kranal_data.kra_nconns);
360         *connp = conn;
361         return 0;
362 }
363
364 void
365 kranal_destroy_conn(kra_conn_t *conn)
366 {
367         RAP_RETURN         rrc;
368
369         LASSERT (!in_interrupt());
370         LASSERT (!conn->rac_scheduled);
371         LASSERT (list_empty(&conn->rac_list));
372         LASSERT (list_empty(&conn->rac_hashlist));
373         LASSERT (list_empty(&conn->rac_schedlist));
374         LASSERT (atomic_read(&conn->rac_refcount) == 0);
375         LASSERT (list_empty(&conn->rac_fmaq));
376         LASSERT (list_empty(&conn->rac_rdmaq));
377         LASSERT (list_empty(&conn->rac_replyq));
378
379         rrc = RapkDestroyRi(conn->rac_device->rad_handle,
380                             conn->rac_rihandle);
381         LASSERT (rrc == RAP_SUCCESS);
382
383         if (conn->rac_peer != NULL)
384                 kranal_peer_decref(conn->rac_peer);
385
386         LIBCFS_FREE(conn, sizeof(*conn));
387         atomic_dec(&kranal_data.kra_nconns);
388 }
389
390 void
391 kranal_terminate_conn_locked (kra_conn_t *conn)
392 {
393         LASSERT (!in_interrupt());
394         LASSERT (conn->rac_state == RANAL_CONN_CLOSING);
395         LASSERT (!list_empty(&conn->rac_hashlist));
396         LASSERT (list_empty(&conn->rac_list));
397
398         /* Remove from conn hash table: no new callbacks */
399         list_del_init(&conn->rac_hashlist);
400         kranal_conn_decref(conn);
401
402         conn->rac_state = RANAL_CONN_CLOSED;
403
404         /* schedule to clear out all uncompleted comms in context of dev's
405          * scheduler */
406         kranal_schedule_conn(conn);
407 }
408
409 void
410 kranal_close_conn_locked (kra_conn_t *conn, int error)
411 {
412         kra_peer_t        *peer = conn->rac_peer;
413
414         CDEBUG(error == 0 ? D_NET : D_NETERROR,
415                "closing conn to %s: error %d\n", 
416                libcfs_nid2str(peer->rap_nid), error);
417
418         LASSERT (!in_interrupt());
419         LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED);
420         LASSERT (!list_empty(&conn->rac_hashlist));
421         LASSERT (!list_empty(&conn->rac_list));
422
423         list_del_init(&conn->rac_list);
424
425         if (list_empty(&peer->rap_conns) &&
426             peer->rap_persistence == 0) {
427                 /* Non-persistent peer with no more conns... */
428                 kranal_unlink_peer_locked(peer);
429         }
430
431         /* Reset RX timeout to ensure we wait for an incoming CLOSE for the
432          * full timeout.  If we get a CLOSE we know the peer has stopped all
433          * RDMA.  Otherwise if we wait for the full timeout we can also be sure
434          * all RDMA has stopped. */
435         conn->rac_last_rx = jiffies;
436         mb();
437
438         conn->rac_state = RANAL_CONN_CLOSING;
439         kranal_schedule_conn(conn);             /* schedule sending CLOSE */
440
441         kranal_conn_decref(conn);               /* lose peer's ref */
442 }
443
444 void
445 kranal_close_conn (kra_conn_t *conn, int error)
446 {
447         unsigned long    flags;
448
449
450         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
451
452         if (conn->rac_state == RANAL_CONN_ESTABLISHED)
453                 kranal_close_conn_locked(conn, error);
454
455         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
456 }
457
458 int
459 kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq,
460                        __u32 peer_ip, int peer_port)
461 {
462         kra_device_t  *dev = conn->rac_device;
463         unsigned long  flags;
464         RAP_RETURN     rrc;
465
466         /* CAVEAT EMPTOR: we're really overloading rac_last_tx + rac_keepalive
467          * to do RapkCompleteSync() timekeeping (see kibnal_scheduler). */
468         conn->rac_last_tx = jiffies;
469         conn->rac_keepalive = 0;
470
471         rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
472         if (rrc != RAP_SUCCESS) {
473                 CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n",
474                        HIPQUAD(peer_ip), peer_port, rrc);
475                 return -ECONNABORTED;
476         }
477
478         /* Schedule conn on rad_new_conns */
479         kranal_conn_addref(conn);
480         spin_lock_irqsave(&dev->rad_lock, flags);
481         list_add_tail(&conn->rac_schedlist, &dev->rad_new_conns);
482         wake_up(&dev->rad_waitq);
483         spin_unlock_irqrestore(&dev->rad_lock, flags);
484
485         rrc = RapkWaitToConnect(conn->rac_rihandle);
486         if (rrc != RAP_SUCCESS) {
487                 CERROR("Error waiting to connect to %u.%u.%u.%u/%d: %d\n",
488                        HIPQUAD(peer_ip), peer_port, rrc);
489                 return -ECONNABORTED;
490         }
491
492         /* Scheduler doesn't touch conn apart from to deschedule and decref it
493          * after RapkCompleteSync() return success, so conn is all mine */
494
495         conn->rac_peerstamp = connreq->racr_peerstamp;
496         conn->rac_peer_connstamp = connreq->racr_connstamp;
497         conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq->racr_timeout);
498         kranal_update_reaper_timeout(conn->rac_keepalive);
499         return 0;
500 }
501
502 int
503 kranal_passive_conn_handshake (struct socket *sock, lnet_nid_t *src_nidp,
504                                lnet_nid_t *dst_nidp, kra_conn_t **connp)
505 {
506         __u32                peer_ip;
507         unsigned int         peer_port;
508         kra_connreq_t        rx_connreq;
509         kra_connreq_t        tx_connreq;
510         kra_conn_t          *conn;
511         kra_device_t        *dev;
512         int                  rc;
513         int                  i;
514
515         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
516         if (rc != 0) {
517                 CERROR("Can't get peer's IP: %d\n", rc);
518                 return rc;
519         }
520
521         rc = kranal_recv_connreq(sock, &rx_connreq, 0);
522
523         if (rc < 0) {
524                 CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
525                        HIPQUAD(peer_ip), peer_port, rc);
526                 return rc;
527         }
528
529         if (rc > 0) {
530                 /* Request from "new" peer: send reply with my MAGIC/VERSION to
531                  * tell her I'm old... */
532                 kranal_pack_connreq(&tx_connreq, NULL, LNET_NID_ANY);
533
534                 rc = libcfs_sock_write(sock, &tx_connreq, sizeof(tx_connreq),
535                                        lnet_acceptor_timeout());
536                 if (rc != 0)
537                         CERROR("Can't tx stub connreq to %u.%u.%u.%u/%d: %d\n",
538                                HIPQUAD(peer_ip), peer_port, rc);
539
540                 return -EPROTO;
541         }
542
543         for (i = 0;;i++) {
544                 if (i == kranal_data.kra_ndevs) {
545                         CERROR("Can't match dev %d from %u.%u.%u.%u/%d\n",
546                                rx_connreq.racr_devid, HIPQUAD(peer_ip), peer_port);
547                         return -ENODEV;
548                 }
549                 dev = &kranal_data.kra_devices[i];
550                 if (dev->rad_id == rx_connreq.racr_devid)
551                         break;
552         }
553
554         rc = kranal_create_conn(&conn, dev);
555         if (rc != 0)
556                 return rc;
557
558         kranal_pack_connreq(&tx_connreq, conn, rx_connreq.racr_srcnid);
559
560         rc = libcfs_sock_write(sock, &tx_connreq, sizeof(tx_connreq),
561                                lnet_acceptor_timeout());
562         if (rc != 0) {
563                 CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
564                        HIPQUAD(peer_ip), peer_port, rc);
565                 kranal_conn_decref(conn);
566                 return rc;
567         }
568
569         rc = kranal_set_conn_params(conn, &rx_connreq, peer_ip, peer_port);
570         if (rc != 0) {
571                 kranal_conn_decref(conn);
572                 return rc;
573         }
574
575         *connp = conn;
576         *src_nidp = rx_connreq.racr_srcnid;
577         *dst_nidp = rx_connreq.racr_dstnid;
578         return 0;
579 }
580
581 int
582 kranal_active_conn_handshake(kra_peer_t *peer,
583                              lnet_nid_t *dst_nidp, kra_conn_t **connp)
584 {
585         kra_connreq_t       connreq;
586         kra_conn_t         *conn;
587         kra_device_t       *dev;
588         struct socket      *sock;
589         int                 rc;
590         unsigned int        idx;
591
592         /* spread connections over all devices using both peer NIDs to ensure
593          * all nids use all devices */
594         idx = peer->rap_nid + kranal_data.kra_ni->ni_nid;
595         dev = &kranal_data.kra_devices[idx % kranal_data.kra_ndevs];
596
597         rc = kranal_create_conn(&conn, dev);
598         if (rc != 0)
599                 return rc;
600
601         kranal_pack_connreq(&connreq, conn, peer->rap_nid);
602
603         if (the_lnet.ln_testprotocompat != 0) {
604                 /* single-shot proto test */
605                 LNET_LOCK();
606                 if ((the_lnet.ln_testprotocompat & 1) != 0) {
607                         connreq.racr_version++;
608                         the_lnet.ln_testprotocompat &= ~1;
609                 }
610                 if ((the_lnet.ln_testprotocompat & 2) != 0) {
611                         connreq.racr_magic = LNET_PROTO_MAGIC;
612                         the_lnet.ln_testprotocompat &= ~2;
613                 }
614                 LNET_UNLOCK();
615         }
616
617         rc = lnet_connect(&sock, peer->rap_nid,
618                          0, peer->rap_ip, peer->rap_port);
619         if (rc != 0)
620                 goto failed_0;
621
622         /* CAVEAT EMPTOR: the passive side receives with a SHORT rx timeout
623          * immediately after accepting a connection, so we connect and then
624          * send immediately. */
625
626         rc = libcfs_sock_write(sock, &connreq, sizeof(connreq),
627                                lnet_acceptor_timeout());
628         if (rc != 0) {
629                 CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n",
630                        HIPQUAD(peer->rap_ip), peer->rap_port, rc);
631                 goto failed_2;
632         }
633
634         rc = kranal_recv_connreq(sock, &connreq, 1);
635         if (rc != 0) {
636                 CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n",
637                        HIPQUAD(peer->rap_ip), peer->rap_port, rc);
638                 goto failed_2;
639         }
640
641         libcfs_sock_release(sock);
642         rc = -EPROTO;
643
644         if (connreq.racr_srcnid != peer->rap_nid) {
645                 CERROR("Unexpected srcnid from %u.%u.%u.%u/%d: "
646                        "received %s expected %s\n",
647                        HIPQUAD(peer->rap_ip), peer->rap_port,
648                        libcfs_nid2str(connreq.racr_srcnid), 
649                        libcfs_nid2str(peer->rap_nid));
650                 goto failed_1;
651         }
652
653         if (connreq.racr_devid != dev->rad_id) {
654                 CERROR("Unexpected device id from %u.%u.%u.%u/%d: "
655                        "received %d expected %d\n",
656                        HIPQUAD(peer->rap_ip), peer->rap_port,
657                        connreq.racr_devid, dev->rad_id);
658                 goto failed_1;
659         }
660
661         rc = kranal_set_conn_params(conn, &connreq,
662                                     peer->rap_ip, peer->rap_port);
663         if (rc != 0)
664                 goto failed_1;
665
666         *connp = conn;
667         *dst_nidp = connreq.racr_dstnid;
668         return 0;
669
670  failed_2:
671         libcfs_sock_release(sock);
672  failed_1:
673         lnet_connect_console_error(rc, peer->rap_nid,
674                                   peer->rap_ip, peer->rap_port);
675  failed_0:
676         kranal_conn_decref(conn);
677         return rc;
678 }
679
680 int
681 kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
682 {
683         kra_peer_t        *peer2;
684         kra_tx_t          *tx;
685         lnet_nid_t          peer_nid;
686         lnet_nid_t          dst_nid;
687         unsigned long      flags;
688         kra_conn_t        *conn;
689         int                rc;
690         int                nstale;
691         int                new_peer = 0;
692
693         if (sock == NULL) {
694                 /* active: connd wants to connect to 'peer' */
695                 LASSERT (peer != NULL);
696                 LASSERT (peer->rap_connecting);
697
698                 rc = kranal_active_conn_handshake(peer, &dst_nid, &conn);
699                 if (rc != 0)
700                         return rc;
701
702                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
703
704                 if (!kranal_peer_active(peer)) {
705                         /* raced with peer getting unlinked */
706                         write_unlock_irqrestore(&kranal_data.kra_global_lock,
707                                                 flags);
708                         kranal_conn_decref(conn);
709                         return -ESTALE;
710                 }
711
712                 peer_nid = peer->rap_nid;
713         } else {
714                 /* passive: listener accepted 'sock' */
715                 LASSERT (peer == NULL);
716
717                 rc = kranal_passive_conn_handshake(sock, &peer_nid,
718                                                    &dst_nid, &conn);
719                 if (rc != 0)
720                         return rc;
721
722                 /* assume this is a new peer */
723                 rc = kranal_create_peer(&peer, peer_nid);
724                 if (rc != 0) {
725                         CERROR("Can't create conn for %s\n", 
726                                libcfs_nid2str(peer_nid));
727                         kranal_conn_decref(conn);
728                         return -ENOMEM;
729                 }
730
731                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
732
733                 peer2 = kranal_find_peer_locked(peer_nid);
734                 if (peer2 == NULL) {
735                         new_peer = 1;
736                 } else {
737                         /* peer_nid already in the peer table */
738                         kranal_peer_decref(peer);
739                         peer = peer2;
740                 }
741         }
742
743         LASSERT ((!new_peer) != (!kranal_peer_active(peer)));
744
745         /* Refuse connection if peer thinks we are a different NID.  We check
746          * this while holding the global lock, to synch with connection
747          * destruction on NID change. */
748         if (!lnet_ptlcompat_matchnid(kranal_data.kra_ni->ni_nid, dst_nid)) {
749                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
750
751                 CERROR("Stale/bad connection with %s: dst_nid %s, expected %s\n",
752                        libcfs_nid2str(peer_nid), libcfs_nid2str(dst_nid), 
753                        libcfs_nid2str(kranal_data.kra_ni->ni_nid));
754                 rc = -ESTALE;
755                 goto failed;
756         }
757
758         /* Refuse to duplicate an existing connection (both sides might try to
759          * connect at once).  NB we return success!  We _are_ connected so we
760          * _don't_ have any blocked txs to complete with failure. */
761         rc = kranal_conn_isdup_locked(peer, conn);
762         if (rc != 0) {
763                 LASSERT (!list_empty(&peer->rap_conns));
764                 LASSERT (list_empty(&peer->rap_tx_queue));
765                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
766                 CWARN("Not creating duplicate connection to %s: %d\n",
767                       libcfs_nid2str(peer_nid), rc);
768                 rc = 0;
769                 goto failed;
770         }
771
772         if (new_peer) {
773                 /* peer table takes my ref on the new peer */
774                 list_add_tail(&peer->rap_list,
775                               kranal_nid2peerlist(peer_nid));
776         }
777
778         /* initialise timestamps before reaper looks at them */
779         conn->rac_last_tx = conn->rac_last_rx = jiffies;
780
781         kranal_peer_addref(peer);               /* +1 ref for conn */
782         conn->rac_peer = peer;
783         list_add_tail(&conn->rac_list, &peer->rap_conns);
784
785         kranal_conn_addref(conn);               /* +1 ref for conn table */
786         list_add_tail(&conn->rac_hashlist,
787                       kranal_cqid2connlist(conn->rac_cqid));
788
789         /* Schedule all packets blocking for a connection */
790         while (!list_empty(&peer->rap_tx_queue)) {
791                 tx = list_entry(peer->rap_tx_queue.next,
792                                 kra_tx_t, tx_list);
793
794                 list_del(&tx->tx_list);
795                 kranal_post_fma(conn, tx);
796         }
797
798         nstale = kranal_close_stale_conns_locked(peer, conn);
799
800         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
801
802         /* CAVEAT EMPTOR: passive peer can disappear NOW */
803
804         if (nstale != 0)
805                 CWARN("Closed %d stale conns to %s\n", nstale, 
806                       libcfs_nid2str(peer_nid));
807
808         CWARN("New connection to %s on devid[%d] = %d\n",
809                libcfs_nid2str(peer_nid), 
810                conn->rac_device->rad_idx, conn->rac_device->rad_id);
811
812         /* Ensure conn gets checked.  Transmits may have been queued and an
813          * FMA event may have happened before it got in the cq hash table */
814         kranal_schedule_conn(conn);
815         return 0;
816
817  failed:
818         if (new_peer)
819                 kranal_peer_decref(peer);
820         kranal_conn_decref(conn);
821         return rc;
822 }
823
824 void
825 kranal_connect (kra_peer_t *peer)
826 {
827         kra_tx_t          *tx;
828         unsigned long      flags;
829         struct list_head   zombies;
830         int                rc;
831
832         LASSERT (peer->rap_connecting);
833
834         CDEBUG(D_NET, "About to handshake %s\n", 
835                libcfs_nid2str(peer->rap_nid));
836
837         rc = kranal_conn_handshake(NULL, peer);
838
839         CDEBUG(D_NET, "Done handshake %s:%d \n", 
840                libcfs_nid2str(peer->rap_nid), rc);
841
842         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
843
844         LASSERT (peer->rap_connecting);
845         peer->rap_connecting = 0;
846
847         if (rc == 0) {
848                 /* kranal_conn_handshake() queues blocked txs immediately on
849                  * success to avoid messages jumping the queue */
850                 LASSERT (list_empty(&peer->rap_tx_queue));
851
852                 peer->rap_reconnect_interval = 0; /* OK to reconnect at any time */
853
854                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
855                 return;
856         }
857
858         peer->rap_reconnect_interval *= 2;
859         peer->rap_reconnect_interval =
860                 MAX(peer->rap_reconnect_interval,
861                     *kranal_tunables.kra_min_reconnect_interval);
862         peer->rap_reconnect_interval =
863                 MIN(peer->rap_reconnect_interval,
864                     *kranal_tunables.kra_max_reconnect_interval);
865
866         peer->rap_reconnect_time = jiffies + peer->rap_reconnect_interval * HZ;
867
868         /* Grab all blocked packets while we have the global lock */
869         list_add(&zombies, &peer->rap_tx_queue);
870         list_del_init(&peer->rap_tx_queue);
871
872         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
873
874         if (list_empty(&zombies))
875                 return;
876
877         CDEBUG(D_NETERROR, "Dropping packets for %s: connection failed\n",
878                libcfs_nid2str(peer->rap_nid));
879
880         do {
881                 tx = list_entry(zombies.next, kra_tx_t, tx_list);
882
883                 list_del(&tx->tx_list);
884                 kranal_tx_done(tx, -EHOSTUNREACH);
885
886         } while (!list_empty(&zombies));
887 }
888
889 void
890 kranal_free_acceptsock (kra_acceptsock_t *ras)
891 {
892         libcfs_sock_release(ras->ras_sock);
893         LIBCFS_FREE(ras, sizeof(*ras));
894 }
895
896 int
897 kranal_accept (lnet_ni_t *ni, struct socket *sock)
898 {
899         kra_acceptsock_t  *ras;
900         int                rc;
901         __u32              peer_ip;
902         int                peer_port;
903         unsigned long      flags;
904
905         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
906         LASSERT (rc == 0);                      /* we succeeded before */
907
908         LIBCFS_ALLOC(ras, sizeof(*ras));
909         if (ras == NULL) {
910                 CERROR("ENOMEM allocating connection request from "
911                        "%u.%u.%u.%u\n", HIPQUAD(peer_ip));
912                 return -ENOMEM;
913         }
914
915         ras->ras_sock = sock;
916
917         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
918
919         list_add_tail(&ras->ras_list, &kranal_data.kra_connd_acceptq);
920         wake_up(&kranal_data.kra_connd_waitq);
921
922         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
923         return 0;
924 }
925
926 int
927 kranal_create_peer (kra_peer_t **peerp, lnet_nid_t nid)
928 {
929         kra_peer_t    *peer;
930         unsigned long  flags;
931
932         LASSERT (nid != LNET_NID_ANY);
933
934         LIBCFS_ALLOC(peer, sizeof(*peer));
935         if (peer == NULL)
936                 return -ENOMEM;
937
938         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
939
940         peer->rap_nid = nid;
941         atomic_set(&peer->rap_refcount, 1);     /* 1 ref for caller */
942
943         INIT_LIST_HEAD(&peer->rap_list);
944         INIT_LIST_HEAD(&peer->rap_connd_list);
945         INIT_LIST_HEAD(&peer->rap_conns);
946         INIT_LIST_HEAD(&peer->rap_tx_queue);
947
948         peer->rap_reconnect_interval = 0;       /* OK to connect at any time */
949
950         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
951
952         if (kranal_data.kra_nonewpeers) {
953                 /* shutdown has started already */
954                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
955                 
956                 LIBCFS_FREE(peer, sizeof(*peer));
957                 CERROR("Can't create peer: network shutdown\n");
958                 return -ESHUTDOWN;
959         }
960
961         atomic_inc(&kranal_data.kra_npeers);
962
963         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
964
965         *peerp = peer;
966         return 0;
967 }
968
969 void
970 kranal_destroy_peer (kra_peer_t *peer)
971 {
972         CDEBUG(D_NET, "peer %s %p deleted\n", 
973                libcfs_nid2str(peer->rap_nid), peer);
974
975         LASSERT (atomic_read(&peer->rap_refcount) == 0);
976         LASSERT (peer->rap_persistence == 0);
977         LASSERT (!kranal_peer_active(peer));
978         LASSERT (!peer->rap_connecting);
979         LASSERT (list_empty(&peer->rap_conns));
980         LASSERT (list_empty(&peer->rap_tx_queue));
981         LASSERT (list_empty(&peer->rap_connd_list));
982
983         LIBCFS_FREE(peer, sizeof(*peer));
984
985         /* NB a peer's connections keep a reference on their peer until
986          * they are destroyed, so we can be assured that _all_ state to do
987          * with this peer has been cleaned up when its refcount drops to
988          * zero. */
989         atomic_dec(&kranal_data.kra_npeers);
990 }
991
992 kra_peer_t *
993 kranal_find_peer_locked (lnet_nid_t nid)
994 {
995         struct list_head *peer_list = kranal_nid2peerlist(nid);
996         struct list_head *tmp;
997         kra_peer_t       *peer;
998
999         list_for_each (tmp, peer_list) {
1000
1001                 peer = list_entry(tmp, kra_peer_t, rap_list);
1002
1003                 LASSERT (peer->rap_persistence > 0 ||     /* persistent peer */
1004                          !list_empty(&peer->rap_conns));  /* active conn */
1005
1006                 if (peer->rap_nid != nid)
1007                         continue;
1008
1009                 CDEBUG(D_NET, "got peer [%p] -> %s (%d)\n",
1010                        peer, libcfs_nid2str(nid), 
1011                        atomic_read(&peer->rap_refcount));
1012                 return peer;
1013         }
1014         return NULL;
1015 }
1016
1017 kra_peer_t *
1018 kranal_find_peer (lnet_nid_t nid)
1019 {
1020         kra_peer_t     *peer;
1021
1022         read_lock(&kranal_data.kra_global_lock);
1023         peer = kranal_find_peer_locked(nid);
1024         if (peer != NULL)                       /* +1 ref for caller? */
1025                 kranal_peer_addref(peer);
1026         read_unlock(&kranal_data.kra_global_lock);
1027
1028         return peer;
1029 }
1030
1031 void
1032 kranal_unlink_peer_locked (kra_peer_t *peer)
1033 {
1034         LASSERT (peer->rap_persistence == 0);
1035         LASSERT (list_empty(&peer->rap_conns));
1036
1037         LASSERT (kranal_peer_active(peer));
1038         list_del_init(&peer->rap_list);
1039
1040         /* lose peerlist's ref */
1041         kranal_peer_decref(peer);
1042 }
1043
1044 int
1045 kranal_get_peer_info (int index, lnet_nid_t *nidp, __u32 *ipp, int *portp,
1046                       int *persistencep)
1047 {
1048         kra_peer_t        *peer;
1049         struct list_head  *ptmp;
1050         int                i;
1051
1052         read_lock(&kranal_data.kra_global_lock);
1053
1054         for (i = 0; i < kranal_data.kra_peer_hash_size; i++) {
1055
1056                 list_for_each(ptmp, &kranal_data.kra_peers[i]) {
1057
1058                         peer = list_entry(ptmp, kra_peer_t, rap_list);
1059                         LASSERT (peer->rap_persistence > 0 ||
1060                                  !list_empty(&peer->rap_conns));
1061
1062                         if (index-- > 0)
1063                                 continue;
1064
1065                         *nidp = peer->rap_nid;
1066                         *ipp = peer->rap_ip;
1067                         *portp = peer->rap_port;
1068                         *persistencep = peer->rap_persistence;
1069
1070                         read_unlock(&kranal_data.kra_global_lock);
1071                         return 0;
1072                 }
1073         }
1074
1075         read_unlock(&kranal_data.kra_global_lock);
1076         return -ENOENT;
1077 }
1078
1079 int
1080 kranal_add_persistent_peer (lnet_nid_t nid, __u32 ip, int port)
1081 {
1082         unsigned long      flags;
1083         kra_peer_t        *peer;
1084         kra_peer_t        *peer2;
1085         int                rc;
1086
1087         if (nid == LNET_NID_ANY)
1088                 return -EINVAL;
1089
1090         rc = kranal_create_peer(&peer, nid);
1091         if (rc != 0)
1092                 return rc;
1093
1094         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1095
1096         peer2 = kranal_find_peer_locked(nid);
1097         if (peer2 != NULL) {
1098                 kranal_peer_decref(peer);
1099                 peer = peer2;
1100         } else {
1101                 /* peer table takes existing ref on peer */
1102                 list_add_tail(&peer->rap_list,
1103                               kranal_nid2peerlist(nid));
1104         }
1105
1106         peer->rap_ip = ip;
1107         peer->rap_port = port;
1108         peer->rap_persistence++;
1109
1110         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1111         return 0;
1112 }
1113
1114 void
1115 kranal_del_peer_locked (kra_peer_t *peer)
1116 {
1117         struct list_head *ctmp;
1118         struct list_head *cnxt;
1119         kra_conn_t       *conn;
1120
1121         peer->rap_persistence = 0;
1122
1123         if (list_empty(&peer->rap_conns)) {
1124                 kranal_unlink_peer_locked(peer);
1125         } else {
1126                 list_for_each_safe(ctmp, cnxt, &peer->rap_conns) {
1127                         conn = list_entry(ctmp, kra_conn_t, rac_list);
1128
1129                         kranal_close_conn_locked(conn, 0);
1130                 }
1131                 /* peer unlinks itself when last conn is closed */
1132         }
1133 }
1134
1135 int
1136 kranal_del_peer (lnet_nid_t nid)
1137 {
1138         unsigned long      flags;
1139         struct list_head  *ptmp;
1140         struct list_head  *pnxt;
1141         kra_peer_t        *peer;
1142         int                lo;
1143         int                hi;
1144         int                i;
1145         int                rc = -ENOENT;
1146
1147         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1148
1149         if (nid != LNET_NID_ANY)
1150                 lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers;
1151         else {
1152                 lo = 0;
1153                 hi = kranal_data.kra_peer_hash_size - 1;
1154         }
1155
1156         for (i = lo; i <= hi; i++) {
1157                 list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) {
1158                         peer = list_entry(ptmp, kra_peer_t, rap_list);
1159                         LASSERT (peer->rap_persistence > 0 ||
1160                                  !list_empty(&peer->rap_conns));
1161
1162                         if (!(nid == LNET_NID_ANY || peer->rap_nid == nid))
1163                                 continue;
1164
1165                         kranal_del_peer_locked(peer);
1166                         rc = 0;         /* matched something */
1167                 }
1168         }
1169
1170         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1171
1172         return rc;
1173 }
1174
1175 kra_conn_t *
1176 kranal_get_conn_by_idx (int index)
1177 {
1178         kra_peer_t        *peer;
1179         struct list_head  *ptmp;
1180         kra_conn_t        *conn;
1181         struct list_head  *ctmp;
1182         int                i;
1183
1184         read_lock (&kranal_data.kra_global_lock);
1185
1186         for (i = 0; i < kranal_data.kra_peer_hash_size; i++) {
1187                 list_for_each (ptmp, &kranal_data.kra_peers[i]) {
1188
1189                         peer = list_entry(ptmp, kra_peer_t, rap_list);
1190                         LASSERT (peer->rap_persistence > 0 ||
1191                                  !list_empty(&peer->rap_conns));
1192
1193                         list_for_each (ctmp, &peer->rap_conns) {
1194                                 if (index-- > 0)
1195                                         continue;
1196
1197                                 conn = list_entry(ctmp, kra_conn_t, rac_list);
1198                                 CDEBUG(D_NET, "++conn[%p] -> %s (%d)\n", conn, 
1199                                        libcfs_nid2str(conn->rac_peer->rap_nid),
1200                                        atomic_read(&conn->rac_refcount));
1201                                 atomic_inc(&conn->rac_refcount);
1202                                 read_unlock(&kranal_data.kra_global_lock);
1203                                 return conn;
1204                         }
1205                 }
1206         }
1207
1208         read_unlock(&kranal_data.kra_global_lock);
1209         return NULL;
1210 }
1211
1212 int
1213 kranal_close_peer_conns_locked (kra_peer_t *peer, int why)
1214 {
1215         kra_conn_t         *conn;
1216         struct list_head   *ctmp;
1217         struct list_head   *cnxt;
1218         int                 count = 0;
1219
1220         list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
1221                 conn = list_entry(ctmp, kra_conn_t, rac_list);
1222
1223                 count++;
1224                 kranal_close_conn_locked(conn, why);
1225         }
1226
1227         return count;
1228 }
1229
1230 int
1231 kranal_close_matching_conns (lnet_nid_t nid)
1232 {
1233         unsigned long       flags;
1234         kra_peer_t         *peer;
1235         struct list_head   *ptmp;
1236         struct list_head   *pnxt;
1237         int                 lo;
1238         int                 hi;
1239         int                 i;
1240         int                 count = 0;
1241
1242         write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1243
1244         if (nid != LNET_NID_ANY)
1245                 lo = hi = kranal_nid2peerlist(nid) - kranal_data.kra_peers;
1246         else {
1247                 lo = 0;
1248                 hi = kranal_data.kra_peer_hash_size - 1;
1249         }
1250
1251         for (i = lo; i <= hi; i++) {
1252                 list_for_each_safe (ptmp, pnxt, &kranal_data.kra_peers[i]) {
1253
1254                         peer = list_entry(ptmp, kra_peer_t, rap_list);
1255                         LASSERT (peer->rap_persistence > 0 ||
1256                                  !list_empty(&peer->rap_conns));
1257
1258                         if (!(nid == LNET_NID_ANY || nid == peer->rap_nid))
1259                                 continue;
1260
1261                         count += kranal_close_peer_conns_locked(peer, 0);
1262                 }
1263         }
1264
1265         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1266
1267         /* wildcards always succeed */
1268         if (nid == LNET_NID_ANY)
1269                 return 0;
1270
1271         return (count == 0) ? -ENOENT : 0;
1272 }
1273
1274 int
1275 kranal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1276 {
1277         struct libcfs_ioctl_data *data = arg;
1278         int                       rc = -EINVAL;
1279
1280         LASSERT (ni == kranal_data.kra_ni);
1281
1282         switch(cmd) {
1283         case IOC_LIBCFS_GET_PEER: {
1284                 lnet_nid_t   nid = 0;
1285                 __u32       ip = 0;
1286                 int         port = 0;
1287                 int         share_count = 0;
1288
1289                 rc = kranal_get_peer_info(data->ioc_count,
1290                                           &nid, &ip, &port, &share_count);
1291                 data->ioc_nid    = nid;
1292                 data->ioc_count  = share_count;
1293                 data->ioc_u32[0] = ip;
1294                 data->ioc_u32[1] = port;
1295                 break;
1296         }
1297         case IOC_LIBCFS_ADD_PEER: {
1298                 rc = kranal_add_persistent_peer(data->ioc_nid,
1299                                                 data->ioc_u32[0], /* IP */
1300                                                 data->ioc_u32[1]); /* port */
1301                 break;
1302         }
1303         case IOC_LIBCFS_DEL_PEER: {
1304                 rc = kranal_del_peer(data->ioc_nid);
1305                 break;
1306         }
1307         case IOC_LIBCFS_GET_CONN: {
1308                 kra_conn_t *conn = kranal_get_conn_by_idx(data->ioc_count);
1309
1310                 if (conn == NULL)
1311                         rc = -ENOENT;
1312                 else {
1313                         rc = 0;
1314                         data->ioc_nid    = conn->rac_peer->rap_nid;
1315                         data->ioc_u32[0] = conn->rac_device->rad_id;
1316                         kranal_conn_decref(conn);
1317                 }
1318                 break;
1319         }
1320         case IOC_LIBCFS_CLOSE_CONNECTION: {
1321                 rc = kranal_close_matching_conns(data->ioc_nid);
1322                 break;
1323         }
1324         case IOC_LIBCFS_REGISTER_MYNID: {
1325                 /* Ignore if this is a noop */
1326                 if (data->ioc_nid == ni->ni_nid) {
1327                         rc = 0;
1328                 } else {
1329                         CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1330                                libcfs_nid2str(data->ioc_nid),
1331                                libcfs_nid2str(ni->ni_nid));
1332                         rc = -EINVAL;
1333                 }
1334                 break;
1335         }
1336         }
1337
1338         return rc;
1339 }
1340
1341 void
1342 kranal_free_txdescs(struct list_head *freelist)
1343 {
1344         kra_tx_t    *tx;
1345
1346         while (!list_empty(freelist)) {
1347                 tx = list_entry(freelist->next, kra_tx_t, tx_list);
1348
1349                 list_del(&tx->tx_list);
1350                 LIBCFS_FREE(tx->tx_phys, LNET_MAX_IOV * sizeof(*tx->tx_phys));
1351                 LIBCFS_FREE(tx, sizeof(*tx));
1352         }
1353 }
1354
1355 int
1356 kranal_alloc_txdescs(struct list_head *freelist, int n)
1357 {
1358         int            i;
1359         kra_tx_t      *tx;
1360
1361         LASSERT (freelist == &kranal_data.kra_idle_txs);
1362         LASSERT (list_empty(freelist));
1363
1364         for (i = 0; i < n; i++) {
1365
1366                 LIBCFS_ALLOC(tx, sizeof(*tx));
1367                 if (tx == NULL) {
1368                         CERROR("Can't allocate tx[%d]\n", i);
1369                         kranal_free_txdescs(freelist);
1370                         return -ENOMEM;
1371                 }
1372
1373                 LIBCFS_ALLOC(tx->tx_phys,
1374                              LNET_MAX_IOV * sizeof(*tx->tx_phys));
1375                 if (tx->tx_phys == NULL) {
1376                         CERROR("Can't allocate tx[%d]->tx_phys\n", i);
1377
1378                         LIBCFS_FREE(tx, sizeof(*tx));
1379                         kranal_free_txdescs(freelist);
1380                         return -ENOMEM;
1381                 }
1382
1383                 tx->tx_buftype = RANAL_BUF_NONE;
1384                 tx->tx_msg.ram_type = RANAL_MSG_NONE;
1385
1386                 list_add(&tx->tx_list, freelist);
1387         }
1388
1389         return 0;
1390 }
1391
1392 int
1393 kranal_device_init(int id, kra_device_t *dev)
1394 {
1395         int               total_ntx = *kranal_tunables.kra_ntx;
1396         RAP_RETURN        rrc;
1397
1398         dev->rad_id = id;
1399         rrc = RapkGetDeviceByIndex(id, kranal_device_callback,
1400                                    &dev->rad_handle);
1401         if (rrc != RAP_SUCCESS) {
1402                 CERROR("Can't get Rapidarray Device %d: %d\n", id, rrc);
1403                 goto failed_0;
1404         }
1405
1406         rrc = RapkReserveRdma(dev->rad_handle, total_ntx);
1407         if (rrc != RAP_SUCCESS) {
1408                 CERROR("Can't reserve %d RDMA descriptors"
1409                        " for device %d: %d\n", total_ntx, id, rrc);
1410                 goto failed_1;
1411         }
1412
1413         rrc = RapkCreateCQ(dev->rad_handle, total_ntx, RAP_CQTYPE_SEND,
1414                            &dev->rad_rdma_cqh);
1415         if (rrc != RAP_SUCCESS) {
1416                 CERROR("Can't create rdma cq size %d for device %d: %d\n",
1417                        total_ntx, id, rrc);
1418                 goto failed_1;
1419         }
1420
1421         rrc = RapkCreateCQ(dev->rad_handle, 
1422                            *kranal_tunables.kra_fma_cq_size, 
1423                            RAP_CQTYPE_RECV, &dev->rad_fma_cqh);
1424         if (rrc != RAP_SUCCESS) {
1425                 CERROR("Can't create fma cq size %d for device %d: %d\n", 
1426                        *kranal_tunables.kra_fma_cq_size, id, rrc);
1427                 goto failed_2;
1428         }
1429
1430         return 0;
1431
1432  failed_2:
1433         RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
1434  failed_1:
1435         RapkReleaseDevice(dev->rad_handle);
1436  failed_0:
1437         return -ENODEV;
1438 }
1439
1440 void
1441 kranal_device_fini(kra_device_t *dev)
1442 {
1443         LASSERT (list_empty(&dev->rad_ready_conns));
1444         LASSERT (list_empty(&dev->rad_new_conns));
1445         LASSERT (dev->rad_nphysmap == 0);
1446         LASSERT (dev->rad_nppphysmap == 0);
1447         LASSERT (dev->rad_nvirtmap == 0);
1448         LASSERT (dev->rad_nobvirtmap == 0);
1449                 
1450         LASSERT(dev->rad_scheduler == NULL);
1451         RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cqh);
1452         RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cqh);
1453         RapkReleaseDevice(dev->rad_handle);
1454 }
1455
1456 void
1457 kranal_shutdown (lnet_ni_t *ni)
1458 {
1459         int           i;
1460         unsigned long flags;
1461
1462         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1463                atomic_read(&libcfs_kmemory));
1464
1465         LASSERT (ni == kranal_data.kra_ni);
1466         LASSERT (ni->ni_data == &kranal_data);
1467
1468         switch (kranal_data.kra_init) {
1469         default:
1470                 CERROR("Unexpected state %d\n", kranal_data.kra_init);
1471                 LBUG();
1472
1473         case RANAL_INIT_ALL:
1474                 /* Prevent new peers from being created */
1475                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1476                 kranal_data.kra_nonewpeers = 1;
1477                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1478                 
1479                 /* Remove all existing peers from the peer table */
1480                 kranal_del_peer(LNET_NID_ANY);
1481
1482                 /* Wait for pending conn reqs to be handled */
1483                 i = 2;
1484                 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1485                 while (!list_empty(&kranal_data.kra_connd_acceptq)) {
1486                         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, 
1487                                                flags);
1488                         i++;
1489                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n */
1490                                "waiting for conn reqs to clean up\n");
1491                         cfs_pause(cfs_time_seconds(1));
1492
1493                         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1494                 }
1495                 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1496
1497                 /* Wait for all peers to be freed */
1498                 i = 2;
1499                 while (atomic_read(&kranal_data.kra_npeers) != 0) {
1500                         i++;
1501                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n */
1502                                "waiting for %d peers to close down\n",
1503                                atomic_read(&kranal_data.kra_npeers));
1504                         cfs_pause(cfs_time_seconds(1));
1505                 }
1506                 /* fall through */
1507
1508         case RANAL_INIT_DATA:
1509                 break;
1510         }
1511
1512         /* Peer state all cleaned up BEFORE setting shutdown, so threads don't
1513          * have to worry about shutdown races.  NB connections may be created
1514          * while there are still active connds, but these will be temporary
1515          * since peer creation always fails after the listener has started to
1516          * shut down. */
1517         LASSERT (atomic_read(&kranal_data.kra_npeers) == 0);
1518         
1519         /* Flag threads to terminate */
1520         kranal_data.kra_shutdown = 1;
1521
1522         for (i = 0; i < kranal_data.kra_ndevs; i++) {
1523                 kra_device_t *dev = &kranal_data.kra_devices[i];
1524
1525                 spin_lock_irqsave(&dev->rad_lock, flags);
1526                 wake_up(&dev->rad_waitq);
1527                 spin_unlock_irqrestore(&dev->rad_lock, flags);
1528         }
1529
1530         spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1531         wake_up_all(&kranal_data.kra_reaper_waitq);
1532         spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
1533
1534         LASSERT (list_empty(&kranal_data.kra_connd_peers));
1535         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1536         wake_up_all(&kranal_data.kra_connd_waitq);
1537         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1538
1539         /* Wait for threads to exit */
1540         i = 2;
1541         while (atomic_read(&kranal_data.kra_nthreads) != 0) {
1542                 i++;
1543                 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1544                        "Waiting for %d threads to terminate\n",
1545                        atomic_read(&kranal_data.kra_nthreads));
1546                 cfs_pause(cfs_time_seconds(1));
1547         }
1548
1549         LASSERT (atomic_read(&kranal_data.kra_npeers) == 0);
1550         if (kranal_data.kra_peers != NULL) {
1551                 for (i = 0; i < kranal_data.kra_peer_hash_size; i++)
1552                         LASSERT (list_empty(&kranal_data.kra_peers[i]));
1553
1554                 LIBCFS_FREE(kranal_data.kra_peers,
1555                             sizeof (struct list_head) *
1556                             kranal_data.kra_peer_hash_size);
1557         }
1558
1559         LASSERT (atomic_read(&kranal_data.kra_nconns) == 0);
1560         if (kranal_data.kra_conns != NULL) {
1561                 for (i = 0; i < kranal_data.kra_conn_hash_size; i++)
1562                         LASSERT (list_empty(&kranal_data.kra_conns[i]));
1563
1564                 LIBCFS_FREE(kranal_data.kra_conns,
1565                             sizeof (struct list_head) *
1566                             kranal_data.kra_conn_hash_size);
1567         }
1568
1569         for (i = 0; i < kranal_data.kra_ndevs; i++)
1570                 kranal_device_fini(&kranal_data.kra_devices[i]);
1571
1572         kranal_free_txdescs(&kranal_data.kra_idle_txs);
1573
1574         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1575                atomic_read(&libcfs_kmemory));
1576
1577         kranal_data.kra_init = RANAL_INIT_NOTHING;
1578         PORTAL_MODULE_UNUSE;
1579 }
1580
1581 int
1582 kranal_startup (lnet_ni_t *ni)
1583 {
1584         struct timeval    tv;
1585         int               pkmem = atomic_read(&libcfs_kmemory);
1586         int               rc;
1587         int               i;
1588         kra_device_t     *dev;
1589
1590         LASSERT (ni->ni_lnd == &the_kralnd);
1591
1592         /* Only 1 instance supported */
1593         if (kranal_data.kra_init != RANAL_INIT_NOTHING) {
1594                 CERROR ("Only 1 instance supported\n");
1595                 return -EPERM;
1596         }
1597
1598         if (lnet_set_ip_niaddr(ni) != 0) {
1599                 CERROR ("Can't determine my NID\n");
1600                 return -EPERM;
1601         }
1602
1603         if (*kranal_tunables.kra_credits > *kranal_tunables.kra_ntx) {
1604                 CERROR ("Can't set credits(%d) > ntx(%d)\n",
1605                         *kranal_tunables.kra_credits,
1606                         *kranal_tunables.kra_ntx);
1607                 return -EINVAL;
1608         }
1609         
1610         memset(&kranal_data, 0, sizeof(kranal_data)); /* zero pointers, flags etc */
1611
1612         ni->ni_maxtxcredits = *kranal_tunables.kra_credits;
1613         ni->ni_peertxcredits = *kranal_tunables.kra_peercredits;
1614
1615         ni->ni_data = &kranal_data;
1616         kranal_data.kra_ni = ni;
1617
1618         /* CAVEAT EMPTOR: Every 'Fma' message includes the sender's NID and
1619          * a unique (for all time) connstamp so we can uniquely identify
1620          * the sender.  The connstamp is an incrementing counter
1621          * initialised with seconds + microseconds at startup time.  So we
1622          * rely on NOT creating connections more frequently on average than
1623          * 1MHz to ensure we don't use old connstamps when we reboot. */
1624         do_gettimeofday(&tv);
1625         kranal_data.kra_connstamp =
1626         kranal_data.kra_peerstamp = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1627
1628         rwlock_init(&kranal_data.kra_global_lock);
1629
1630         for (i = 0; i < RANAL_MAXDEVS; i++ ) {
1631                 kra_device_t  *dev = &kranal_data.kra_devices[i];
1632
1633                 dev->rad_idx = i;
1634                 INIT_LIST_HEAD(&dev->rad_ready_conns);
1635                 INIT_LIST_HEAD(&dev->rad_new_conns);
1636                 init_waitqueue_head(&dev->rad_waitq);
1637                 spin_lock_init(&dev->rad_lock);
1638         }
1639
1640         kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
1641         init_waitqueue_head(&kranal_data.kra_reaper_waitq);
1642         spin_lock_init(&kranal_data.kra_reaper_lock);
1643
1644         INIT_LIST_HEAD(&kranal_data.kra_connd_acceptq);
1645         INIT_LIST_HEAD(&kranal_data.kra_connd_peers);
1646         init_waitqueue_head(&kranal_data.kra_connd_waitq);
1647         spin_lock_init(&kranal_data.kra_connd_lock);
1648
1649         INIT_LIST_HEAD(&kranal_data.kra_idle_txs);
1650         spin_lock_init(&kranal_data.kra_tx_lock);
1651
1652         /* OK to call kranal_api_shutdown() to cleanup now */
1653         kranal_data.kra_init = RANAL_INIT_DATA;
1654         PORTAL_MODULE_USE;
1655
1656         kranal_data.kra_peer_hash_size = RANAL_PEER_HASH_SIZE;
1657         LIBCFS_ALLOC(kranal_data.kra_peers,
1658                      sizeof(struct list_head) * kranal_data.kra_peer_hash_size);
1659         if (kranal_data.kra_peers == NULL)
1660                 goto failed;
1661
1662         for (i = 0; i < kranal_data.kra_peer_hash_size; i++)
1663                 INIT_LIST_HEAD(&kranal_data.kra_peers[i]);
1664
1665         kranal_data.kra_conn_hash_size = RANAL_PEER_HASH_SIZE;
1666         LIBCFS_ALLOC(kranal_data.kra_conns,
1667                      sizeof(struct list_head) * kranal_data.kra_conn_hash_size);
1668         if (kranal_data.kra_conns == NULL)
1669                 goto failed;
1670
1671         for (i = 0; i < kranal_data.kra_conn_hash_size; i++)
1672                 INIT_LIST_HEAD(&kranal_data.kra_conns[i]);
1673
1674         rc = kranal_alloc_txdescs(&kranal_data.kra_idle_txs, 
1675                                   *kranal_tunables.kra_ntx);
1676         if (rc != 0)
1677                 goto failed;
1678
1679         rc = kranal_thread_start(kranal_reaper, NULL);
1680         if (rc != 0) {
1681                 CERROR("Can't spawn ranal reaper: %d\n", rc);
1682                 goto failed;
1683         }
1684
1685         for (i = 0; i < *kranal_tunables.kra_n_connd; i++) {
1686                 rc = kranal_thread_start(kranal_connd, (void *)(unsigned long)i);
1687                 if (rc != 0) {
1688                         CERROR("Can't spawn ranal connd[%d]: %d\n",
1689                                i, rc);
1690                         goto failed;
1691                 }
1692         }
1693
1694         LASSERT (kranal_data.kra_ndevs == 0);
1695
1696         /* Use all available RapidArray devices */
1697         for (i = 0; i < RANAL_MAXDEVS; i++) {
1698                 dev = &kranal_data.kra_devices[kranal_data.kra_ndevs];
1699
1700                 rc = kranal_device_init(kranal_devids[i], dev);
1701                 if (rc == 0)
1702                         kranal_data.kra_ndevs++;
1703         }
1704
1705         if (kranal_data.kra_ndevs == 0) {
1706                 CERROR("Can't initialise any RapidArray devices\n");
1707                 goto failed;
1708         }
1709         
1710         for (i = 0; i < kranal_data.kra_ndevs; i++) {
1711                 dev = &kranal_data.kra_devices[i];
1712                 rc = kranal_thread_start(kranal_scheduler, dev);
1713                 if (rc != 0) {
1714                         CERROR("Can't spawn ranal scheduler[%d]: %d\n",
1715                                i, rc);
1716                         goto failed;
1717                 }
1718         }
1719
1720         /* flag everything initialised */
1721         kranal_data.kra_init = RANAL_INIT_ALL;
1722         /*****************************************************/
1723
1724         CDEBUG(D_MALLOC, "initial kmem %d\n", pkmem);
1725         return 0;
1726
1727  failed:
1728         kranal_shutdown(ni);
1729         return -ENETDOWN;
1730 }
1731
1732 void __exit
1733 kranal_module_fini (void)
1734 {
1735         lnet_unregister_lnd(&the_kralnd);
1736         kranal_tunables_fini();
1737 }
1738
1739 int __init
1740 kranal_module_init (void)
1741 {
1742         int    rc;
1743
1744         rc = kranal_tunables_init();
1745         if (rc != 0)
1746                 return rc;
1747
1748         lnet_register_lnd(&the_kralnd);
1749
1750         return 0;
1751 }
1752
1753 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
1754 MODULE_DESCRIPTION("Kernel RapidArray LND v0.01");
1755 MODULE_LICENSE("GPL");
1756
1757 module_init(kranal_module_init);
1758 module_exit(kranal_module_fini);