Whamcloud - gitweb
land v0.9.1 on HEAD, in preparation for a 1.0.x branch
[fs/lustre-release.git] / lustre / portals / unals / connection.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002 Cray Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 /* connection.c:
23    This file provides a simple stateful connection manager which
24    builds tcp connections on demand and leaves them open for
25    future use. It also provides the machinery to allow peers
26    to connect to it
27 */
28
29 #include <stdlib.h>
30 #include <pqtimer.h>
31 #include <dispatch.h>
32 #include <table.h>
33 #include <stdio.h>
34 #include <stdarg.h>
35 #include <string.h>
36 #include <unistd.h>
37 #include <sys/types.h>
38 #include <sys/socket.h>
39 #include <netinet/in.h>
40 #include <netinet/tcp.h>
41 #include <portals/types.h>
42 #include <portals/list.h>
43 #include <portals/lib-types.h>
44 #include <portals/socknal.h>
45 #include <linux/kp30.h>
46 #include <connection.h>
47 #include <pthread.h>
48 #include <errno.h>
49 #ifndef __CYGWIN__
50 #include <syscall.h>
51 #endif
52
53 /* global variable: acceptor port */
54 unsigned short tcpnal_acceptor_port = 988;
55
56
57 /* Function:  compare_connection
58  * Arguments: connection c:      a connection in the hash table
59  *            ptl_process_id_t:  an id to verify  agains
60  * Returns: 1 if the connection is the one requested, 0 otherwise
61  *
62  *    compare_connection() tests for collisions in the hash table
63  */
64 static int compare_connection(void *arg1, void *arg2)
65 {
66     connection c = arg1;
67     unsigned int * id = arg2;
68 #if 0
69     return((c->ip==id[0]) && (c->port==id[1]));
70 #else
71     /* CFS specific hacking */
72     return (c->ip == id[0]);
73 #endif
74 }
75
76
77 /* Function:  connection_key
78  * Arguments: ptl_process_id_t id:  an id to hash
79  * Returns: a not-particularily-well-distributed hash
80  *          of the id
81  */
82 static unsigned int connection_key(unsigned int *id)
83 {
84 #if 0
85     return(id[0]^id[1]);
86 #else
87     /* CFS specific hacking */
88     return (unsigned int) id[0];
89 #endif
90 }
91
92
93 /* Function:  remove_connection
94  * Arguments: c: the connection to remove
95  */
96 void remove_connection(void *arg)
97 {
98         connection c = arg;
99         unsigned int id[2];
100         
101         id[0]=c->ip;
102         id[1]=c->port;
103         hash_table_remove(c->m->connections,id);
104         close(c->fd);
105         free(c);
106 }
107
108
109 /* Function:  read_connection: 
110  * Arguments: c:    the connection to read from 
111  *            dest: the buffer to read into
112  *            len:  the number of bytes to read   
113  * Returns: success as 1, or failure as 0
114  *
115  *   read_connection() reads data from the connection, continuing
116  *   to read partial results until the request is satisfied or
117  *   it errors. TODO: this read should be covered by signal protection.
118  */
119 int read_connection(connection c,
120                     unsigned char *dest,
121                     int len)
122 {
123     int offset = 0,rc;
124
125     if (len) {
126         do {
127 #ifndef __CYGWIN__
128             rc = syscall(SYS_read, c->fd, dest+offset, len-offset);
129 #else
130             rc = recv(c->fd, dest+offset, len-offset, 0);
131 #endif
132             if (rc <= 0) {
133                 if (errno == EINTR) {
134                     rc = 0;
135                 } else {
136                     remove_connection(c);
137                     return (0);
138                 }
139             }
140             offset += rc;
141         } while (offset < len);
142     }
143     return (1);
144 }
145
146 static int connection_input(void *d)
147 {
148         connection c = d;
149         return((*c->m->handler)(c->m->handler_arg,c));
150 }
151
152
153 /* Function:  allocate_connection
154  * Arguments: t:    tcpnal the allocation is occuring in the context of
155  *            dest: portal endpoint address for this connection
156  *            fd:   open file descriptor for the socket
157  * Returns: an allocated connection structure
158  *
159  * just encompasses the action common to active and passive
160  *  connections of allocation and placement in the global table
161  */
162 static connection allocate_connection(manager m,
163                                unsigned int ip,
164                                unsigned short port,
165                                int fd)
166 {
167     connection c=malloc(sizeof(struct connection));
168     unsigned int id[2];
169     c->m=m;
170     c->fd=fd;
171     c->ip=ip;
172     c->port=port;
173     id[0]=ip;
174     id[1]=port;
175     register_io_handler(fd,READ_HANDLER,connection_input,c);
176     hash_table_insert(m->connections,c,id);
177     return(c);
178 }
179
180
181 /* Function:  new_connection
182  * Arguments: t: opaque argument holding the tcpname
183  * Returns: 1 in order to reregister for new connection requests
184  *
185  *  called when the bound service socket recieves
186  *     a new connection request, it always accepts and
187  *     installs a new connection
188  */
189 static int new_connection(void *z)
190 {
191     manager m=z;
192     struct sockaddr_in s;
193     int len=sizeof(struct sockaddr_in);
194     int fd=accept(m->bound,(struct sockaddr *)&s,&len);
195     unsigned int nid=*((unsigned int *)&s.sin_addr);
196     /* cfs specific hack */
197     //unsigned short pid=s.sin_port;
198     pthread_mutex_lock(&m->conn_lock);
199     allocate_connection(m,htonl(nid),0/*pid*/,fd);
200     pthread_mutex_unlock(&m->conn_lock);
201     return(1);
202 }
203
204 /* FIXME assuming little endian, cleanup!! */
205 #define __cpu_to_le64(x) ((__u64)(x))
206 #define __le64_to_cpu(x) ((__u64)(x))
207 #define __cpu_to_le32(x) ((__u32)(x))
208 #define __le32_to_cpu(x) ((__u32)(x))
209 #define __cpu_to_le16(x) ((__u16)(x))
210 #define __le16_to_cpu(x) ((__u16)(x))
211
212 extern ptl_nid_t tcpnal_mynid;
213
214 int
215 tcpnal_hello (int sockfd, ptl_nid_t *nid, int type, __u64 incarnation)
216 {
217         int                 rc;
218         ptl_hdr_t           hdr;
219         ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
220
221         LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
222
223         memset (&hdr, 0, sizeof (hdr));
224         hmv->magic         = __cpu_to_le32 (PORTALS_PROTO_MAGIC);
225         hmv->version_major = __cpu_to_le32 (PORTALS_PROTO_VERSION_MAJOR);
226         hmv->version_minor = __cpu_to_le32 (PORTALS_PROTO_VERSION_MINOR);
227         
228         hdr.src_nid = __cpu_to_le64 (tcpnal_mynid);
229         hdr.type    = __cpu_to_le32 (PTL_MSG_HELLO);
230
231         hdr.msg.hello.type = __cpu_to_le32 (type);
232         hdr.msg.hello.incarnation = 0;
233
234         /* Assume sufficient socket buffering for this message */
235         rc = syscall(SYS_write, sockfd, &hdr, sizeof(hdr));
236         if (rc <= 0) {
237                 CERROR ("Error %d sending HELLO to %llx\n", rc, *nid);
238                 return (rc);
239         }
240
241         rc = syscall(SYS_read, sockfd, hmv, sizeof(*hmv));
242         if (rc <= 0) {
243                 CERROR ("Error %d reading HELLO from %llx\n", rc, *nid);
244                 return (rc);
245         }
246         
247         if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
248                 CERROR ("Bad magic %#08x (%#08x expected) from %llx\n",
249                         __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, *nid);
250                 return (-EPROTO);
251         }
252
253         if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
254             hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
255                 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
256                         " from %llx\n",
257                         __le16_to_cpu (hmv->version_major),
258                         __le16_to_cpu (hmv->version_minor),
259                         PORTALS_PROTO_VERSION_MAJOR,
260                         PORTALS_PROTO_VERSION_MINOR,
261                         *nid);
262                 return (-EPROTO);
263         }
264
265 #if (PORTALS_PROTO_VERSION_MAJOR != 0)
266 # error "This code only understands protocol version 0.x"
267 #endif
268         /* version 0 sends magic/version as the dest_nid of a 'hello' header,
269          * so read the rest of it in now... */
270
271         rc = syscall(SYS_read, sockfd, hmv + 1, sizeof(hdr) - sizeof(*hmv));
272         if (rc <= 0) {
273                 CERROR ("Error %d reading rest of HELLO hdr from %llx\n",
274                         rc, *nid);
275                 return (rc);
276         }
277
278         /* ...and check we got what we expected */
279         if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
280             hdr.payload_length != __cpu_to_le32 (0)) {
281                 CERROR ("Expecting a HELLO hdr with 0 payload,"
282                         " but got type %d with %d payload from %llx\n",
283                         __le32_to_cpu (hdr.type),
284                         __le32_to_cpu (hdr.payload_length), *nid);
285                 return (-EPROTO);
286         }
287
288         if (__le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
289                 CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n");
290                 return (-EPROTO);
291         }
292
293         if (*nid == PTL_NID_ANY) {              /* don't know peer's nid yet */
294                 *nid = __le64_to_cpu(hdr.src_nid);
295         } else if (*nid != __le64_to_cpu (hdr.src_nid)) {
296                 CERROR ("Connected to nid %llx, but expecting %llx\n",
297                         __le64_to_cpu (hdr.src_nid), *nid);
298                 return (-EPROTO);
299         }
300
301         return (0);
302 }
303
304 /* Function:  force_tcp_connection
305  * Arguments: t: tcpnal
306  *            dest: portals endpoint for the connection
307  * Returns: an allocated connection structure, either
308  *          a pre-existing one, or a new connection
309  */
310 connection force_tcp_connection(manager m,
311                                 unsigned int ip,
312                                 unsigned short port)
313 {
314     connection conn;
315     struct sockaddr_in addr;
316     unsigned int id[2];
317
318     port = tcpnal_acceptor_port;
319
320     id[0] = ip;
321     id[1] = port;
322
323     pthread_mutex_lock(&m->conn_lock);
324
325     conn = hash_table_find(m->connections, id);
326     if (!conn) {
327         int fd;
328         int option;
329         ptl_nid_t peernid = PTL_NID_ANY;
330
331         bzero((char *) &addr, sizeof(addr));
332         addr.sin_family      = AF_INET;
333         addr.sin_addr.s_addr = htonl(ip);
334         addr.sin_port        = htons(port);
335
336         if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { 
337             perror("tcpnal socket failed");
338             exit(-1);
339         }
340         if (connect(fd, (struct sockaddr *)&addr,
341                     sizeof(struct sockaddr_in))) {
342             perror("tcpnal connect");
343             return(0);
344         }
345
346 #if 1
347         option = 1;
348         setsockopt(fd, SOL_TCP, TCP_NODELAY, &option, sizeof(option));
349         option = 1<<20;
350         setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &option, sizeof(option));
351         option = 1<<20;
352         setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &option, sizeof(option));
353 #endif
354    
355         /* say hello */
356         if (tcpnal_hello(fd, &peernid, SOCKNAL_CONN_ANY, 0))
357             exit(-1);
358
359         conn = allocate_connection(m, ip, port, fd);
360     }
361
362     pthread_mutex_unlock(&m->conn_lock);
363     return (conn);
364 }
365
366
367 /* Function:  bind_socket
368  * Arguments: t: the nal state for this interface
369  *            port: the port to attempt to bind to
370  * Returns: 1 on success, or 0 on error
371  *
372  * bind_socket() attempts to allocate and bind a socket to the requested
373  *  port, or dynamically assign one from the kernel should the port be
374  *  zero. Sets the bound and bound_handler elements of m.
375  *
376  *  TODO: The port should be an explicitly sized type.
377  */
378 static int bind_socket(manager m,unsigned short port)
379 {
380     struct sockaddr_in addr;
381     int alen=sizeof(struct sockaddr_in);
382     
383     if ((m->bound = socket(AF_INET, SOCK_STREAM, 0)) < 0)  
384         return(0);
385     
386     bzero((char *) &addr, sizeof(addr));
387     addr.sin_family      = AF_INET;
388     addr.sin_addr.s_addr = 0;
389     addr.sin_port        = htons(port);
390
391     if (bind(m->bound,(struct sockaddr *)&addr,alen)<0){
392         perror ("tcpnal bind"); 
393         return(0);
394     }
395     
396     getsockname(m->bound,(struct sockaddr *)&addr, &alen);
397
398     m->bound_handler=register_io_handler(m->bound,READ_HANDLER,
399                                          new_connection,m);
400     listen(m->bound,5); 
401     m->port=addr.sin_port;
402     return(1);
403 }
404
405
406 /* Function:  shutdown_connections
407  * Arguments: m: the manager structure
408  *
409  * close all connections and reclaim resources
410  */
411 void shutdown_connections(manager m)
412 {
413     close(m->bound);
414     remove_io_handler(m->bound_handler);
415     hash_destroy_table(m->connections,remove_connection);
416     free(m);
417 }
418
419
420 /* Function:  init_connections
421  * Arguments: t: the nal state for this interface
422  *            port: the port to attempt to bind to
423  * Returns: a newly allocated manager structure, or
424  *          zero if the fixed port could not be bound
425  */
426 manager init_connections(unsigned short pid,
427                          int (*input)(void *, void *),
428                          void *a)
429 {
430     manager m = (manager)malloc(sizeof(struct manager));
431     m->connections = hash_create_table(compare_connection,connection_key);
432     m->handler = input;
433     m->handler_arg = a;
434     pthread_mutex_init(&m->conn_lock, 0);
435
436     if (bind_socket(m,pid))
437         return(m);
438
439     free(m);
440     return(0);
441 }