Whamcloud - gitweb
Severity : enhancement
[fs/lustre-release.git] / lnet / ulnds / socklnd / connection.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002 Cray Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 /* connection.c:
23    This file provides a simple stateful connection manager which
24    builds tcp connections on demand and leaves them open for
25    future use. 
26 */
27
28 #include <stdlib.h>
29 #include <pqtimer.h>
30 #include <dispatch.h>
31 #include <table.h>
32 #include <stdio.h>
33 #include <stdarg.h>
34 #include <string.h>
35 #include <unistd.h>
36 #include <sys/types.h>
37 #include <sys/socket.h>
38 #include <netinet/in.h>
39 #include <netinet/tcp.h>
40 #include <lnet/types.h>
41 #include <lnet/lib-types.h>
42 #include <lnet/socklnd.h>
43 #include <libcfs/kp30.h>
44 #include <connection.h>
45 #include <pthread.h>
46 #include <errno.h>
47 #ifndef __CYGWIN__
48 #include <syscall.h>
49 #endif
50
51 /* tunables (via environment) */
52 int tcpnal_acceptor_port = 988;
53 int tcpnal_buffer_size   = 0;
54 int tcpnal_nagle         = 0;
55
56 int
57 tcpnal_env_param (char *name, int *val)
58 {
59         char   *env = getenv(name);
60         int     n;
61
62         if (env == NULL)
63                 return 1;
64
65         n = strlen(env);                /* scanf may not assign on EOS */
66         if (sscanf(env, "%i%n", val, &n) >= 1 && n == strlen(env)) {
67                 CDEBUG(D_INFO, "Environment variable %s set to %d\n",
68                        name, *val);
69                 return 1;
70         }
71
72         CERROR("Can't parse environment variable '%s=%s'\n",
73                name, env);
74         return 0;
75 }
76
77 int
78 tcpnal_set_global_params (void)
79 {
80         return  tcpnal_env_param("TCPNAL_PORT",
81                                 &tcpnal_acceptor_port) &&
82                 tcpnal_env_param("TCPLND_PORT",
83                                 &tcpnal_acceptor_port) &&
84                 tcpnal_env_param("TCPNAL_BUFFER_SIZE",
85                                  &tcpnal_buffer_size) &&
86                 tcpnal_env_param("TCPLND_BUFFER_SIZE",
87                                  &tcpnal_buffer_size) &&
88                 tcpnal_env_param("TCPNAL_NAGLE",
89                                  &tcpnal_nagle) &&
90                 tcpnal_env_param("TCPLND_NAGLE",
91                                  &tcpnal_nagle);
92 }
93
94 /* Function:  compare_connection
95  * Arguments: connection c:      a connection in the hash table
96  *            lnet_process_id_t:  an id to verify  agains
97  * Returns: 1 if the connection is the one requested, 0 otherwise
98  *
99  *    compare_connection() tests for collisions in the hash table
100  */
101 static int compare_connection(void *arg1, void *arg2)
102 {
103     connection  c = arg1;
104     lnet_nid_t *nid = arg2;
105
106     return (c->peer_nid == *nid);
107 }
108
109 /* Function:  connection_key
110  * Arguments: lnet_process_id_t id:  an id to hash
111  * Returns: a not-particularily-well-distributed hash
112  *          of the id
113  */
114 static unsigned int connection_key(void *arg)
115 {
116         lnet_nid_t *nid = arg;
117         
118         return (unsigned int)(*nid);
119 }
120
121 void
122 close_connection(void *arg)
123 {
124         connection c = arg;
125         
126         close(c->fd);
127         free(c);
128 }
129
130 /* Function:  remove_connection
131  * Arguments: c: the connection to remove
132  */
133 void remove_connection(void *arg)
134 {
135         connection c = arg;
136         
137         hash_table_remove(c->m->connections,&c->peer_nid);
138         close_connection(c);
139 }
140
141
142 /* Function:  read_connection: 
143  * Arguments: c:    the connection to read from 
144  *            dest: the buffer to read into
145  *            len:  the number of bytes to read   
146  * Returns: success as 1, or failure as 0
147  *
148  *   read_connection() reads data from the connection, continuing
149  *   to read partial results until the request is satisfied or
150  *   it errors. TODO: this read should be covered by signal protection.
151  */
152 int read_connection(connection c,
153                     unsigned char *dest,
154                     int len)
155 {
156     int offset = 0,rc;
157
158     if (len) {
159         do {
160 #ifndef __CYGWIN__
161             rc = syscall(SYS_read, c->fd, dest+offset, len-offset);
162 #else
163             rc = recv(c->fd, dest+offset, len-offset, 0);
164 #endif
165             if (rc <= 0) {
166                 if (errno == EINTR) {
167                     rc = 0;
168                 } else {
169                     remove_connection(c);
170                     return (0);
171                 }
172             }
173             offset += rc;
174         } while (offset < len);
175     }
176     return (1);
177 }
178
179 static int connection_input(void *d)
180 {
181         connection c = d;
182         return((*c->m->handler)(c->m->handler_arg,c));
183 }
184
185
186 static connection 
187 allocate_connection(manager        m,
188                     lnet_nid_t     nid,
189                     int            fd)
190 {
191     connection c=malloc(sizeof(struct connection));
192
193     c->m=m;
194     c->fd=fd;
195     c->peer_nid = nid;
196
197     register_io_handler(fd,READ_HANDLER,connection_input,c);
198     hash_table_insert(m->connections,c,&nid);
199     return(c);
200 }
201
202 int
203 tcpnal_write(lnet_nid_t nid, int sockfd, void *buffer, int nob)
204 {
205         int rc = syscall(SYS_write, sockfd, buffer, nob);
206         
207         /* NB called on an 'empty' socket with huge buffering! */
208         if (rc == nob)
209                 return 0;
210
211         if (rc < 0) {
212                 CERROR("Failed to send to %s: %s\n",
213                        libcfs_nid2str(nid), strerror(errno));
214                 return -1;
215         }
216         
217         CERROR("Short send to %s: %d/%d\n",
218                libcfs_nid2str(nid), rc, nob);
219         return -1;
220 }
221
222 int
223 tcpnal_read(lnet_nid_t nid, int sockfd, void *buffer, int nob) 
224 {
225         int       rc;
226
227         while (nob > 0) {
228                 rc = syscall(SYS_read, sockfd, buffer, nob);
229                 
230                 if (rc == 0) {
231                         CERROR("Unexpected EOF from %s\n",
232                                libcfs_nid2str(nid));
233                         return -1;
234                 }
235
236                 if (rc < 0) {
237                         CERROR("Failed to receive from %s: %s\n",
238                                libcfs_nid2str(nid), strerror(errno));
239                         return -1;
240                 }
241
242                 nob -= rc;
243         }
244         return 0;
245 }
246
247 int
248 tcpnal_hello (int sockfd, lnet_nid_t nid)
249 {
250         struct timeval          tv;
251         __u64                   incarnation;
252         int                     rc;
253         int                     nob;
254         lnet_acceptor_connreq_t cr;
255         lnet_hdr_t              hdr;
256         lnet_magicversion_t     hmv;
257
258         gettimeofday(&tv, NULL);
259         incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
260
261         memset(&cr, 0, sizeof(cr));
262         cr.acr_magic   = LNET_PROTO_ACCEPTOR_MAGIC;
263         cr.acr_version = LNET_PROTO_ACCEPTOR_VERSION;
264         cr.acr_nid     = nid;
265
266         /* hmv initialised and copied separately into hdr; compiler "optimize"
267          * likely due to confusion about pointer alias of hmv and hdr when this
268          * was done in-place. */
269         hmv.magic         = cpu_to_le32(LNET_PROTO_TCP_MAGIC);
270         hmv.version_major = cpu_to_le32(LNET_PROTO_TCP_VERSION_MAJOR);
271         hmv.version_minor = cpu_to_le32(LNET_PROTO_TCP_VERSION_MINOR);
272
273         memset (&hdr, 0, sizeof (hdr));
274
275         CLASSERT (sizeof (hmv) == sizeof (hdr.dest_nid));
276         memcpy(&hdr.dest_nid, &hmv, sizeof(hmv));
277
278         /* hdr.src_nid/src_pid are ignored at dest */
279
280         hdr.type    = cpu_to_le32(LNET_MSG_HELLO);
281         hdr.msg.hello.type = cpu_to_le32(SOCKLND_CONN_ANY);
282         hdr.msg.hello.incarnation = cpu_to_le64(incarnation);
283
284         /* I don't send any interface info */
285
286         /* Assume sufficient socket buffering for these messages... */
287         rc = tcpnal_write(nid, sockfd, &cr, sizeof(cr));
288         if (rc != 0)
289                 return -1;
290
291         rc = tcpnal_write(nid, sockfd, &hdr, sizeof(hdr));
292         if (rc != 0)
293                 return -1;
294
295         rc = tcpnal_read(nid, sockfd, &hmv, sizeof(hmv));
296         if (rc != 0)
297                 return -1;
298         
299         if (hmv.magic != le32_to_cpu(LNET_PROTO_TCP_MAGIC)) {
300                 CERROR ("Bad magic %#08x (%#08x expected) from %s\n",
301                         cpu_to_le32(hmv.magic), LNET_PROTO_TCP_MAGIC, 
302                         libcfs_nid2str(nid));
303                 return -1;
304         }
305
306         if (hmv.version_major != cpu_to_le16 (LNET_PROTO_TCP_VERSION_MAJOR) ||
307             hmv.version_minor != cpu_to_le16 (LNET_PROTO_TCP_VERSION_MINOR)) {
308                 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
309                         " from %s\n",
310                         le16_to_cpu (hmv.version_major),
311                         le16_to_cpu (hmv.version_minor),
312                         LNET_PROTO_TCP_VERSION_MAJOR,
313                         LNET_PROTO_TCP_VERSION_MINOR,
314                         libcfs_nid2str(nid));
315                 return -1;
316         }
317
318 #if (LNET_PROTO_TCP_VERSION_MAJOR != 1)
319 # error "This code only understands protocol version 1.x"
320 #endif
321         /* version 1 sends magic/version as the dest_nid of a 'hello' header,
322          * so read the rest of it in now... */
323
324         rc = tcpnal_read(nid, sockfd, ((char *)&hdr) + sizeof (hmv),
325                          sizeof(hdr) - sizeof(hmv));
326         if (rc != 0)
327                 return -1;
328
329         /* ...and check we got what we expected */
330         if (hdr.type != cpu_to_le32 (LNET_MSG_HELLO)) {
331                 CERROR ("Expecting a HELLO hdr "
332                         " but got type %d with %d payload from %s\n",
333                         le32_to_cpu (hdr.type),
334                         le32_to_cpu (hdr.payload_length), libcfs_nid2str(nid));
335                 return -1;
336         }
337
338         if (le64_to_cpu(hdr.src_nid) == LNET_NID_ANY) {
339                 CERROR("Expecting a HELLO hdr with a NID, but got LNET_NID_ANY\n");
340                 return -1;
341         }
342
343         if (nid != le64_to_cpu (hdr.src_nid)) {
344                 CERROR ("Connected to %s, but expecting %s\n",
345                         libcfs_nid2str(le64_to_cpu (hdr.src_nid)), 
346                         libcfs_nid2str(nid));
347                 return -1;
348         }
349
350         /* Ignore any interface info in the payload */
351         nob = le32_to_cpu(hdr.payload_length);
352         if (nob != 0) {
353                 CERROR("Unexpected HELLO payload %d from %s\n",
354                        nob, libcfs_nid2str(nid));
355                 return -1;
356         }
357
358         return 0;
359 }
360
361 /* Function:  force_tcp_connection
362  * Arguments: t: tcpnal
363  *            dest: portals endpoint for the connection
364  * Returns: an allocated connection structure, either
365  *          a pre-existing one, or a new connection
366  */
367 connection force_tcp_connection(manager    m,
368                                 lnet_nid_t nid,
369                                 procbridge pb)
370 {
371     unsigned int       ip = LNET_NIDADDR(nid);
372     connection         conn;
373     struct sockaddr_in addr;
374     struct sockaddr_in locaddr; 
375     int                fd;
376     int                option;
377     int                rc;
378     int                sz;
379
380     pthread_mutex_lock(&m->conn_lock);
381
382     conn = hash_table_find(m->connections, &nid);
383     if (conn)
384             goto out;
385
386     memset(&addr, 0, sizeof(addr));
387     addr.sin_family      = AF_INET;
388     addr.sin_addr.s_addr = htonl(ip);
389     addr.sin_port        = htons(tcpnal_acceptor_port);
390
391     memset(&locaddr, 0, sizeof(locaddr)); 
392     locaddr.sin_family = AF_INET; 
393     locaddr.sin_addr.s_addr = INADDR_ANY;
394     locaddr.sin_port = htons(m->port);
395
396 #if 1 /* tcpnal connects from a non-privileged port */
397     fd = socket(AF_INET, SOCK_STREAM, 0);
398     if (fd < 0) {
399             perror("tcpnal socket failed");
400             goto out;
401     } 
402
403     option = 1;
404     rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 
405                     &option, sizeof(option));
406     if (rc != 0) {
407             perror ("Can't set SO_REUSEADDR for socket"); 
408             close(fd);
409             goto out;
410     } 
411
412     if (m->port != 0) {
413             /* Bind all subsequent connections to the same port */
414             rc = bind(fd, (struct sockaddr *)&locaddr, sizeof(locaddr));
415             if (rc != 0) {
416                     perror("Error binding port");
417                     close(fd);
418                     goto out;
419             }
420     }
421     
422     rc = connect(fd, (struct sockaddr *)&addr,
423                  sizeof(struct sockaddr_in));
424     if (rc != 0) {
425             perror("Error connecting to remote host");
426             close(fd);
427             goto out;
428     }
429
430     sz = sizeof(locaddr);
431     rc = getsockname(fd, (struct sockaddr *)&locaddr, &sz);
432     if (rc != 0) {
433             perror ("Error on getsockname");
434             close(fd);
435             goto out;
436     }
437
438     if (m->port == 0)
439             m->port = ntohs(locaddr.sin_port);
440     
441 #else
442     for (rport = IPPORT_RESERVED - 1; rport > IPPORT_RESERVED / 2; --rport) {
443             fd = socket(AF_INET, SOCK_STREAM, 0);
444             if (fd < 0) {
445                     perror("tcpnal socket failed");
446                     goto out;
447             } 
448             
449             option = 1;
450             rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 
451                             &option, sizeof(option));
452             if (rc != 0) {
453                     perror ("Can't set SO_REUSEADDR for socket"); 
454                     close(fd);
455                     goto out;
456             } 
457
458             locaddr.sin_port = htons(rport);
459             rc = bind(fd, (struct sockaddr *)&locaddr, sizeof(locaddr));
460             if (rc == 0 || errno == EACCES) {
461                     rc = connect(fd, (struct sockaddr *)&addr,
462                                  sizeof(struct sockaddr_in));
463                     if (rc == 0) {
464                             break;
465                     } else if (errno != EADDRINUSE && errno != EADDRNOTAVAIL) {
466                             perror("Error connecting to remote host");
467                             close(fd);
468                             goto out;
469                     }
470             } else if (errno != EADDRINUSE) {
471                     perror("Error binding to privileged port");
472                     close(fd);
473                     goto out;
474             }
475             close(fd);
476     }
477     
478     if (rport == IPPORT_RESERVED / 2) {
479             fprintf(stderr, "Out of ports trying to bind to a reserved port\n");
480             goto out;
481     }
482 #endif
483     
484     option = tcpnal_nagle ? 0 : 1;
485     setsockopt(fd, SOL_TCP, TCP_NODELAY, &option, sizeof(option));
486     option = tcpnal_buffer_size;
487     if (option != 0) {
488             setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &option, sizeof(option));
489             option = tcpnal_buffer_size;
490             setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &option, sizeof(option));
491     }
492     
493     /* say hello */
494     if (tcpnal_hello(fd, nid))
495             goto out;
496     
497     conn = allocate_connection(m, nid, fd);
498     
499     /* let nal thread know this event right away */
500     if (conn)
501             procbridge_wakeup_nal(pb);
502
503 out:
504     pthread_mutex_unlock(&m->conn_lock);
505     return (conn);
506 }
507
508
509 #if 0                                           /* we don't accept connections */
510 /* Function:  new_connection
511  * Arguments: t: opaque argument holding the tcpname
512  * Returns: 1 in order to reregister for new connection requests
513  *
514  *  called when the bound service socket recieves
515  *     a new connection request, it always accepts and
516  *     installs a new connection
517  */
518 static int new_connection(void *z)
519 {
520     manager m=z;
521     struct sockaddr_in s;
522     int len=sizeof(struct sockaddr_in);
523     int fd=accept(m->bound,(struct sockaddr *)&s,&len);
524     unsigned int nid=*((unsigned int *)&s.sin_addr);
525     /* cfs specific hack */
526     //unsigned short pid=s.sin_port;
527     pthread_mutex_lock(&m->conn_lock);
528     allocate_connection(m,htonl(nid),0/*pid*/,fd);
529     pthread_mutex_unlock(&m->conn_lock);
530     return(1);
531 }
532
533 /* Function:  bind_socket
534  * Arguments: t: the nal state for this interface
535  *            port: the port to attempt to bind to
536  * Returns: 1 on success, or 0 on error
537  *
538  * bind_socket() attempts to allocate and bind a socket to the requested
539  *  port, or dynamically assign one from the kernel should the port be
540  *  zero. Sets the bound and bound_handler elements of m.
541  *
542  *  TODO: The port should be an explicitly sized type.
543  */
544 static int bind_socket(manager m,unsigned short port)
545 {
546     struct sockaddr_in addr;
547     int alen=sizeof(struct sockaddr_in);
548     
549     if ((m->bound = socket(AF_INET, SOCK_STREAM, 0)) < 0)  
550         return(0);
551     
552     bzero((char *) &addr, sizeof(addr));
553     addr.sin_family      = AF_INET;
554     addr.sin_addr.s_addr = 0;
555     addr.sin_port        = htons(port);
556
557     if (bind(m->bound,(struct sockaddr *)&addr,alen)<0){
558         perror ("tcpnal bind"); 
559         return(0);
560     }
561     
562     getsockname(m->bound,(struct sockaddr *)&addr, &alen);
563
564     m->bound_handler=register_io_handler(m->bound,READ_HANDLER,
565                                          new_connection,m);
566     listen(m->bound,5); 
567     m->port=addr.sin_port;
568     return(1);
569 }
570 #endif
571
572
573 /* Function:  shutdown_connections
574  * Arguments: m: the manager structure
575  *
576  * close all connections and reclaim resources
577  */
578 void shutdown_connections(manager m)
579 {
580 #if 0
581         /* we don't accept connections */
582         close(m->bound);
583         remove_io_handler(m->bound_handler);
584 #endif
585         hash_destroy_table(m->connections,close_connection);
586         free(m);
587 }
588
589
590 /* Function:  init_connections
591  * Arguments: t: the nal state for this interface
592  * Returns: a newly allocated manager structure, or
593  *          zero if the fixed port could not be bound
594  */
595 manager init_connections(int (*input)(void *, void *), void *a)
596 {
597     manager m = (manager)malloc(sizeof(struct manager));
598
599     m->connections = hash_create_table(compare_connection,connection_key);
600     m->handler = input;
601     m->handler_arg = a;
602     m->port = 0;                                /* set on first connection */
603     pthread_mutex_init(&m->conn_lock, 0);
604
605     return m;
606 #if 0
607     if (bind_socket(m,pid))
608         return(m);
609
610     free(m);
611     return(0);
612 #endif
613 }