Whamcloud - gitweb
* landed unified portals (b_hd_cleanup_merge_singleportals) on HEAD
[fs/lustre-release.git] / lustre / portals / unals / connection.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002 Cray Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 /* connection.c:
23    This file provides a simple stateful connection manager which
24    builds tcp connections on demand and leaves them open for
25    future use. It also provides the machinery to allow peers
26    to connect to it
27 */
28
29 #include <stdlib.h>
30 #include <pqtimer.h>
31 #include <dispatch.h>
32 #include <table.h>
33 #include <stdio.h>
34 #include <stdarg.h>
35 #include <string.h>
36 #include <unistd.h>
37 #include <sys/types.h>
38 #include <sys/socket.h>
39 #include <netinet/in.h>
40 #include <netinet/tcp.h>
41 #include <portals/types.h>
42 #include <portals/list.h>
43 #include <portals/lib-types.h>
44 #include <portals/socknal.h>
45 #include <linux/kp30.h>
46 #include <connection.h>
47 #include <pthread.h>
48 #include <errno.h>
49 #ifndef __CYGWIN__
50 #include <syscall.h>
51 #endif
52
53 /* global variable: acceptor port */
54 unsigned short tcpnal_acceptor_port = 988;
55
56
57 /* Function:  compare_connection
58  * Arguments: connection c:      a connection in the hash table
59  *            ptl_process_id_t:  an id to verify  agains
60  * Returns: 1 if the connection is the one requested, 0 otherwise
61  *
62  *    compare_connection() tests for collisions in the hash table
63  */
64 static int compare_connection(void *arg1, void *arg2)
65 {
66     connection c = arg1;
67     unsigned int * id = arg2;
68 #if 0
69     return((c->ip==id[0]) && (c->port==id[1]));
70 #else
71     /* CFS specific hacking */
72     return (c->ip == id[0]);
73 #endif
74 }
75
76
77 /* Function:  connection_key
78  * Arguments: ptl_process_id_t id:  an id to hash
79  * Returns: a not-particularily-well-distributed hash
80  *          of the id
81  */
82 static unsigned int connection_key(unsigned int *id)
83 {
84 #if 0
85     return(id[0]^id[1]);
86 #else
87     /* CFS specific hacking */
88     return (unsigned int) id[0];
89 #endif
90 }
91
92
93 /* Function:  remove_connection
94  * Arguments: c: the connection to remove
95  */
96 void remove_connection(void *arg)
97 {
98         connection c = arg;
99         unsigned int id[2];
100         
101         id[0]=c->ip;
102         id[1]=c->port;
103         hash_table_remove(c->m->connections,id);
104         close(c->fd);
105         free(c);
106 }
107
108
109 /* Function:  read_connection: 
110  * Arguments: c:    the connection to read from 
111  *            dest: the buffer to read into
112  *            len:  the number of bytes to read   
113  * Returns: success as 1, or failure as 0
114  *
115  *   read_connection() reads data from the connection, continuing
116  *   to read partial results until the request is satisfied or
117  *   it errors. TODO: this read should be covered by signal protection.
118  */
119 int read_connection(connection c,
120                     unsigned char *dest,
121                     int len)
122 {
123     int offset = 0,rc;
124
125     if (len) {
126         do {
127 #ifndef __CYGWIN__
128             rc = syscall(SYS_read, c->fd, dest+offset, len-offset);
129 #else
130             rc = recv(c->fd, dest+offset, len-offset, 0);
131 #endif
132             if (rc <= 0) {
133                 if (errno == EINTR) {
134                     rc = 0;
135                 } else {
136                     remove_connection(c);
137                     return (0);
138                 }
139             }
140             offset += rc;
141         } while (offset < len);
142     }
143     return (1);
144 }
145
146 static int connection_input(void *d)
147 {
148         connection c = d;
149         return((*c->m->handler)(c->m->handler_arg,c));
150 }
151
152
153 /* Function:  allocate_connection
154  * Arguments: t:    tcpnal the allocation is occuring in the context of
155  *            dest: portal endpoint address for this connection
156  *            fd:   open file descriptor for the socket
157  * Returns: an allocated connection structure
158  *
159  * just encompasses the action common to active and passive
160  *  connections of allocation and placement in the global table
161  */
162 static connection allocate_connection(manager m,
163                                unsigned int ip,
164                                unsigned short port,
165                                int fd)
166 {
167     connection c=malloc(sizeof(struct connection));
168     unsigned int id[2];
169     c->m=m;
170     c->fd=fd;
171     c->ip=ip;
172     c->port=port;
173     id[0]=ip;
174     id[1]=port;
175     register_io_handler(fd,READ_HANDLER,connection_input,c);
176     hash_table_insert(m->connections,c,id);
177     return(c);
178 }
179
180
181 /* Function:  new_connection
182  * Arguments: t: opaque argument holding the tcpname
183  * Returns: 1 in order to reregister for new connection requests
184  *
185  *  called when the bound service socket recieves
186  *     a new connection request, it always accepts and
187  *     installs a new connection
188  */
189 static int new_connection(void *z)
190 {
191     manager m=z;
192     struct sockaddr_in s;
193     int len=sizeof(struct sockaddr_in);
194     int fd=accept(m->bound,(struct sockaddr *)&s,&len);
195     unsigned int nid=*((unsigned int *)&s.sin_addr);
196     /* cfs specific hack */
197     //unsigned short pid=s.sin_port;
198     pthread_mutex_lock(&m->conn_lock);
199     allocate_connection(m,htonl(nid),0/*pid*/,fd);
200     pthread_mutex_unlock(&m->conn_lock);
201     return(1);
202 }
203
204 extern ptl_nid_t tcpnal_mynid;
205
206 int
207 tcpnal_hello (int sockfd, ptl_nid_t *nid, int type, __u64 incarnation)
208 {
209         int                 rc;
210         int                 nob;
211         ptl_hdr_t           hdr;
212         ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
213
214         LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
215
216         memset (&hdr, 0, sizeof (hdr));
217         hmv->magic         = cpu_to_le32(PORTALS_PROTO_MAGIC);
218         hmv->version_major = cpu_to_le32(PORTALS_PROTO_VERSION_MAJOR);
219         hmv->version_minor = cpu_to_le32(PORTALS_PROTO_VERSION_MINOR);
220         
221         hdr.src_nid = cpu_to_le64(tcpnal_mynid);
222         hdr.type    = cpu_to_le32(PTL_MSG_HELLO);
223
224         hdr.msg.hello.type = cpu_to_le32(type);
225         hdr.msg.hello.incarnation = cpu_to_le64(incarnation);
226
227         /* I don't send any interface info */
228
229         /* Assume sufficient socket buffering for this message */
230         rc = syscall(SYS_write, sockfd, &hdr, sizeof(hdr));
231         if (rc <= 0) {
232                 CERROR ("Error %d sending HELLO to "LPX64"\n", rc, *nid);
233                 return (rc);
234         }
235
236         rc = syscall(SYS_read, sockfd, hmv, sizeof(*hmv));
237         if (rc <= 0) {
238                 CERROR ("Error %d reading HELLO from "LPX64"\n", rc, *nid);
239                 return (rc);
240         }
241         
242         if (hmv->magic != le32_to_cpu(PORTALS_PROTO_MAGIC)) {
243                 CERROR ("Bad magic %#08x (%#08x expected) from "LPX64"\n",
244                         cpu_to_le32(hmv->magic), PORTALS_PROTO_MAGIC, *nid);
245                 return (-EPROTO);
246         }
247
248         if (hmv->version_major != cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
249             hmv->version_minor != cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
250                 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
251                         " from "LPX64"\n",
252                         le16_to_cpu (hmv->version_major),
253                         le16_to_cpu (hmv->version_minor),
254                         PORTALS_PROTO_VERSION_MAJOR,
255                         PORTALS_PROTO_VERSION_MINOR,
256                         *nid);
257                 return (-EPROTO);
258         }
259
260 #if (PORTALS_PROTO_VERSION_MAJOR != 1)
261 # error "This code only understands protocol version 1.x"
262 #endif
263         /* version 1 sends magic/version as the dest_nid of a 'hello' header,
264          * so read the rest of it in now... */
265
266         rc = syscall(SYS_read, sockfd, hmv + 1, sizeof(hdr) - sizeof(*hmv));
267         if (rc <= 0) {
268                 CERROR ("Error %d reading rest of HELLO hdr from "LPX64"\n",
269                         rc, *nid);
270                 return (rc);
271         }
272
273         /* ...and check we got what we expected */
274         if (hdr.type != cpu_to_le32 (PTL_MSG_HELLO)) {
275                 CERROR ("Expecting a HELLO hdr "
276                         " but got type %d with %d payload from "LPX64"\n",
277                         le32_to_cpu (hdr.type),
278                         le32_to_cpu (hdr.payload_length), *nid);
279                 return (-EPROTO);
280         }
281
282         if (le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
283                 CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n");
284                 return (-EPROTO);
285         }
286
287         if (*nid == PTL_NID_ANY) {              /* don't know peer's nid yet */
288                 *nid = le64_to_cpu(hdr.src_nid);
289         } else if (*nid != le64_to_cpu (hdr.src_nid)) {
290                 CERROR ("Connected to nid "LPX64", but expecting "LPX64"\n",
291                         le64_to_cpu (hdr.src_nid), *nid);
292                 return (-EPROTO);
293         }
294
295         /* Ignore any interface info in the payload */
296         nob = le32_to_cpu(hdr.payload_length);
297         if (nob > getpagesize()) {
298                 CERROR("Unexpected HELLO payload %d from "LPX64"\n",
299                        nob, *nid);
300                 return (-EPROTO);
301         }
302         if (nob > 0) {
303                 char *space = (char *)malloc(nob);
304                 
305                 if (space == NULL) {
306                         CERROR("Can't allocate scratch buffer %d\n", nob);
307                         return (-ENOMEM);
308                 }
309                 
310                 rc = syscall(SYS_read, sockfd, space, nob);
311                 if (rc <= 0) {
312                         CERROR("Error %d skipping HELLO payload from "
313                                LPX64"\n", rc, *nid);
314                         return (rc);
315                 }
316         }
317
318         return (0);
319 }
320
321 /* Function:  force_tcp_connection
322  * Arguments: t: tcpnal
323  *            dest: portals endpoint for the connection
324  * Returns: an allocated connection structure, either
325  *          a pre-existing one, or a new connection
326  */
327 connection force_tcp_connection(manager m,
328                                 unsigned int ip,
329                                 unsigned short port,
330                                 procbridge pb)
331 {
332     connection conn;
333     struct sockaddr_in addr;
334     struct sockaddr_in locaddr; 
335     unsigned int id[2];
336     struct timeval tv;
337     __u64 incarnation;
338
339     int fd;
340     int option;
341     int rc;
342     int rport;
343     ptl_nid_t peernid = PTL_NID_ANY;
344
345     port = tcpnal_acceptor_port;
346
347     id[0] = ip;
348     id[1] = port;
349
350     pthread_mutex_lock(&m->conn_lock);
351
352     conn = hash_table_find(m->connections, id);
353     if (conn)
354             goto out;
355
356     memset(&addr, 0, sizeof(addr));
357     addr.sin_family      = AF_INET;
358     addr.sin_addr.s_addr = htonl(ip);
359     addr.sin_port        = htons(port);
360
361     memset(&locaddr, 0, sizeof(locaddr)); 
362     locaddr.sin_family = AF_INET; 
363     locaddr.sin_addr.s_addr = INADDR_ANY;
364
365     for (rport = IPPORT_RESERVED - 1; rport > IPPORT_RESERVED / 2; --rport) {
366             fd = socket(AF_INET, SOCK_STREAM, 0);
367             if (fd < 0) {
368                     perror("tcpnal socket failed");
369                     goto out;
370             } 
371             
372             option = 1;
373             rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 
374                             &option, sizeof(option));
375             if (rc != 0) {
376                     perror ("Can't set SO_REUSEADDR for socket"); 
377                     close(fd);
378                     goto out;
379             } 
380
381             locaddr.sin_port = htons(rport);
382             rc = bind(fd, (struct sockaddr *)&locaddr, sizeof(locaddr));
383             if (rc == 0 || errno == EACCES) {
384                     rc = connect(fd, (struct sockaddr *)&addr,
385                                  sizeof(struct sockaddr_in));
386                     if (rc == 0) {
387                             break;
388                     } else if (errno != EADDRINUSE && errno != EADDRNOTAVAIL) {
389                             perror("Error connecting to remote host");
390                             close(fd);
391                             goto out;
392                     }
393             } else if (errno != EADDRINUSE) {
394                     perror("Error binding to privileged port");
395                     close(fd);
396                     goto out;
397             }
398             close(fd);
399     }
400     
401     if (rport == IPPORT_RESERVED / 2) {
402             fprintf(stderr, "Out of ports trying to bind to a reserved port\n");
403             goto out;
404     }
405     
406 #if 1
407     option = 1;
408     setsockopt(fd, SOL_TCP, TCP_NODELAY, &option, sizeof(option));
409     option = 1<<20;
410     setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &option, sizeof(option));
411     option = 1<<20;
412     setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &option, sizeof(option));
413 #endif
414    
415     gettimeofday(&tv, NULL);
416     incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
417
418     /* say hello */
419     if (tcpnal_hello(fd, &peernid, SOCKNAL_CONN_ANY, incarnation))
420             exit(-1);
421     
422     conn = allocate_connection(m, ip, port, fd);
423     
424     /* let nal thread know this event right away */
425     if (conn)
426             procbridge_wakeup_nal(pb);
427
428 out:
429     pthread_mutex_unlock(&m->conn_lock);
430     return (conn);
431 }
432
433
434 /* Function:  bind_socket
435  * Arguments: t: the nal state for this interface
436  *            port: the port to attempt to bind to
437  * Returns: 1 on success, or 0 on error
438  *
439  * bind_socket() attempts to allocate and bind a socket to the requested
440  *  port, or dynamically assign one from the kernel should the port be
441  *  zero. Sets the bound and bound_handler elements of m.
442  *
443  *  TODO: The port should be an explicitly sized type.
444  */
445 static int bind_socket(manager m,unsigned short port)
446 {
447     struct sockaddr_in addr;
448     int alen=sizeof(struct sockaddr_in);
449     
450     if ((m->bound = socket(AF_INET, SOCK_STREAM, 0)) < 0)  
451         return(0);
452     
453     bzero((char *) &addr, sizeof(addr));
454     addr.sin_family      = AF_INET;
455     addr.sin_addr.s_addr = 0;
456     addr.sin_port        = htons(port);
457
458     if (bind(m->bound,(struct sockaddr *)&addr,alen)<0){
459         perror ("tcpnal bind"); 
460         return(0);
461     }
462     
463     getsockname(m->bound,(struct sockaddr *)&addr, &alen);
464
465     m->bound_handler=register_io_handler(m->bound,READ_HANDLER,
466                                          new_connection,m);
467     listen(m->bound,5); 
468     m->port=addr.sin_port;
469     return(1);
470 }
471
472
473 /* Function:  shutdown_connections
474  * Arguments: m: the manager structure
475  *
476  * close all connections and reclaim resources
477  */
478 void shutdown_connections(manager m)
479 {
480     close(m->bound);
481     remove_io_handler(m->bound_handler);
482     hash_destroy_table(m->connections,remove_connection);
483     free(m);
484 }
485
486
487 /* Function:  init_connections
488  * Arguments: t: the nal state for this interface
489  *            port: the port to attempt to bind to
490  * Returns: a newly allocated manager structure, or
491  *          zero if the fixed port could not be bound
492  */
493 manager init_connections(unsigned short pid,
494                          int (*input)(void *, void *),
495                          void *a)
496 {
497     manager m = (manager)malloc(sizeof(struct manager));
498     m->connections = hash_create_table(compare_connection,connection_key);
499     m->handler = input;
500     m->handler_arg = a;
501     pthread_mutex_init(&m->conn_lock, 0);
502
503     if (bind_socket(m,pid))
504         return(m);
505
506     free(m);
507     return(0);
508 }