1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2002 Cray Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 This file provides a simple stateful connection manager which
24 builds tcp connections on demand and leaves them open for
36 #include <sys/types.h>
37 #include <sys/socket.h>
38 #include <netinet/in.h>
39 #include <netinet/tcp.h>
40 #include <lnet/types.h>
41 #include <lnet/lib-types.h>
42 #include <lnet/socklnd.h>
43 #include <libcfs/kp30.h>
44 #include <connection.h>
51 /* tunables (via environment) */
// Defaults for the user-space TCP NAL tunables; tcpnal_set_global_params()
// may overwrite them from TCPNAL_ or TCPLND_ prefixed environment variables.
//
// tcpnal_acceptor_port: remote TCP port that force_tcp_connection()
// connects to.
52 int tcpnal_acceptor_port = 988;
// tcpnal_buffer_size: value force_tcp_connection() passes to setsockopt()
// as SO_SNDBUF and SO_RCVBUF.  NOTE(review): 0 presumably means "leave the
// kernel default"; the guard around those setsockopt() calls is not shown
// in this listing - confirm.
53 int tcpnal_buffer_size = 0;
// Function: tcpnal_env_param
// Arguments: name: environment variable to look up
//            val: where the parsed integer is stored
// Returns: truthy on success.  tcpnal_set_global_params() chains the
//   results with &&, so a zero return stops further parsing.
//   NOTE(review): the return statements and the handling of an unset
//   variable are on lines not shown in this listing; presumably an unset
//   variable counts as success and leaves *val untouched - confirm.
//
// The value is parsed with sscanf "%i", so decimal, 0x-prefixed hex and
// 0-prefixed octal are all accepted.  "%n" records how many characters
// were consumed, and the value is accepted only when the whole string was.
// NOTE(review): sscanf stores the leading numeric prefix into *val before
// the length check runs.  On input such as "12abc", *val has therefore
// already been overwritten with 12 when the parse error is reported.
57 tcpnal_env_param (char *name, int *val)
59 char *env = getenv(name);
// n is pre-set to the full length because, when %i consumes the entire
// string, %n sits at end of input and may not be assigned.  (env is
// presumably checked for NULL on the lines not shown above.)
65 n = strlen(env); /* scanf may not assign on EOS */
66 if (sscanf(env, "%i%n", val, &n) >= 1 && n == strlen(env)) {
67 CDEBUG(D_INFO, "Environment variable %s set to %d\n",
72 CERROR("Can't parse environment variable '%s=%s'\n",
// Function: tcpnal_set_global_params
// Returns: the && of the tcpnal_env_param() results, so evaluation stops
//   at the first variable that fails to parse and the result is then 0.
//
// Each tunable is read under two names: the TCPNAL_ prefix first, then
// the TCPLND_ prefix.  When both are set, the TCPLND_ value is read last
// and therefore wins.  NOTE(review): the target variables of the two
// NAGLE lookups are on lines not shown here; presumably tcpnal_nagle,
// which force_tcp_connection() reads - confirm.
78 tcpnal_set_global_params (void)
80 return tcpnal_env_param("TCPNAL_PORT",
81 &tcpnal_acceptor_port) &&
82 tcpnal_env_param("TCPLND_PORT",
83 &tcpnal_acceptor_port) &&
84 tcpnal_env_param("TCPNAL_BUFFER_SIZE",
85 &tcpnal_buffer_size) &&
86 tcpnal_env_param("TCPLND_BUFFER_SIZE",
87 &tcpnal_buffer_size) &&
88 tcpnal_env_param("TCPNAL_NAGLE",
90 tcpnal_env_param("TCPLND_NAGLE",
94 /* Function: compare_connection
95 * Arguments: arg1: a connection in the hash table
96 * arg2: pointer to the lnet_nid_t to verify against
97 * Returns: 1 if the connection is the one requested, 0 otherwise
* This is the equality callback that init_connections() passes to
* hash_create_table(); it compares the stored connection's peer_nid
* with the requested NID.
* NOTE(review): c is bound to arg1 on a line not shown in this listing.
99 * compare_connection() tests for collisions in the hash table
101 static int compare_connection(void *arg1, void *arg2)
104 lnet_nid_t *nid = arg2;
106 return (c->peer_nid == *nid);
109 /* Function: connection_key
110 * Arguments: arg: pointer to the lnet_nid_t to hash
* The NID is simply cast to unsigned int, so only its low-order bits
* contribute to the hash.  init_connections() passes this function to
* hash_create_table() as the key function.
111 * Returns: a not-particularly-well-distributed hash
114 static unsigned int connection_key(void *arg)
116 lnet_nid_t *nid = arg;
118 return (unsigned int)(*nid);
// Function: close_connection
// Arguments: arg: a connection stored in the manager's hash table
// Per-entry callback that shutdown_connections() passes to
// hash_destroy_table().  NOTE(review): the body is not shown in this
// listing; presumably it closes the socket and frees the connection -
// confirm.
122 close_connection(void *arg)
130 /* Function: remove_connection
* Removes the connection from its manager's hash table, keyed by the
* peer NID.  read_connection() calls it when a read fails.
* NOTE(review): any further teardown (closing the fd, freeing the
* structure) is on lines not shown in this listing; confirm before
* relying on c after this call.
131 * Arguments: arg: the connection to remove (bound to c on a line not shown)
133 void remove_connection(void *arg)
137 hash_table_remove(c->m->connections,&c->peer_nid);
142 /* Function: read_connection
143 * Arguments: c: the connection to read from
144 * dest: the buffer to read into
145 * len: the number of bytes to read
146 * Returns: success as 1, or failure as 0
* Two read primitives appear below: a raw SYS_read syscall and recv().
* The lines choosing between them are not shown in this listing.
* When errno is EINTR the code takes a separate branch (presumably a
* retry).  Other failures call remove_connection(c).
* NOTE(review): confirm that rc == 0 (peer closed) is treated as a
* failure on a line not shown; otherwise offset never advances and the
* loop spins.
148 * read_connection() reads data from the connection, continuing
149 * to read partial results until the request is satisfied or
150 * it errors. TODO: this read should be covered by signal protection.
152 int read_connection(connection c,
161 rc = syscall(SYS_read, c->fd, dest+offset, len-offset);
163 rc = recv(c->fd, dest+offset, len-offset, 0);
166 if (errno == EINTR) {
169 remove_connection(c);
174 } while (offset < len);
// Function: connection_input
// Arguments: d: the connection registered with register_io_handler()
// Returns: whatever the manager's handler returns
// READ_HANDLER callback installed by allocate_connection().  It forwards
// the readable connection, together with handler_arg, to the manager's
// handler.  NOTE(review): c is bound to d on a line not shown.
179 static int connection_input(void *d)
182 return((*c->m->handler)(c->m->handler_arg,c));
// Function: allocate_connection
// Arguments: m: the connection manager
//            nid: NID of the peer at the other end of fd
//            fd: connected socket
//   (only the m parameter line is shown; nid and fd are named from their
//   uses below and from the call in force_tcp_connection())
// Returns: presumably the new connection (the return is not shown)
//
// Allocates a connection, registers connection_input() as its
// READ_HANDLER and inserts it into m->connections keyed by nid.
// force_tcp_connection() calls it while holding m->conn_lock.
// NOTE(review): no NULL check of malloc() is visible; confirm one exists
// on the lines not shown.
// NOTE(review): the key passed to hash_table_insert() is the address of
// the by-value parameter nid.  This is safe only if the hash table reads
// or copies the key immediately rather than storing the pointer - confirm.
187 allocate_connection(manager m,
191 connection c=malloc(sizeof(struct connection));
197 register_io_handler(fd,READ_HANDLER,connection_input,c);
198 hash_table_insert(m->connections,c,&nid);
// Function: tcpnal_write
// Arguments: nid: peer NID (used only in the error messages)
//            sockfd: connected socket
//            buffer, nob: data to send and its length in bytes
// Issues a single write() and reports either a failure (with errno) or a
// short write.  There is no retry loop.  Its only caller in this listing,
// tcpnal_hello(), sends small handshake messages on a freshly connected
// socket and relies on socket buffering to absorb them.
// NOTE(review): the return statements are not shown; tcpnal_hello()
// treats the result as a status code - confirm the success value.
203 tcpnal_write(lnet_nid_t nid, int sockfd, void *buffer, int nob)
205 int rc = syscall(SYS_write, sockfd, buffer, nob);
207 /* NB called on an 'empty' socket with huge buffering! */
212 CERROR("Failed to send to %s: %s\n",
213 libcfs_nid2str(nid), strerror(errno));
217 CERROR("Short send to %s: %d/%d\n",
218 libcfs_nid2str(nid), rc, nob);
// Function: tcpnal_read
// Arguments: nid: peer NID (used only in the error messages)
//            sockfd: connected socket
//            buffer, nob: destination buffer and number of bytes wanted
// Reads from sockfd with a raw read() syscall and reports end-of-file or
// errors against the peer's NID.  NOTE(review): whether a partial read is
// treated as an error or retried, and the return values, are on lines not
// shown in this listing - confirm.
223 tcpnal_read(lnet_nid_t nid, int sockfd, void *buffer, int nob)
228 rc = syscall(SYS_read, sockfd, buffer, nob);
231 CERROR("Unexpected EOF from %s\n",
232 libcfs_nid2str(nid));
237 CERROR("Failed to receive from %s: %s\n",
238 libcfs_nid2str(nid), strerror(errno));
// Function: tcpnal_hello
// Arguments: sockfd: socket already connected to the peer's acceptor
//            nid: NID we expect the peer to have
// Returns: a status; force_tcp_connection() branches on a non-zero
//   result.  NOTE(review): the return statements are not shown -
//   presumably 0 on success.
//
// Performs the client side of the version-1 TCP handshake:
//   1. send an acceptor connection request (cr);
//   2. send a HELLO header whose dest_nid field carries our magic and
//      version (hmv) and whose incarnation is the current time in
//      microseconds;
//   3. read the peer's magic/version and check it;
//   4. read the rest of the peer's HELLO header and check its type, that
//      it carries a NID, that the NID equals nid, and its payload length
//      (the exact payload condition is on a line not shown).
248 tcpnal_hello (int sockfd, lnet_nid_t nid)
254 lnet_acceptor_connreq_t cr;
256 lnet_magicversion_t hmv;
258 gettimeofday(&tv, NULL);
259 incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
261 memset(&cr, 0, sizeof(cr));
262 cr.acr_magic = LNET_PROTO_ACCEPTOR_MAGIC;
263 cr.acr_version = LNET_PROTO_ACCEPTOR_VERSION;
266 /* hmv is initialised on its own and then copied into hdr.  Building it
267 * in place, through a pointer into hdr, was apparently broken by compiler
268 * optimisation, most likely because of pointer aliasing of hmv and hdr. */
269 hmv.magic = cpu_to_le32(LNET_PROTO_TCP_MAGIC);
// NOTE(review): version_major and version_minor are converted with
// cpu_to_le32() here but compared with cpu_to_le16() further down.
// sizeof(hmv) must equal sizeof(hdr.dest_nid) and magic is 32-bit, so
// the version fields are presumably 16-bit.  On a big-endian host,
// cpu_to_le32() then moves the value into the upper half, which is lost
// on truncation to 16 bits, so 0.0 would be sent.  cpu_to_le16() looks
// intended - confirm the field widths and fix.
270 hmv.version_major = cpu_to_le32(LNET_PROTO_TCP_VERSION_MAJOR);
271 hmv.version_minor = cpu_to_le32(LNET_PROTO_TCP_VERSION_MINOR);
273 memset (&hdr, 0, sizeof (hdr));
275 CLASSERT (sizeof (hmv) == sizeof (hdr.dest_nid));
276 memcpy(&hdr.dest_nid, &hmv, sizeof(hmv));
278 /* hdr.src_nid/src_pid are ignored at dest */
280 hdr.type = cpu_to_le32(LNET_MSG_HELLO);
281 hdr.msg.hello.type = cpu_to_le32(SOCKLND_CONN_ANY);
282 hdr.msg.hello.incarnation = cpu_to_le64(incarnation);
284 /* I don't send any interface info */
286 /* Assume sufficient socket buffering for these messages... */
287 rc = tcpnal_write(nid, sockfd, &cr, sizeof(cr));
291 rc = tcpnal_write(nid, sockfd, &hdr, sizeof(hdr));
// Only the magic/version prefix of the peer's reply is read first; the
// rest of the header is read after the version has been checked.
295 rc = tcpnal_read(nid, sockfd, &hmv, sizeof(hmv));
// hmv.magic is still in little-endian wire format.  le32_to_cpu() and
// cpu_to_le32() perform the same transformation (identity or byte swap),
// so the comparison and the printed value are correct even though the
// conversion names read backwards.
299 if (hmv.magic != le32_to_cpu(LNET_PROTO_TCP_MAGIC)) {
300 CERROR ("Bad magic %#08x (%#08x expected) from %s\n",
301 cpu_to_le32(hmv.magic), LNET_PROTO_TCP_MAGIC,
302 libcfs_nid2str(nid));
306 if (hmv.version_major != cpu_to_le16 (LNET_PROTO_TCP_VERSION_MAJOR) ||
307 hmv.version_minor != cpu_to_le16 (LNET_PROTO_TCP_VERSION_MINOR)) {
308 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
310 le16_to_cpu (hmv.version_major),
311 le16_to_cpu (hmv.version_minor),
312 LNET_PROTO_TCP_VERSION_MAJOR,
313 LNET_PROTO_TCP_VERSION_MINOR,
314 libcfs_nid2str(nid));
// Compile-time guard: the split header read below relies on the
// version-1 layout, in which magic/version occupy dest_nid.
318 #if (LNET_PROTO_TCP_VERSION_MAJOR != 1)
319 # error "This code only understands protocol version 1.x"
321 /* version 1 sends magic/version as the dest_nid of a 'hello' header,
322 * so read the rest of it in now... */
324 rc = tcpnal_read(nid, sockfd, ((char *)&hdr) + sizeof (hmv),
325 sizeof(hdr) - sizeof(hmv));
329 /* ...and check we got what we expected */
330 if (hdr.type != cpu_to_le32 (LNET_MSG_HELLO)) {
// NOTE(review): the two concatenated literals below produce a double
// space ("hdr  but") in the logged message.
331 CERROR ("Expecting a HELLO hdr "
332 " but got type %d with %d payload from %s\n",
333 le32_to_cpu (hdr.type),
334 le32_to_cpu (hdr.payload_length), libcfs_nid2str(nid));
338 if (le64_to_cpu(hdr.src_nid) == LNET_NID_ANY) {
339 CERROR("Expecting a HELLO hdr with a NID, but got LNET_NID_ANY\n");
343 if (nid != le64_to_cpu (hdr.src_nid)) {
344 CERROR ("Connected to %s, but expecting %s\n",
345 libcfs_nid2str(le64_to_cpu (hdr.src_nid)),
346 libcfs_nid2str(nid));
350 /* Ignore any interface info in the payload */
351 nob = le32_to_cpu(hdr.payload_length);
353 CERROR("Unexpected HELLO payload %d from %s\n",
354 nob, libcfs_nid2str(nid));
361 /* Function: force_tcp_connection
362 * Arguments: m: the connection manager
363 * nid: NID of the peer to connect to
* pb: procbridge whose NAL thread is woken once a new
* connection has been registered
* (only the m parameter line is visible in this listing; nid and
* pb are presumably the remaining parameters - confirm)
* m->conn_lock is held from the hash-table lookup until the new
* connection has been inserted.  With "#if 1" the socket is bound to
* a non-privileged local port.  m->port starts at 0, so the first
* connection gets a kernel-chosen port; getsockname() records it in
* m->port, and later connections reuse it (SO_REUSEADDR is set first).
* NOTE(review): the failure return value is not shown (presumably
* NULL).
364 * Returns: an allocated connection structure, either
365 * a pre-existing one, or a new connection
367 connection force_tcp_connection(manager m,
// The peer's IPv4 address is the address part of its NID.
371 unsigned int ip = LNET_NIDADDR(nid);
373 struct sockaddr_in addr;
374 struct sockaddr_in locaddr;
380 pthread_mutex_lock(&m->conn_lock);
// Reuse an existing connection to this NID if there is one (that branch
// is on lines not shown).
382 conn = hash_table_find(m->connections, &nid);
386 memset(&addr, 0, sizeof(addr));
387 addr.sin_family = AF_INET;
388 addr.sin_addr.s_addr = htonl(ip);
389 addr.sin_port = htons(tcpnal_acceptor_port);
391 memset(&locaddr, 0, sizeof(locaddr));
392 locaddr.sin_family = AF_INET;
393 locaddr.sin_addr.s_addr = INADDR_ANY;
// m->port is kept in host byte order (see the ntohs() after getsockname()).
394 locaddr.sin_port = htons(m->port);
396 #if 1 /* tcpnal connects from a non-privileged port */
397 fd = socket(AF_INET, SOCK_STREAM, 0);
399 perror("tcpnal socket failed");
404 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
405 &option, sizeof(option));
407 perror ("Can't set SO_REUSEADDR for socket");
413 /* Bind all subsequent connections to the same port */
414 rc = bind(fd, (struct sockaddr *)&locaddr, sizeof(locaddr));
416 perror("Error binding port");
422 rc = connect(fd, (struct sockaddr *)&addr,
423 sizeof(struct sockaddr_in));
425 perror("Error connecting to remote host");
// Learn which local port was actually bound, so it can be reused.
430 sz = sizeof(locaddr);
431 rc = getsockname(fd, (struct sockaddr *)&locaddr, &sz);
433 perror ("Error on getsockname");
439 m->port = ntohs(locaddr.sin_port);
// Disabled alternative (its preprocessor line is not shown).  It tries
// reserved local ports from IPPORT_RESERVED - 1 down to
// IPPORT_RESERVED / 2 + 1, skipping ports that are already in use.
442 for (rport = IPPORT_RESERVED - 1; rport > IPPORT_RESERVED / 2; --rport) {
443 fd = socket(AF_INET, SOCK_STREAM, 0);
445 perror("tcpnal socket failed");
450 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
451 &option, sizeof(option));
453 perror ("Can't set SO_REUSEADDR for socket");
458 locaddr.sin_port = htons(rport);
459 rc = bind(fd, (struct sockaddr *)&locaddr, sizeof(locaddr));
// When bind() fails with EACCES (no privilege for reserved ports),
// connect() is still attempted; the socket then gets an ephemeral port.
460 if (rc == 0 || errno == EACCES) {
461 rc = connect(fd, (struct sockaddr *)&addr,
462 sizeof(struct sockaddr_in));
465 } else if (errno != EADDRINUSE && errno != EADDRNOTAVAIL) {
466 perror("Error connecting to remote host");
470 } else if (errno != EADDRINUSE) {
471 perror("Error binding to privileged port");
// The loop ran to its lower bound without a successful connect().
478 if (rport == IPPORT_RESERVED / 2) {
479 fprintf(stderr, "Out of ports trying to bind to a reserved port\n");
// Socket tuning is best effort: these setsockopt() results are not
// checked.  TCP_NODELAY is set unless tcpnal_nagle is non-zero.
// NOTE(review): SOL_TCP is Linux-specific; IPPROTO_TCP is the portable
// option level for TCP_NODELAY.
484 option = tcpnal_nagle ? 0 : 1;
485 setsockopt(fd, SOL_TCP, TCP_NODELAY, &option, sizeof(option));
486 option = tcpnal_buffer_size;
488 setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &option, sizeof(option));
489 option = tcpnal_buffer_size;
490 setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &option, sizeof(option));
// A non-zero handshake result takes the error path (not shown here).
494 if (tcpnal_hello(fd, nid))
497 conn = allocate_connection(m, nid, fd);
499 /* let nal thread know this event right away */
501 procbridge_wakeup_nal(pb);
504 pthread_mutex_unlock(&m->conn_lock);
509 #if 0 /* we don't accept connections */
510 /* Function: new_connection
511 * Arguments: z: opaque argument, presumably the manager (m below)
512 * Returns: 1 in order to reregister for new connection requests
* NOTE(review): this code is compiled out and has gone stale.  It
* calls allocate_connection() with four arguments, whereas
* force_tcp_connection() uses three.  It also passes an int length to
* accept() where socklen_t is expected, and reads sin_addr through an
* unsigned int pointer cast.  It needs updating before being re-enabled.
514 * called when the bound service socket receives
515 * a new connection request, it always accepts and
516 * installs a new connection
518 static int new_connection(void *z)
521 struct sockaddr_in s;
522 int len=sizeof(struct sockaddr_in);
523 int fd=accept(m->bound,(struct sockaddr *)&s,&len);
524 unsigned int nid=*((unsigned int *)&s.sin_addr);
525 /* cfs specific hack */
526 //unsigned short pid=s.sin_port;
527 pthread_mutex_lock(&m->conn_lock);
528 allocate_connection(m,htonl(nid),0/*pid*/,fd);
529 pthread_mutex_unlock(&m->conn_lock);
533 /* Function: bind_socket
534 * Arguments: m: the connection manager
535 * port: the port to attempt to bind to
536 * Returns: 1 on success, or 0 on error
538 * bind_socket() attempts to allocate and bind a socket to the requested
539 * port, or dynamically assign one from the kernel should the port be
540 * zero. Sets the bound, bound_handler and port elements of m.
* NOTE(review): presumably still inside the #if 0 that precedes
* new_connection() (the matching #endif is not shown).  If re-enabled,
* note three problems.  It stores addr.sin_port in m->port without
* ntohs(), whereas force_tcp_connection() keeps m->port in host byte
* order.  It passes an int length where socklen_t is expected.  It
* ignores the result of getsockname().
542 * TODO: The port should be an explicitly sized type.
544 static int bind_socket(manager m,unsigned short port)
546 struct sockaddr_in addr;
547 int alen=sizeof(struct sockaddr_in);
549 if ((m->bound = socket(AF_INET, SOCK_STREAM, 0)) < 0)
552 bzero((char *) &addr, sizeof(addr));
553 addr.sin_family = AF_INET;
554 addr.sin_addr.s_addr = 0;
555 addr.sin_port = htons(port);
557 if (bind(m->bound,(struct sockaddr *)&addr,alen)<0){
558 perror ("tcpnal bind");
562 getsockname(m->bound,(struct sockaddr *)&addr, &alen);
564 m->bound_handler=register_io_handler(m->bound,READ_HANDLER,
567 m->port=addr.sin_port;
573 /* Function: shutdown_connections
574 * Arguments: m: the manager structure
* Destroys the connection table, passing close_connection() to
* hash_destroy_table() as the per-entry callback.  The
* remove_io_handler() call is presumably compiled out, given the
* comment beside it; the preprocessor lines around it are not shown.
576 * close all connections and reclaim resources
578 void shutdown_connections(manager m)
581 /* we don't accept connections */
583 remove_io_handler(m->bound_handler);
585 hash_destroy_table(m->connections,close_connection);
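590 /* Function: init_connections
591 * Arguments: input: callback invoked as input(a, connection) by
* connection_input() when a connection is readable (presumably
* stored as m->handler)
* a: opaque argument passed through to input
592 * Returns: a newly allocated manager structure, or
593 * zero if the fixed port could not be bound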
595 manager init_connections(int (*input)(void *, void *), void *a)
597 manager m = (manager)malloc(sizeof(struct manager));
599 m->connections = hash_create_table(compare_connection,connection_key);
602 m->port = 0; /* set on first connection */
603 pthread_mutex_init(&m->conn_lock, 0);
607 if (bind_socket(m,pid))