Whamcloud - gitweb
b=21619 hash MEs on RDMA portal
[fs/lustre-release.git] / lnet / lnet / acceptor.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LNET
38 #include <lnet/lib-lnet.h>
39
40 #ifdef __KERNEL__
41 static char *accept = "secure";
42 CFS_MODULE_PARM(accept, "s", charp, 0444,
43                 "Accept connections (secure|all|none)");
44
45 static int accept_port = 988;
46 CFS_MODULE_PARM(accept_port, "i", int, 0444,
47                 "Acceptor's port (same on all nodes)");
48
49 static int accept_backlog = 127;
50 CFS_MODULE_PARM(accept_backlog, "i", int, 0444,
51                 "Acceptor's listen backlog");
52
53 static int accept_timeout = 5;
54 CFS_MODULE_PARM(accept_timeout, "i", int, 0644,
55                 "Acceptor's timeout (seconds)");
56
57 struct {
58         int               pta_shutdown;
59         cfs_socket_t     *pta_sock;
60         struct semaphore  pta_signal;
61 } lnet_acceptor_state;
62
63 int
64 lnet_acceptor_timeout(void)
65 {
66         return accept_timeout;
67 }
68 EXPORT_SYMBOL(lnet_acceptor_timeout);
69
70 int
71 lnet_acceptor_port(void)
72 {
73         return accept_port;
74 }
75 EXPORT_SYMBOL(lnet_acceptor_port);
76
77 void
78 lnet_connect_console_error (int rc, lnet_nid_t peer_nid, 
79                            __u32 peer_ip, int peer_port)
80 {
81         switch (rc) {
82         /* "normal" errors */
83         case -ECONNREFUSED:
84                 CNETERR("Connection to %s at host %u.%u.%u.%u on port %d was "
85                         "refused: check that Lustre is running on that node.\n",
86                         libcfs_nid2str(peer_nid),
87                         HIPQUAD(peer_ip), peer_port);
88                 break;
89         case -EHOSTUNREACH:
90         case -ENETUNREACH:
91                 CNETERR("Connection to %s at host %u.%u.%u.%u "
92                         "was unreachable: the network or that node may "
93                         "be down, or Lustre may be misconfigured.\n",
94                         libcfs_nid2str(peer_nid), HIPQUAD(peer_ip));
95                 break;
96         case -ETIMEDOUT:
97                 CNETERR("Connection to %s at host %u.%u.%u.%u on "
98                         "port %d took too long: that node may be hung "
99                         "or experiencing high load.\n",
100                         libcfs_nid2str(peer_nid),
101                         HIPQUAD(peer_ip), peer_port);
102                 break;
103         case -ECONNRESET:
104                 LCONSOLE_ERROR_MSG(0x11b, "Connection to %s at host %u.%u.%u.%u"
105                                    " on port %d was reset: "
106                                    "is it running a compatible version of "
107                                    "Lustre and is %s one of its NIDs?\n",
108                                    libcfs_nid2str(peer_nid),
109                                    HIPQUAD(peer_ip), peer_port,
110                                    libcfs_nid2str(peer_nid));
111                 break;
112         case -EPROTO:
113                 LCONSOLE_ERROR_MSG(0x11c, "Protocol error connecting to %s at "
114                                    "host %u.%u.%u.%u on port %d: is it running "
115                                    "a compatible version of Lustre?\n",
116                                    libcfs_nid2str(peer_nid),
117                                    HIPQUAD(peer_ip), peer_port);
118                 break;
119         case -EADDRINUSE:
120                 LCONSOLE_ERROR_MSG(0x11d, "No privileged ports available to "
121                                    "connect to %s at host %u.%u.%u.%u on port "
122                                    "%d\n", libcfs_nid2str(peer_nid),
123                                    HIPQUAD(peer_ip), peer_port);
124                 break;
125         default:
126                 LCONSOLE_ERROR_MSG(0x11e, "Unexpected error %d connecting to %s"
127                                    " at host %u.%u.%u.%u on port %d\n", rc,
128                                    libcfs_nid2str(peer_nid),
129                                    HIPQUAD(peer_ip), peer_port);
130                 break;
131         }
132 }
133 EXPORT_SYMBOL(lnet_connect_console_error);
134
135 int
136 lnet_connect(cfs_socket_t **sockp, lnet_nid_t peer_nid,
137             __u32 local_ip, __u32 peer_ip, int peer_port)
138 {
139         lnet_acceptor_connreq_t cr;
140         cfs_socket_t           *sock;
141         int                     rc;
142         int                     port;
143         int                     fatal;
144
145         CLASSERT (sizeof(cr) <= 16);            /* not too big to be on the stack */
146
147         for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT; 
148              port >= LNET_ACCEPTOR_MIN_RESERVED_PORT; 
149              --port) {
150                 /* Iterate through reserved ports. */
151
152                 rc = libcfs_sock_connect(&sock, &fatal, 
153                                          local_ip, port, 
154                                          peer_ip, peer_port);
155                 if (rc != 0) {
156                         if (fatal)
157                                 goto failed;
158                         continue;
159                 }
160
161                 CLASSERT (LNET_PROTO_ACCEPTOR_VERSION == 1);
162
163                 if (the_lnet.ln_ptlcompat != 2) {
164                         /* When portals compatibility is "strong", simply
165                          * connect (i.e. send no acceptor connection request).
166                          * Othewise send an acceptor connection request. I can
167                          * have no portals peers so everyone else should
168                          * understand my protocol. */
169                         cr.acr_magic   = LNET_PROTO_ACCEPTOR_MAGIC;
170                         cr.acr_version = LNET_PROTO_ACCEPTOR_VERSION;
171                         cr.acr_nid     = peer_nid;
172
173                         if (the_lnet.ln_testprotocompat != 0) {
174                                 /* single-shot proto check */
175                                 LNET_LOCK();
176                                 if ((the_lnet.ln_testprotocompat & 4) != 0) {
177                                         cr.acr_version++;
178                                         the_lnet.ln_testprotocompat &= ~4;
179                                 }
180                                 if ((the_lnet.ln_testprotocompat & 8) != 0) {
181                                         cr.acr_magic = LNET_PROTO_MAGIC;
182                                         the_lnet.ln_testprotocompat &= ~8;
183                                 }
184                                 LNET_UNLOCK();
185                         }
186
187                         rc = libcfs_sock_write(sock, &cr, sizeof(cr),
188                                                accept_timeout);
189                         if (rc != 0)
190                                 goto failed_sock;
191                 }
192                 
193                 *sockp = sock;
194                 return 0;
195         }
196
197         rc = -EADDRINUSE;
198         goto failed;
199         
200  failed_sock:
201         libcfs_sock_release(sock);
202  failed:
203         lnet_connect_console_error(rc, peer_nid, peer_ip, peer_port);
204         return rc;
205 }
206 EXPORT_SYMBOL(lnet_connect);
207
208 static inline int
209 lnet_accept_magic(__u32 magic, __u32 constant)
210 {
211         return (magic == constant ||
212                 magic == __swab32(constant));
213 }
214
215 int
216 lnet_accept(lnet_ni_t *blind_ni, cfs_socket_t *sock, __u32 magic)
217 {
218         lnet_acceptor_connreq_t cr;
219         __u32                   peer_ip;
220         int                     peer_port;
221         int                     rc;
222         int                     flip;
223         lnet_ni_t              *ni;
224         char                   *str;
225
226         /* CAVEAT EMPTOR: I may be called by an LND in any thread's context if
227          * I passed the new socket "blindly" to the single NI that needed an
228          * acceptor.  If so, blind_ni != NULL... */
229
230         LASSERT (sizeof(cr) <= 16);             /* not too big for the stack */
231         
232         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
233         LASSERT (rc == 0);                      /* we succeeded before */
234
235         if (!lnet_accept_magic(magic, LNET_PROTO_ACCEPTOR_MAGIC)) {
236
237                 if (lnet_accept_magic(magic, LNET_PROTO_MAGIC)) {
238                         /* future version compatibility!
239                          * When LNET unifies protocols over all LNDs, the first
240                          * thing sent will be a version query.  I send back
241                          * LNET_PROTO_ACCEPTOR_MAGIC to tell her I'm "old" */
242
243                         memset (&cr, 0, sizeof(cr));
244                         cr.acr_magic = LNET_PROTO_ACCEPTOR_MAGIC;
245                         cr.acr_version = LNET_PROTO_ACCEPTOR_VERSION;
246                         rc = libcfs_sock_write(sock, &cr, sizeof(cr),
247                                                accept_timeout);
248
249                         if (rc != 0)
250                                 CERROR("Error sending magic+version in response"
251                                        "to LNET magic from %u.%u.%u.%u: %d\n",
252                                        HIPQUAD(peer_ip), rc);
253                         return -EPROTO;
254                 }
255
256                 if (magic == le32_to_cpu(LNET_PROTO_TCP_MAGIC))
257                         str = "'old' socknal/tcpnal";
258                 else if (lnet_accept_magic(magic, LNET_PROTO_RA_MAGIC))
259                         str = "'old' ranal";
260                 else if (lnet_accept_magic(magic, LNET_PROTO_OPENIB_MAGIC))
261                         str = "'old' openibnal";
262                 else
263                         str = "unrecognised";
264             
265                 LCONSOLE_ERROR_MSG(0x11f, "Refusing connection from %u.%u.%u.%u"
266                                    " magic %08x: %s acceptor protocol\n",
267                                    HIPQUAD(peer_ip), magic, str);
268                 return -EPROTO;
269         }
270
271         flip = (magic != LNET_PROTO_ACCEPTOR_MAGIC);
272
273         rc = libcfs_sock_read(sock, &cr.acr_version, 
274                               sizeof(cr.acr_version),
275                               accept_timeout);
276         if (rc != 0) {
277                 CERROR("Error %d reading connection request version from "
278                        "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip));
279                 return -EIO;
280         }
281
282         if (flip)
283                 __swab32s(&cr.acr_version);
284         
285         if (cr.acr_version != LNET_PROTO_ACCEPTOR_VERSION) {
286                 /* future version compatibility!
287                  * An acceptor-specific protocol rev will first send a version
288                  * query.  I send back my current version to tell her I'm
289                  * "old". */
290                 int peer_version = cr.acr_version;
291
292                 memset (&cr, 0, sizeof(cr));
293                 cr.acr_magic = LNET_PROTO_ACCEPTOR_MAGIC;
294                 cr.acr_version = LNET_PROTO_ACCEPTOR_VERSION;
295
296                 rc = libcfs_sock_write(sock, &cr, sizeof(cr),
297                                        accept_timeout);
298
299                 if (rc != 0)
300                         CERROR("Error sending magic+version in response"
301                                "to version %d from %u.%u.%u.%u: %d\n",
302                                peer_version, HIPQUAD(peer_ip), rc);
303                 return -EPROTO;
304         }
305
306         rc = libcfs_sock_read(sock, &cr.acr_nid,
307                               sizeof(cr) -
308                               offsetof(lnet_acceptor_connreq_t, acr_nid),
309                               accept_timeout);
310         if (rc != 0) {
311                 CERROR("Error %d reading connection request from "
312                        "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip));
313                 return -EIO;
314         }
315
316         if (flip)
317                 __swab64s(&cr.acr_nid);
318
319         ni = lnet_net2ni(LNET_NIDNET(cr.acr_nid));
320         if (ni == NULL ||               /* no matching net */
321             ni->ni_nid != cr.acr_nid) { /* right NET, wrong NID! */
322                 if (ni != NULL)
323                         lnet_ni_decref(ni);
324                 LCONSOLE_ERROR_MSG(0x120, "Refusing connection from %u.%u.%u.%u"
325                                    " for %s: No matching NI\n",
326                                    HIPQUAD(peer_ip), libcfs_nid2str(cr.acr_nid));
327                 return -EPERM;
328         }
329
330         if (ni->ni_lnd->lnd_accept == NULL) {
331                 /* This catches a request for the loopback LND */
332                 lnet_ni_decref(ni);
333                 LCONSOLE_ERROR_MSG(0x121, "Refusing connection from %u.%u.%u.%u"
334                                   " for %s: NI doesn not accept IP connections\n",
335                                   HIPQUAD(peer_ip), libcfs_nid2str(cr.acr_nid));
336                 return -EPERM;
337         }
338
339         CDEBUG(D_NET, "Accept %s from %u.%u.%u.%u%s\n",
340                libcfs_nid2str(cr.acr_nid), HIPQUAD(peer_ip),
341                blind_ni == NULL ? "" : " (blind)");
342
343         if (blind_ni == NULL) {
344                 /* called by the acceptor: call into the requested NI... */
345                 rc = ni->ni_lnd->lnd_accept(ni, sock);
346         } else {
347                 /* portals_compatible set and the (only) NI called me to verify
348                  * and skip the connection request... */
349                 LASSERT (the_lnet.ln_ptlcompat != 0);
350                 LASSERT (ni == blind_ni);
351                 rc = 0;
352         }
353
354         lnet_ni_decref(ni);
355         return rc;
356 }
357 EXPORT_SYMBOL(lnet_accept);
358         
359 int
360 lnet_acceptor(void *arg)
361 {
362         char           name[16];
363         cfs_socket_t  *newsock;
364         int            rc;
365         int            n_acceptor_nis;
366         __u32          magic;
367         __u32          peer_ip;
368         int            peer_port;
369         lnet_ni_t     *blind_ni = NULL;
370         int            secure = (int)((unsigned long)arg);
371
372         LASSERT (lnet_acceptor_state.pta_sock == NULL);
373
374         if (the_lnet.ln_ptlcompat != 0) {
375                 /* When portals_compatibility is enabled, peers may connect
376                  * without sending an acceptor connection request.  There is no
377                  * ambiguity about which network the peer wants to connect to
378                  * since there can only be 1 network, so I pass connections
379                  * "blindly" to it. */
380                 n_acceptor_nis = lnet_count_acceptor_nis(&blind_ni);
381                 LASSERT (n_acceptor_nis == 1);
382                 LASSERT (blind_ni != NULL);
383         }
384
385         snprintf(name, sizeof(name), "acceptor_%03d", accept_port);
386         cfs_daemonize(name);
387         cfs_block_allsigs();
388
389         rc = libcfs_sock_listen(&lnet_acceptor_state.pta_sock,
390                                 0, accept_port, accept_backlog);
391         if (rc != 0) {
392                 if (rc == -EADDRINUSE)
393                         LCONSOLE_ERROR_MSG(0x122, "Can't start acceptor on port"
394                                            " %d: port already in use\n",
395                                            accept_port);
396                 else
397                         LCONSOLE_ERROR_MSG(0x123, "Can't start acceptor on port "
398                                            "%d: unexpected error %d\n",
399                                            accept_port, rc);
400
401                 lnet_acceptor_state.pta_sock = NULL;
402         } else {
403                 LCONSOLE(0, "Accept %s, port %d%s\n", 
404                          accept, accept_port,
405                          blind_ni == NULL ? "" : " (proto compatible)");
406         }
407         
408         /* set init status and unblock parent */
409         lnet_acceptor_state.pta_shutdown = rc;
410         mutex_up(&lnet_acceptor_state.pta_signal);
411         
412         if (rc != 0)
413                 return rc;
414
415         while (!lnet_acceptor_state.pta_shutdown) {
416                 
417                 rc = libcfs_sock_accept(&newsock, lnet_acceptor_state.pta_sock);
418                 if (rc != 0) {
419                         if (rc != -EAGAIN) {
420                                 CWARN("Accept error %d: pausing...\n", rc);
421                                 cfs_pause(cfs_time_seconds(1));
422                         }
423                         continue;
424                 }
425
426                 rc = libcfs_sock_getaddr(newsock, 1, &peer_ip, &peer_port);
427                 if (rc != 0) {
428                         CERROR("Can't determine new connection's address\n");
429                         goto failed;
430                 }
431
432                 if (secure && peer_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) {
433                         CERROR("Refusing connection from %u.%u.%u.%u: "
434                                "insecure port %d\n",
435                                HIPQUAD(peer_ip), peer_port);
436                         goto failed;
437                 }
438
439                 if (blind_ni != NULL) {
440                         rc = blind_ni->ni_lnd->lnd_accept(blind_ni, newsock);
441                         if (rc != 0) {
442                                 CERROR("NI %s refused 'blind' connection from "
443                                        "%u.%u.%u.%u\n", 
444                                        libcfs_nid2str(blind_ni->ni_nid), 
445                                        HIPQUAD(peer_ip));
446                                 goto failed;
447                         }
448                         continue;
449                 }
450                 
451                 rc = libcfs_sock_read(newsock, &magic, sizeof(magic),
452                                       accept_timeout);
453                 if (rc != 0) {
454                         CERROR("Error %d reading connection request from "
455                                "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip));
456                         goto failed;
457                 }
458
459                 rc = lnet_accept(NULL, newsock, magic);
460                 if (rc != 0)
461                         goto failed;
462                 
463                 continue;
464                 
465         failed:
466                 libcfs_sock_release(newsock);
467         }
468         
469         libcfs_sock_release(lnet_acceptor_state.pta_sock);
470         lnet_acceptor_state.pta_sock = NULL;
471
472         if (blind_ni != NULL)
473                 lnet_ni_decref(blind_ni);
474
475         LCONSOLE(0,"Acceptor stopping\n");
476         
477         /* unblock lnet_acceptor_stop() */
478         mutex_up(&lnet_acceptor_state.pta_signal);
479         return 0;
480 }
481
482 int
483 lnet_acceptor_start(void)
484 {
485         long   pid;
486         long   secure;
487
488         LASSERT (lnet_acceptor_state.pta_sock == NULL);
489         init_mutex_locked(&lnet_acceptor_state.pta_signal);
490
491         if (!strcmp(accept, "secure")) {
492                 secure = 1;
493         } else if (!strcmp(accept, "all")) {
494                 secure = 0;
495         } else if (!strcmp(accept, "none")) {
496                 return 0;
497         } else {
498                 LCONSOLE_ERROR_MSG(0x124, "Can't parse 'accept=\"%s\"'\n",
499                                    accept);
500                 return -EINVAL;
501         }
502         
503         if (lnet_count_acceptor_nis(NULL) == 0)  /* not required */
504                 return 0;
505         
506         pid = cfs_kernel_thread(lnet_acceptor, (void *)secure, 0);
507         if (pid < 0) {
508                 CERROR("Can't start acceptor thread: %ld\n", pid);
509                 return -ESRCH;
510         }
511
512         mutex_down(&lnet_acceptor_state.pta_signal); /* wait for acceptor to startup */
513
514         if (!lnet_acceptor_state.pta_shutdown) {
515                 /* started OK */
516                 LASSERT (lnet_acceptor_state.pta_sock != NULL);
517                 return 0;
518         }
519
520         LASSERT (lnet_acceptor_state.pta_sock == NULL);
521         return -ENETDOWN;
522 }
523
524 void
525 lnet_acceptor_stop(void)
526 {
527         if (lnet_acceptor_state.pta_sock == NULL) /* not running */
528                 return;
529         
530         lnet_acceptor_state.pta_shutdown = 1;
531         libcfs_sock_abort_accept(lnet_acceptor_state.pta_sock);
532
533         /* block until acceptor signals exit */
534         mutex_down(&lnet_acceptor_state.pta_signal);
535 }
536
537 #else /* __KERNEL__ */
538 #ifdef HAVE_LIBPTHREAD
539
540 static char *accept_type;
541 static int accept_port = 988;
542 static int accept_backlog;
543 static int accept_timeout;
544
545 struct {
546         int                   pta_shutdown;
547         int                   pta_sock;
548         struct cfs_completion pta_completion;
549 } lnet_acceptor_state;
550
551 int
552 lnet_acceptor_port(void)
553 {
554         return accept_port;
555 }
556
557 int
558 lnet_parse_int_tunable(int *value, char *name, int dflt)
559 {
560         char    *env = getenv(name);
561         char    *end;
562
563         if (env == NULL) {
564                 *value = dflt;
565                 return 0;
566         }
567
568         *value = strtoull(env, &end, 0);
569         if (*end == 0)
570                 return 0;
571
572         CERROR("Can't parse tunable %s=%s\n", name, env);
573         return -EINVAL;
574 }
575
576 int
577 lnet_parse_string_tunable(char **value, char *name, char *dflt)
578 {
579         char    *env = getenv(name);
580
581         if (env == NULL)
582                 *value = dflt;             
583         else
584                 *value = env;
585
586         return 0;
587 }
588
589 int
590 lnet_acceptor_get_tunables()
591 {
592         int rc;
593         rc = lnet_parse_string_tunable(&accept_type, "LNET_ACCEPT", "secure");
594
595         if (rc != 0)
596                 return rc;
597
598         rc = lnet_parse_int_tunable(&accept_port, "LNET_ACCEPT_PORT", 988);
599
600         if (rc != 0)
601                 return rc;
602         
603         rc = lnet_parse_int_tunable(&accept_backlog, "LNET_ACCEPT_BACKLOG", 127);
604
605         if (rc != 0)
606                 return rc;
607
608         rc = lnet_parse_int_tunable(&accept_timeout, "LNET_ACCEPT_TIMEOUT", 5);
609
610         if (rc != 0)
611                 return rc;
612
613         CDEBUG(D_NET, "accept_type     = %s\n", accept_type);
614         CDEBUG(D_NET, "accept_port     = %d\n", accept_port);
615         CDEBUG(D_NET, "accept_backlog  = %d\n", accept_backlog);
616         CDEBUG(D_NET, "accept_timeout  = %d\n", accept_timeout);
617         return 0;
618 }
619
620 static inline int
621 lnet_accept_magic(__u32 magic, __u32 constant)
622 {
623         return (magic == constant ||
624                 magic == __swab32(constant));
625 }
626
627 /* user-land lnet_accept() isn't used by any LND's directly. So, we don't
628  * do it visible outside acceptor.c and we can change its prototype
629  * freely */
630 static int
631 lnet_accept(int sock, __u32 magic, __u32 peer_ip, int peer_port)
632 {
633         int rc, flip;
634         lnet_acceptor_connreq_t cr;
635         lnet_ni_t *ni;
636         
637         if (!lnet_accept_magic(magic, LNET_PROTO_ACCEPTOR_MAGIC)) {
638                 LCONSOLE_ERROR("Refusing connection from %u.%u.%u.%u magic %08x: "
639                                "unsupported acceptor protocol\n",
640                                HIPQUAD(peer_ip), magic);
641                 return -EPROTO;
642         }
643
644         flip = (magic != LNET_PROTO_ACCEPTOR_MAGIC);
645         
646         rc = libcfs_sock_read(sock, &cr.acr_version, 
647                               sizeof(cr.acr_version),
648                               accept_timeout);
649         if (rc != 0) {
650                 CERROR("Error %d reading connection request version from "
651                        "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip));
652                 return -EIO;
653         }
654
655         if (flip)
656                 __swab32s(&cr.acr_version);
657
658         if (cr.acr_version != LNET_PROTO_ACCEPTOR_VERSION)
659                 return -EPROTO;
660                 
661         rc = libcfs_sock_read(sock, &cr.acr_nid,
662                               sizeof(cr) -
663                               offsetof(lnet_acceptor_connreq_t, acr_nid),
664                               accept_timeout);
665         if (rc != 0) {
666                 CERROR("Error %d reading connection request from "
667                        "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip));
668                 return -EIO;
669         }
670
671         if (flip)
672                 __swab64s(&cr.acr_nid);
673
674         ni = lnet_net2ni(LNET_NIDNET(cr.acr_nid));
675                        
676         if (ni == NULL ||                    /* no matching net */
677              ni->ni_nid != cr.acr_nid) {     /* right NET, wrong NID! */
678                 if (ni != NULL)
679                         lnet_ni_decref(ni);
680                 LCONSOLE_ERROR("Refusing connection from %u.%u.%u.%u for %s: "
681                                " No matching NI\n",
682                                HIPQUAD(peer_ip), libcfs_nid2str(cr.acr_nid));
683                 return -EPERM;
684         }
685
686         if (ni->ni_lnd->lnd_accept == NULL) {
687                 lnet_ni_decref(ni);
688                 LCONSOLE_ERROR("Refusing connection from %u.%u.%u.%u for %s: "
689                                " NI doesn not accept IP connections\n",
690                                HIPQUAD(peer_ip), libcfs_nid2str(cr.acr_nid));
691                 return -EPERM;
692         }
693         
694         CDEBUG(D_NET, "Accept %s from %u.%u.%u.%u\n",
695                libcfs_nid2str(cr.acr_nid), HIPQUAD(peer_ip));
696
697         rc = ni->ni_lnd->lnd_accept(ni, sock);
698         
699         lnet_ni_decref(ni);
700         return rc;
701 }
702
703 int
704 lnet_acceptor(void *arg)
705 {
706         char           name[16];
707         int            secure = (int)((unsigned long)arg);
708         int            rc;
709         int            newsock;
710         __u32          peer_ip;
711         int            peer_port;
712         __u32          magic;
713
714         snprintf(name, sizeof(name), "acceptor_%03d", accept_port);
715         cfs_daemonize(name);
716         cfs_block_allsigs();
717
718         rc = libcfs_sock_listen(&lnet_acceptor_state.pta_sock,
719                                 0, accept_port, accept_backlog);
720         if (rc != 0) {
721                 if (rc == -EADDRINUSE)
722                         LCONSOLE_ERROR("Can't start acceptor on port %d: "
723                                        "port already in use\n",
724                                        accept_port);
725                 else
726                         LCONSOLE_ERROR("Can't start acceptor on port %d: "
727                                        "unexpected error %d\n",
728                                        accept_port, rc);
729
730         } else {
731                 LCONSOLE(0, "Accept %s, port %d\n", accept_type, accept_port);
732         }
733         
734         /* set init status and unblock parent */
735         lnet_acceptor_state.pta_shutdown = rc;
736         cfs_complete(&lnet_acceptor_state.pta_completion);
737
738         if (rc != 0)
739                 return rc;
740
741         while (!lnet_acceptor_state.pta_shutdown) {
742
743                 rc = libcfs_sock_accept(&newsock, lnet_acceptor_state.pta_sock,
744                                         &peer_ip, &peer_port);
745                 if (rc != 0)
746                         continue;
747
748                 /* maybe we're waken up with libcfs_sock_abort_accept() */
749                 if (lnet_acceptor_state.pta_shutdown) {
750                         close(newsock);
751                         break;
752                 }
753
754                 if (secure && peer_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) {
755                         CERROR("Refusing connection from %u.%u.%u.%u: "
756                                "insecure port %d\n",
757                                HIPQUAD(peer_ip), peer_port);
758                         goto failed;
759                 }
760
761                 rc = libcfs_sock_read(newsock, &magic, sizeof(magic),
762                                       accept_timeout);
763                 if (rc != 0) {
764                         CERROR("Error %d reading connection request from "
765                                "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip));
766                         goto failed;
767                 }
768
769                 rc = lnet_accept(newsock, magic, peer_ip, peer_port);
770                 if (rc != 0)
771                         goto failed;
772                 
773                 continue;
774                 
775           failed:
776                 close(newsock);
777         }
778         
779         close(lnet_acceptor_state.pta_sock);
780         LCONSOLE(0,"Acceptor stopping\n");
781
782         /* unblock lnet_acceptor_stop() */
783         cfs_complete(&lnet_acceptor_state.pta_completion);        
784
785         return 0;
786 }
787
788 static int skip_waiting_for_completion;
789
790 int
791 lnet_acceptor_start(void)
792 {
793         long   secure;
794         int rc;
795
796         rc = lnet_acceptor_get_tunables();
797         if (rc != 0)
798                 return rc;
799
800         /* Do nothing if we're liblustre clients */
801         if ((the_lnet.ln_pid & LNET_PID_USERFLAG) != 0)
802                 return 0;
803                         
804         cfs_init_completion(&lnet_acceptor_state.pta_completion);
805
806         if (!strcmp(accept_type, "secure")) {
807                 secure = 1;
808         } else if (!strcmp(accept_type, "all")) {
809                 secure = 0;
810         } else if (!strcmp(accept_type, "none")) {
811                 skip_waiting_for_completion = 1;
812                 return 0;
813         } else {
814                 LCONSOLE_ERROR ("Can't parse 'accept_type=\"%s\"'\n", accept_type);
815                 cfs_fini_completion(&lnet_acceptor_state.pta_completion);
816                 return -EINVAL;
817         }
818
819         if (lnet_count_acceptor_nis(NULL) == 0) { /* not required */
820                 skip_waiting_for_completion = 1;
821                 return 0;
822         }
823
824         rc = cfs_create_thread(lnet_acceptor, (void *)secure);
825         if (rc != 0) {
826                 CERROR("Can't start acceptor thread: %d\n", rc);
827                 cfs_fini_completion(&lnet_acceptor_state.pta_completion);
828                 return rc;
829         }
830
831         /* wait for acceptor to startup */
832         cfs_wait_for_completion(&lnet_acceptor_state.pta_completion);
833
834         if (!lnet_acceptor_state.pta_shutdown)
835                 return 0;
836         
837         cfs_fini_completion(&lnet_acceptor_state.pta_completion);
838         return -ENETDOWN;
839 }
840
841 void
842 lnet_acceptor_stop(void)
843 {
844         /* Do nothing if we're liblustre clients */
845         if ((the_lnet.ln_pid & LNET_PID_USERFLAG) != 0)
846                 return;
847
848         if (!skip_waiting_for_completion) {
849                 lnet_acceptor_state.pta_shutdown = 1;
850                 libcfs_sock_abort_accept(accept_port);
851                 
852                 /* block until acceptor signals exit */
853                 cfs_wait_for_completion(&lnet_acceptor_state.pta_completion);
854         }
855         
856         cfs_fini_completion(&lnet_acceptor_state.pta_completion);
857 }
858 #else
859 int
860 lnet_acceptor_start(void)
861 {
862         return 0;
863 }
864
865 void
866 lnet_acceptor_stop(void)
867 {
868 }
869 #endif /* !HAVE_LIBPTHREAD */
870 #endif /* !__KERNEL__ */