1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2002 Cray Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 This file provides a simple stateful connection manager which
24 builds tcp connections on demand and leaves them open for
25 future use. It also provides the machinery to allow peers
37 #include <sys/types.h>
38 #include <sys/socket.h>
39 #include <netinet/in.h>
40 #include <netinet/tcp.h>
41 #include <portals/types.h>
42 #include <portals/list.h>
43 #include <portals/lib-types.h>
44 #include <portals/socknal.h>
45 #include <linux/kp30.h>
46 #include <connection.h>
53 /* global variable: TCP port (host byte order) of the remote acceptor that force_tcp_connection() connects to */
54 unsigned short tcpnal_acceptor_port = 988;
57 /* Function: compare_connection
58 * Arguments: arg1: a connection in the hash table
59 * arg2: the key to verify against, an array of two unsigned ints {ip, port}
60 * Returns: 1 if the connection is the one requested, 0 otherwise
62 * compare_connection() tests for collisions in the hash table
64 static int compare_connection(void *arg1, void *arg2)
67 unsigned int * id = arg2;
// Exact match: both the IP address (id[0]) and the port (id[1]) must agree.
69 return((c->ip==id[0]) && (c->port==id[1]));
71 /* CFS specific hacking */
// NOTE(review): this variant matches on the IP address alone and ignores
// id[1], so two connections to the same peer address are indistinguishable.
// As listed it follows the ip+port return above; presumably a preprocessor
// conditional compiles in only one of the two — confirm.
72 return (c->ip == id[0]);
77 /* Function: connection_key
78 * Arguments: id: the key to hash; id[0] is the IP address (see compare_connection)
79 * Returns: a not-particularly-well-distributed hash
82 static unsigned int connection_key(unsigned int *id)
87 /* CFS specific hacking */
// The hash is simply the IP address; the port is ignored. This must stay
// consistent with the IP-only comparison in compare_connection(): keys that
// compare equal must hash equal.
88 return (unsigned int) id[0];
93 /* Function: remove_connection
94 * Arguments: arg: the connection to remove; called by read_connection() on error and as the hash_destroy_table() callback in shutdown_connections()
96 void remove_connection(void *arg)
// Unlink the connection from its manager's hash table.
// NOTE(review): the construction of the key "id" is not shown here; confirm
// it uses the same key layout allocate_connection() inserts with.
103 hash_table_remove(c->m->connections,id);
109 /* Function: read_connection
110 * Arguments: c: the connection to read from
111 * dest: the buffer to read into
112 * len: the number of bytes to read
113 * Returns: success as 1, or failure as 0
115 * read_connection() reads data from the connection, continuing
116 * to read partial results until the request is satisfied or
117 * it errors. TODO: this read should be covered by signal protection.
119 int read_connection(connection c,
// NOTE(review): two read primitives appear below, a raw SYS_read system call
// and recv(); presumably a preprocessor conditional selects exactly one of
// them — confirm.
128 rc = syscall(SYS_read, c->fd, dest+offset, len-offset);
130 rc = recv(c->fd, dest+offset, len-offset, 0);
// An interrupted call (EINTR) is handled separately from other failures.
133 if (errno == EINTR) {
// Presumably reached for failures other than EINTR: the connection is
// removed from the manager's table.
// NOTE(review): confirm that rc == 0 (peer closed the socket) is also treated
// as a failure; otherwise the loop below would spin without making progress.
136 remove_connection(c);
// Repeat until all len bytes have been read into dest.
141 } while (offset < len);
// Function: connection_input
// Read-ready callback that allocate_connection() registers for each
// connection's socket; d is that connection. It forwards the event to the
// manager's handler as handler(handler_arg, connection) and returns the
// handler's result unchanged.
146 static int connection_input(void *d)
149 return((*c->m->handler)(c->m->handler_arg,c));
153 /* Function: allocate_connection
154 * Arguments: m: the manager the allocation is occurring in the context of
155 * peer IPv4 address and port (host byte order at both call sites)
156 * fd: open file descriptor for the socket
157 * Returns: an allocated connection structure
159 * just encompasses the action common to active and passive
160 * connections: allocation and placement in the manager's table
162 static connection allocate_connection(manager m,
// NOTE(review): confirm the malloc() result is checked for NULL before the
// new connection is filled in.
167 connection c=malloc(sizeof(struct connection));
// Watch the socket for input; connection_input() will be called with c.
175 register_io_handler(fd,READ_HANDLER,connection_input,c);
// Publish the connection. Both callers in this file (new_connection and
// force_tcp_connection) hold m->conn_lock around this call.
176 hash_table_insert(m->connections,c,id);
181 /* Function: new_connection
182 * Arguments: z: opaque argument, presumably the manager whose bound socket is ready
183 * Returns: 1 in order to reregister for new connection requests
185 * called when the bound service socket receives
186 * a new connection request, it always accepts and
187 * installs a new connection
189 static int new_connection(void *z)
192 struct sockaddr_in s;
// NOTE(review): accept() takes a socklen_t *; an int is used here.
193 int len=sizeof(struct sockaddr_in);
// NOTE(review): accept() can fail and return -1; the result is not checked
// before fd is passed to allocate_connection() below.
194 int fd=accept(m->bound,(struct sockaddr *)&s,&len);
// Peer IPv4 address exactly as stored in the socket address (network byte order).
195 unsigned int nid=*((unsigned int *)&s.sin_addr);
196 /* cfs specific hack */
197 //unsigned short pid=s.sin_port;
198 pthread_mutex_lock(&m->conn_lock);
// htonl() converts the network-order address to host order here. It performs
// the same swap as ntohl(), which would state the intent more clearly. The
// port is recorded as 0, so accepted connections are identified by IP only.
199 allocate_connection(m,htonl(nid),0/*pid*/,fd);
200 pthread_mutex_unlock(&m->conn_lock);
// Our own nid, defined elsewhere; sent as src_nid in every outgoing HELLO.
204 extern ptl_nid_t tcpnal_mynid;
// tcpnal_hello: exchange Portals HELLO headers over a connected socket.
// It sends a HELLO whose dest_nid field carries the protocol magic and
// version, with src_nid = tcpnal_mynid, the given connection type and
// incarnation, and no interface info. It then reads the peer's HELLO and
// checks, in order: magic, protocol version, message type (must be
// PTL_MSG_HELLO) and source nid (must not be PTL_NID_ANY). Any payload, which
// must not exceed getpagesize() bytes, is read and discarded.
//   sockfd:      connected socket to the peer
//   nid:         in/out peer nid. PTL_NID_ANY on entry means "not yet known"
//                and is replaced by the peer's src_nid. Any other value must
//                equal the peer's src_nid or the handshake is rejected.
//   type:        connection type, sent as hdr.msg.hello.type
//   incarnation: sent as hdr.msg.hello.incarnation
// NOTE(review): the return statements are not shown; the caller in
// force_tcp_connection() presumably treats non-zero as failure — confirm.
// NOTE(review): each step uses a single raw read/write system call. A short
// transfer is presumably reported as an error rather than retried (unlike the
// loop in read_connection()) — confirm.
207 tcpnal_hello (int sockfd, ptl_nid_t *nid, int type, __u64 incarnation)
// Protocol version 1 overlays magic/version on the header's dest_nid field.
212 ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
214 LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
216 memset (&hdr, 0, sizeof (hdr));
217 hmv->magic = cpu_to_le32(PORTALS_PROTO_MAGIC);
// NOTE(review): the version fields are encoded with cpu_to_le32() here but
// checked with cpu_to_le16()/le16_to_cpu() on receipt below. If these fields
// are 16 bits wide, cpu_to_le16() is the matching conversion; cpu_to_le32()
// would store the wrong bytes on big-endian hosts. Confirm the field width in
// ptl_magicversion_t.
218 hmv->version_major = cpu_to_le32(PORTALS_PROTO_VERSION_MAJOR);
219 hmv->version_minor = cpu_to_le32(PORTALS_PROTO_VERSION_MINOR);
221 hdr.src_nid = cpu_to_le64(tcpnal_mynid);
222 hdr.type = cpu_to_le32(PTL_MSG_HELLO);
224 hdr.msg.hello.type = cpu_to_le32(type);
225 hdr.msg.hello.incarnation = cpu_to_le64(incarnation);
227 /* I don't send any interface info */
229 /* Assume sufficient socket buffering for this message */
230 rc = syscall(SYS_write, sockfd, &hdr, sizeof(hdr));
232 CERROR ("Error %d sending HELLO to "LPX64"\n", rc, *nid);
// Read only the magic/version prefix first, so a peer speaking a different
// protocol is rejected before the rest of its header is consumed.
236 rc = syscall(SYS_read, sockfd, hmv, sizeof(*hmv));
238 CERROR ("Error %d reading HELLO from "LPX64"\n", rc, *nid);
// le32_to_cpu() and cpu_to_le32() perform the same byte swap. This test
// therefore compares the received wire bytes with the wire encoding of
// PORTALS_PROTO_MAGIC, and the message below prints the received magic in
// host order.
242 if (hmv->magic != le32_to_cpu(PORTALS_PROTO_MAGIC)) {
243 CERROR ("Bad magic %#08x (%#08x expected) from "LPX64"\n",
244 cpu_to_le32(hmv->magic), PORTALS_PROTO_MAGIC, *nid);
248 if (hmv->version_major != cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
249 hmv->version_minor != cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
250 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
252 le16_to_cpu (hmv->version_major),
253 le16_to_cpu (hmv->version_minor),
254 PORTALS_PROTO_VERSION_MAJOR,
255 PORTALS_PROTO_VERSION_MINOR,
260 #if (PORTALS_PROTO_VERSION_MAJOR != 1)
261 # error "This code only understands protocol version 1.x"
263 /* version 1 sends magic/version as the dest_nid of a 'hello' header,
264 * so read the rest of it in now... */
266 rc = syscall(SYS_read, sockfd, hmv + 1, sizeof(hdr) - sizeof(*hmv));
268 CERROR ("Error %d reading rest of HELLO hdr from "LPX64"\n",
273 /* ...and check we got what we expected */
274 if (hdr.type != cpu_to_le32 (PTL_MSG_HELLO)) {
275 CERROR ("Expecting a HELLO hdr "
276 " but got type %d with %d payload from "LPX64"\n",
277 le32_to_cpu (hdr.type),
278 le32_to_cpu (hdr.payload_length), *nid);
282 if (le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
283 CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n");
// Either learn the peer's nid, or verify it against the one the caller expects.
287 if (*nid == PTL_NID_ANY) { /* don't know peer's nid yet */
288 *nid = le64_to_cpu(hdr.src_nid);
289 } else if (*nid != le64_to_cpu (hdr.src_nid)) {
290 CERROR ("Connected to nid "LPX64", but expecting "LPX64"\n",
291 le64_to_cpu (hdr.src_nid), *nid);
295 /* Ignore any interface info in the payload */
296 nob = le32_to_cpu(hdr.payload_length);
// Bound the payload before allocating a scratch buffer for it.
// NOTE(review): if nob is a signed int (it is printed with %d), a
// payload_length above INT_MAX becomes negative, passes this bound and reaches
// malloc(). A zero length reaches malloc(0), which may return NULL. Confirm
// both cases are handled.
297 if (nob > getpagesize()) {
298 CERROR("Unexpected HELLO payload %d from "LPX64"\n",
// NOTE(review): confirm the scratch buffer is freed on every path after the
// read below.
303 char *space = (char *)malloc(nob);
306 CERROR("Can't allocate scratch buffer %d\n", nob);
310 rc = syscall(SYS_read, sockfd, space, nob);
312 CERROR("Error %d skipping HELLO payload from "
313 LPX64"\n", rc, *nid);
321 /* Function: force_tcp_connection
322 * Arguments: m: the connection manager
323 * (remaining arguments identify the peer; its IPv4 address is used below in host byte order)
324 * Returns: an allocated connection structure, either
325 * a pre-existing one, or a new connection
327 connection force_tcp_connection(manager m,
333 struct sockaddr_in addr;
334 struct sockaddr_in locaddr;
// PTL_NID_ANY: the peer's nid is learned from its HELLO rather than checked.
343 ptl_nid_t peernid = PTL_NID_ANY;
// Always connect to the peer's acceptor port.
345 port = tcpnal_acceptor_port;
// NOTE(review): conn_lock is taken here and released only by the unlock at
// the end of the function, so it appears to be held across the blocking
// connect() and HELLO exchange below. new_connection() takes the same lock,
// so incoming connections stall meanwhile — confirm this is intended.
350 pthread_mutex_lock(&m->conn_lock);
// Reuse an existing connection to this peer if the table already has one.
352 conn = hash_table_find(m->connections, id);
// Remote endpoint: the peer's acceptor.
356 memset(&addr, 0, sizeof(addr));
357 addr.sin_family = AF_INET;
358 addr.sin_addr.s_addr = htonl(ip);
359 addr.sin_port = htons(port);
// Local endpoint: any interface; the port is chosen by the loop below.
361 memset(&locaddr, 0, sizeof(locaddr));
362 locaddr.sin_family = AF_INET;
363 locaddr.sin_addr.s_addr = INADDR_ANY;
// Walk down the privileged port range, from IPPORT_RESERVED - 1 to
// IPPORT_RESERVED / 2 + 1, looking for a local port that can be bound and
// connected from. The peer's acceptor presumably requires a privileged source
// port — confirm. A fresh socket is created on each attempt.
365 for (rport = IPPORT_RESERVED - 1; rport > IPPORT_RESERVED / 2; --rport) {
366 fd = socket(AF_INET, SOCK_STREAM, 0);
368 perror("tcpnal socket failed");
// SO_REUSEADDR lets a local port still in TIME_WAIT from an earlier
// connection be bound again.
373 rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
374 &option, sizeof(option));
376 perror ("Can't set SO_REUSEADDR for socket");
381 locaddr.sin_port = htons(rport);
382 rc = bind(fd, (struct sockaddr *)&locaddr, sizeof(locaddr));
// A bind() failing with EACCES (no privilege to use reserved ports) is
// tolerated: connect() is still attempted, and the kernel then picks an
// unprivileged local port for the socket.
383 if (rc == 0 || errno == EACCES) {
384 rc = connect(fd, (struct sockaddr *)&addr,
385 sizeof(struct sockaddr_in));
// connect() failing with EADDRINUSE/EADDRNOTAVAIL means this local port
// cannot be used for this peer. Any other connect() error is reported.
388 } else if (errno != EADDRINUSE && errno != EADDRNOTAVAIL) {
389 perror("Error connecting to remote host");
// Any bind() error other than EADDRINUSE is reported; EADDRINUSE presumably
// moves on to the next candidate port.
393 } else if (errno != EADDRINUSE) {
394 perror("Error binding to privileged port");
// The loop condition stops at IPPORT_RESERVED / 2, so reaching that value
// means every candidate port was tried without success.
401 if (rport == IPPORT_RESERVED / 2) {
402 fprintf(stderr, "Out of ports trying to bind to a reserved port\n");
// Socket tuning is best effort: the setsockopt() return values are ignored.
408 setsockopt(fd, SOL_TCP, TCP_NODELAY, &option, sizeof(option));
410 setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &option, sizeof(option));
412 setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &option, sizeof(option));
// Incarnation stamp sent in the HELLO: current wall-clock time in microseconds.
415 gettimeofday(&tv, NULL);
416 incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
419 if (tcpnal_hello(fd, &peernid, SOCKNAL_CONN_ANY, incarnation))
// Register the connected socket with the manager; conn_lock taken above has
// not been released yet.
422 conn = allocate_connection(m, ip, port, fd);
424 /* let nal thread know this event right away */
426 procbridge_wakeup_nal(pb);
429 pthread_mutex_unlock(&m->conn_lock);
434 /* Function: bind_socket
435 * Arguments: m: the connection manager whose listening socket is created
436 * port: the port to attempt to bind to (host byte order)
437 * Returns: 1 on success, or 0 on error
439 * bind_socket() attempts to allocate and bind a socket to the requested
440 * port, or dynamically assign one from the kernel should the port be
441 * zero. Sets the bound, bound_handler and port elements of m.
443 * TODO: The port should be an explicitly sized type.
445 static int bind_socket(manager m,unsigned short port)
447 struct sockaddr_in addr;
// NOTE(review): bind()/getsockname() take a socklen_t length; an int is used here.
448 int alen=sizeof(struct sockaddr_in);
450 if ((m->bound = socket(AF_INET, SOCK_STREAM, 0)) < 0)
453 bzero((char *) &addr, sizeof(addr));
454 addr.sin_family = AF_INET;
// An s_addr of 0 is INADDR_ANY: listen on all local interfaces.
455 addr.sin_addr.s_addr = 0;
456 addr.sin_port = htons(port);
458 if (bind(m->bound,(struct sockaddr *)&addr,alen)<0){
459 perror ("tcpnal bind");
// Read back the address actually bound, so a kernel-chosen port (port == 0)
// becomes known. NOTE(review): the getsockname() result is not checked.
463 getsockname(m->bound,(struct sockaddr *)&addr, &alen);
// Watch the listening socket for incoming connection requests, presumably
// with new_connection() as the callback — confirm.
465 m->bound_handler=register_io_handler(m->bound,READ_HANDLER,
// NOTE(review): sin_port is in network byte order, so m->port is stored in
// network order, unlike the host-order port argument. Confirm that consumers
// of m->port expect that.
468 m->port=addr.sin_port;
473 /* Function: shutdown_connections
474 * Arguments: m: the manager structure
476 * close all connections and reclaim resources
478 void shutdown_connections(manager m)
// Unregister the listening socket's read handler first, presumably so no new
// connections are accepted during teardown.
481 remove_io_handler(m->bound_handler);
// Destroy the table, with remove_connection() as the per-entry callback.
// NOTE(review): remove_connection() itself calls hash_table_remove() on this
// same table. Confirm that hash_destroy_table() tolerates entries being
// removed from inside its callback.
482 hash_destroy_table(m->connections,remove_connection);
487 /* Function: init_connections
488 * Arguments: t: the nal state for this interface
489 * port: the port to attempt to bind to
490 * Returns: a newly allocated manager structure, or
491 * zero if the fixed port could not be bound
493 manager init_connections(unsigned short pid,
494 int (*input)(void *, void *),
497 manager m = (manager)malloc(sizeof(struct manager));
498 m->connections = hash_create_table(compare_connection,connection_key);
501 pthread_mutex_init(&m->conn_lock, 0);
503 if (bind_socket(m,pid))