Whamcloud - gitweb
ed6835531c33f9324db7a1cfb6bcae10ee241018
[fs/lustre-release.git] / lnet / ulnds / socklnd / connection.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002 Cray Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 /* connection.c:
23    This file provides a simple stateful connection manager which
24    builds tcp connections on demand and leaves them open for
25    future use. It also provides the machinery to allow peers
26    to connect to it
27 */
28
29 #include <stdlib.h>
30 #include <pqtimer.h>
31 #include <dispatch.h>
32 #include <table.h>
33 #include <stdio.h>
34 #include <stdarg.h>
35 #include <string.h>
36 #include <unistd.h>
37 #include <sys/types.h>
38 #include <sys/socket.h>
39 #include <netinet/in.h>
40 #include <netinet/tcp.h>
41 #include <portals/types.h>
42 #include <portals/lib-types.h>
43 #include <portals/socknal.h>
44 #include <libcfs/kp30.h>
45 #include <connection.h>
46 #include <pthread.h>
47 #include <errno.h>
48 #ifndef __CYGWIN__
49 #include <syscall.h>
50 #endif
51
52 /* global variable: acceptor port */
53 unsigned short tcpnal_acceptor_port = 988;
54
55
56 /* Function:  compare_connection
57  * Arguments: connection c:      a connection in the hash table
58  *            ptl_process_id_t:  an id to verify  agains
59  * Returns: 1 if the connection is the one requested, 0 otherwise
60  *
61  *    compare_connection() tests for collisions in the hash table
62  */
63 static int compare_connection(void *arg1, void *arg2)
64 {
65     connection c = arg1;
66     unsigned int * id = arg2;
67 #if 0
68     return((c->ip==id[0]) && (c->port==id[1]));
69 #else
70     /* CFS specific hacking */
71     return (c->ip == id[0]);
72 #endif
73 }
74
75
76 /* Function:  connection_key
77  * Arguments: ptl_process_id_t id:  an id to hash
78  * Returns: a not-particularily-well-distributed hash
79  *          of the id
80  */
81 static unsigned int connection_key(unsigned int *id)
82 {
83 #if 0
84     return(id[0]^id[1]);
85 #else
86     /* CFS specific hacking */
87     return (unsigned int) id[0];
88 #endif
89 }
90
91
92 /* Function:  remove_connection
93  * Arguments: c: the connection to remove
94  */
95 void remove_connection(void *arg)
96 {
97         connection c = arg;
98         unsigned int id[2];
99         
100         id[0]=c->ip;
101         id[1]=c->port;
102         hash_table_remove(c->m->connections,id);
103         close(c->fd);
104         free(c);
105 }
106
107
108 /* Function:  read_connection: 
109  * Arguments: c:    the connection to read from 
110  *            dest: the buffer to read into
111  *            len:  the number of bytes to read   
112  * Returns: success as 1, or failure as 0
113  *
114  *   read_connection() reads data from the connection, continuing
115  *   to read partial results until the request is satisfied or
116  *   it errors. TODO: this read should be covered by signal protection.
117  */
118 int read_connection(connection c,
119                     unsigned char *dest,
120                     int len)
121 {
122     int offset = 0,rc;
123
124     if (len) {
125         do {
126 #ifndef __CYGWIN__
127             rc = syscall(SYS_read, c->fd, dest+offset, len-offset);
128 #else
129             rc = recv(c->fd, dest+offset, len-offset, 0);
130 #endif
131             if (rc <= 0) {
132                 if (errno == EINTR) {
133                     rc = 0;
134                 } else {
135                     remove_connection(c);
136                     return (0);
137                 }
138             }
139             offset += rc;
140         } while (offset < len);
141     }
142     return (1);
143 }
144
145 static int connection_input(void *d)
146 {
147         connection c = d;
148         return((*c->m->handler)(c->m->handler_arg,c));
149 }
150
151
152 /* Function:  allocate_connection
153  * Arguments: t:    tcpnal the allocation is occuring in the context of
154  *            dest: portal endpoint address for this connection
155  *            fd:   open file descriptor for the socket
156  * Returns: an allocated connection structure
157  *
158  * just encompasses the action common to active and passive
159  *  connections of allocation and placement in the global table
160  */
161 static connection allocate_connection(manager m,
162                                unsigned int ip,
163                                unsigned short port,
164                                int fd)
165 {
166     connection c=malloc(sizeof(struct connection));
167     unsigned int id[2];
168     c->m=m;
169     c->fd=fd;
170     c->ip=ip;
171     c->port=port;
172     id[0]=ip;
173     id[1]=port;
174     register_io_handler(fd,READ_HANDLER,connection_input,c);
175     hash_table_insert(m->connections,c,id);
176     return(c);
177 }
178
179
180 /* Function:  new_connection
181  * Arguments: t: opaque argument holding the tcpname
182  * Returns: 1 in order to reregister for new connection requests
183  *
184  *  called when the bound service socket recieves
185  *     a new connection request, it always accepts and
186  *     installs a new connection
187  */
188 static int new_connection(void *z)
189 {
190     manager m=z;
191     struct sockaddr_in s;
192     int len=sizeof(struct sockaddr_in);
193     int fd=accept(m->bound,(struct sockaddr *)&s,&len);
194     unsigned int nid=*((unsigned int *)&s.sin_addr);
195     /* cfs specific hack */
196     //unsigned short pid=s.sin_port;
197     pthread_mutex_lock(&m->conn_lock);
198     allocate_connection(m,htonl(nid),0/*pid*/,fd);
199     pthread_mutex_unlock(&m->conn_lock);
200     return(1);
201 }
202
203 extern ptl_nid_t tcpnal_mynid;
204
205 int
206 tcpnal_hello (int sockfd, ptl_nid_t *nid, int type, __u64 incarnation)
207 {
208         int                 rc;
209         int                 nob;
210         ptl_hdr_t           hdr;
211         ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
212
213         LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
214
215         memset (&hdr, 0, sizeof (hdr));
216         hmv->magic         = cpu_to_le32(PORTALS_PROTO_MAGIC);
217         hmv->version_major = cpu_to_le32(PORTALS_PROTO_VERSION_MAJOR);
218         hmv->version_minor = cpu_to_le32(PORTALS_PROTO_VERSION_MINOR);
219         
220         hdr.src_nid = cpu_to_le64(tcpnal_mynid);
221         hdr.type    = cpu_to_le32(PTL_MSG_HELLO);
222
223         hdr.msg.hello.type = cpu_to_le32(type);
224         hdr.msg.hello.incarnation = cpu_to_le64(incarnation);
225
226         /* I don't send any interface info */
227
228         /* Assume sufficient socket buffering for this message */
229         rc = syscall(SYS_write, sockfd, &hdr, sizeof(hdr));
230         if (rc <= 0) {
231                 CERROR ("Error %d sending HELLO to "LPX64"\n", rc, *nid);
232                 return (rc);
233         }
234
235         rc = syscall(SYS_read, sockfd, hmv, sizeof(*hmv));
236         if (rc <= 0) {
237                 CERROR ("Error %d reading HELLO from "LPX64"\n", rc, *nid);
238                 return (rc);
239         }
240         
241         if (hmv->magic != le32_to_cpu(PORTALS_PROTO_MAGIC)) {
242                 CERROR ("Bad magic %#08x (%#08x expected) from "LPX64"\n",
243                         cpu_to_le32(hmv->magic), PORTALS_PROTO_MAGIC, *nid);
244                 return (-EPROTO);
245         }
246
247         if (hmv->version_major != cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
248             hmv->version_minor != cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
249                 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
250                         " from "LPX64"\n",
251                         le16_to_cpu (hmv->version_major),
252                         le16_to_cpu (hmv->version_minor),
253                         PORTALS_PROTO_VERSION_MAJOR,
254                         PORTALS_PROTO_VERSION_MINOR,
255                         *nid);
256                 return (-EPROTO);
257         }
258
259 #if (PORTALS_PROTO_VERSION_MAJOR != 1)
260 # error "This code only understands protocol version 1.x"
261 #endif
262         /* version 1 sends magic/version as the dest_nid of a 'hello' header,
263          * so read the rest of it in now... */
264
265         rc = syscall(SYS_read, sockfd, hmv + 1, sizeof(hdr) - sizeof(*hmv));
266         if (rc <= 0) {
267                 CERROR ("Error %d reading rest of HELLO hdr from "LPX64"\n",
268                         rc, *nid);
269                 return (rc);
270         }
271
272         /* ...and check we got what we expected */
273         if (hdr.type != cpu_to_le32 (PTL_MSG_HELLO)) {
274                 CERROR ("Expecting a HELLO hdr "
275                         " but got type %d with %d payload from "LPX64"\n",
276                         le32_to_cpu (hdr.type),
277                         le32_to_cpu (hdr.payload_length), *nid);
278                 return (-EPROTO);
279         }
280
281         if (le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
282                 CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n");
283                 return (-EPROTO);
284         }
285
286         if (*nid == PTL_NID_ANY) {              /* don't know peer's nid yet */
287                 *nid = le64_to_cpu(hdr.src_nid);
288         } else if (*nid != le64_to_cpu (hdr.src_nid)) {
289                 CERROR ("Connected to nid "LPX64", but expecting "LPX64"\n",
290                         le64_to_cpu (hdr.src_nid), *nid);
291                 return (-EPROTO);
292         }
293
294         /* Ignore any interface info in the payload */
295         nob = le32_to_cpu(hdr.payload_length);
296         if (nob > getpagesize()) {
297                 CERROR("Unexpected HELLO payload %d from "LPX64"\n",
298                        nob, *nid);
299                 return (-EPROTO);
300         }
301         if (nob > 0) {
302                 char *space = (char *)malloc(nob);
303                 
304                 if (space == NULL) {
305                         CERROR("Can't allocate scratch buffer %d\n", nob);
306                         return (-ENOMEM);
307                 }
308                 
309                 rc = syscall(SYS_read, sockfd, space, nob);
310                 if (rc <= 0) {
311                         CERROR("Error %d skipping HELLO payload from "
312                                LPX64"\n", rc, *nid);
313                         return (rc);
314                 }
315         }
316
317         return (0);
318 }
319
320 /* Function:  force_tcp_connection
321  * Arguments: t: tcpnal
322  *            dest: portals endpoint for the connection
323  * Returns: an allocated connection structure, either
324  *          a pre-existing one, or a new connection
325  */
326 connection force_tcp_connection(manager m,
327                                 unsigned int ip,
328                                 unsigned short port,
329                                 procbridge pb)
330 {
331     connection conn;
332     struct sockaddr_in addr;
333     struct sockaddr_in locaddr; 
334     unsigned int id[2];
335     struct timeval tv;
336     __u64 incarnation;
337
338     int fd;
339     int option;
340     int rc;
341     int rport;
342     ptl_nid_t peernid = PTL_NID_ANY;
343
344     port = tcpnal_acceptor_port;
345
346     id[0] = ip;
347     id[1] = port;
348
349     pthread_mutex_lock(&m->conn_lock);
350
351     conn = hash_table_find(m->connections, id);
352     if (conn)
353             goto out;
354
355     memset(&addr, 0, sizeof(addr));
356     addr.sin_family      = AF_INET;
357     addr.sin_addr.s_addr = htonl(ip);
358     addr.sin_port        = htons(port);
359
360     memset(&locaddr, 0, sizeof(locaddr)); 
361     locaddr.sin_family = AF_INET; 
362     locaddr.sin_addr.s_addr = INADDR_ANY;
363
364     for (rport = IPPORT_RESERVED - 1; rport > IPPORT_RESERVED / 2; --rport) {
365             fd = socket(AF_INET, SOCK_STREAM, 0);
366             if (fd < 0) {
367                     perror("tcpnal socket failed");
368                     goto out;
369             } 
370             
371             option = 1;
372             rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 
373                             &option, sizeof(option));
374             if (rc != 0) {
375                     perror ("Can't set SO_REUSEADDR for socket"); 
376                     close(fd);
377                     goto out;
378             } 
379
380             locaddr.sin_port = htons(rport);
381             rc = bind(fd, (struct sockaddr *)&locaddr, sizeof(locaddr));
382             if (rc == 0 || errno == EACCES) {
383                     rc = connect(fd, (struct sockaddr *)&addr,
384                                  sizeof(struct sockaddr_in));
385                     if (rc == 0) {
386                             break;
387                     } else if (errno != EADDRINUSE && errno != EADDRNOTAVAIL) {
388                             perror("Error connecting to remote host");
389                             close(fd);
390                             goto out;
391                     }
392             } else if (errno != EADDRINUSE) {
393                     perror("Error binding to privileged port");
394                     close(fd);
395                     goto out;
396             }
397             close(fd);
398     }
399     
400     if (rport == IPPORT_RESERVED / 2) {
401             fprintf(stderr, "Out of ports trying to bind to a reserved port\n");
402             goto out;
403     }
404     
405 #if 1
406     option = 1;
407     setsockopt(fd, SOL_TCP, TCP_NODELAY, &option, sizeof(option));
408     option = 1<<20;
409     setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &option, sizeof(option));
410     option = 1<<20;
411     setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &option, sizeof(option));
412 #endif
413    
414     gettimeofday(&tv, NULL);
415     incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
416
417     /* say hello */
418     if (tcpnal_hello(fd, &peernid, SOCKNAL_CONN_ANY, incarnation))
419             exit(-1);
420     
421     conn = allocate_connection(m, ip, port, fd);
422     
423     /* let nal thread know this event right away */
424     if (conn)
425             procbridge_wakeup_nal(pb);
426
427 out:
428     pthread_mutex_unlock(&m->conn_lock);
429     return (conn);
430 }
431
432
433 /* Function:  bind_socket
434  * Arguments: t: the nal state for this interface
435  *            port: the port to attempt to bind to
436  * Returns: 1 on success, or 0 on error
437  *
438  * bind_socket() attempts to allocate and bind a socket to the requested
439  *  port, or dynamically assign one from the kernel should the port be
440  *  zero. Sets the bound and bound_handler elements of m.
441  *
442  *  TODO: The port should be an explicitly sized type.
443  */
444 static int bind_socket(manager m,unsigned short port)
445 {
446     struct sockaddr_in addr;
447     int alen=sizeof(struct sockaddr_in);
448     
449     if ((m->bound = socket(AF_INET, SOCK_STREAM, 0)) < 0)  
450         return(0);
451     
452     bzero((char *) &addr, sizeof(addr));
453     addr.sin_family      = AF_INET;
454     addr.sin_addr.s_addr = 0;
455     addr.sin_port        = htons(port);
456
457     if (bind(m->bound,(struct sockaddr *)&addr,alen)<0){
458         perror ("tcpnal bind"); 
459         return(0);
460     }
461     
462     getsockname(m->bound,(struct sockaddr *)&addr, &alen);
463
464     m->bound_handler=register_io_handler(m->bound,READ_HANDLER,
465                                          new_connection,m);
466     listen(m->bound,5); 
467     m->port=addr.sin_port;
468     return(1);
469 }
470
471
472 /* Function:  shutdown_connections
473  * Arguments: m: the manager structure
474  *
475  * close all connections and reclaim resources
476  */
477 void shutdown_connections(manager m)
478 {
479     close(m->bound);
480     remove_io_handler(m->bound_handler);
481     hash_destroy_table(m->connections,remove_connection);
482     free(m);
483 }
484
485
486 /* Function:  init_connections
487  * Arguments: t: the nal state for this interface
488  *            port: the port to attempt to bind to
489  * Returns: a newly allocated manager structure, or
490  *          zero if the fixed port could not be bound
491  */
492 manager init_connections(unsigned short pid,
493                          int (*input)(void *, void *),
494                          void *a)
495 {
496     manager m = (manager)malloc(sizeof(struct manager));
497     m->connections = hash_create_table(compare_connection,connection_key);
498     m->handler = input;
499     m->handler_arg = a;
500     pthread_mutex_init(&m->conn_lock, 0);
501
502     if (bind_socket(m,pid))
503         return(m);
504
505     free(m);
506     return(0);
507 }