Whamcloud - gitweb
Land b1_2 onto HEAD (20040304_171022)
[fs/lustre-release.git] / lnet / ulnds / connection.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002 Cray Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 /* connection.c:
23    This file provides a simple stateful connection manager which
24    builds tcp connections on demand and leaves them open for
25    future use. It also provides the machinery to allow peers
26    to connect to it
27 */
28
29 #include <stdlib.h>
30 #include <pqtimer.h>
31 #include <dispatch.h>
32 #include <table.h>
33 #include <stdio.h>
34 #include <stdarg.h>
35 #include <string.h>
36 #include <unistd.h>
37 #include <sys/types.h>
38 #include <sys/socket.h>
39 #include <netinet/in.h>
40 #include <netinet/tcp.h>
41 #include <portals/types.h>
42 #include <portals/list.h>
43 #include <portals/lib-types.h>
44 #include <portals/socknal.h>
45 #include <linux/kp30.h>
46 #include <connection.h>
47 #include <pthread.h>
48 #include <errno.h>
49 #ifndef __CYGWIN__
50 #include <syscall.h>
51 #endif
52
53 /* global variable: acceptor port */
54 unsigned short tcpnal_acceptor_port = 988;
55
56
57 /* Function:  compare_connection
58  * Arguments: connection c:      a connection in the hash table
59  *            ptl_process_id_t:  an id to verify  agains
60  * Returns: 1 if the connection is the one requested, 0 otherwise
61  *
62  *    compare_connection() tests for collisions in the hash table
63  */
64 static int compare_connection(void *arg1, void *arg2)
65 {
66     connection c = arg1;
67     unsigned int * id = arg2;
68 #if 0
69     return((c->ip==id[0]) && (c->port==id[1]));
70 #else
71     /* CFS specific hacking */
72     return (c->ip == id[0]);
73 #endif
74 }
75
76
77 /* Function:  connection_key
78  * Arguments: ptl_process_id_t id:  an id to hash
79  * Returns: a not-particularily-well-distributed hash
80  *          of the id
81  */
82 static unsigned int connection_key(unsigned int *id)
83 {
84 #if 0
85     return(id[0]^id[1]);
86 #else
87     /* CFS specific hacking */
88     return (unsigned int) id[0];
89 #endif
90 }
91
92
93 /* Function:  remove_connection
94  * Arguments: c: the connection to remove
95  */
96 void remove_connection(void *arg)
97 {
98         connection c = arg;
99         unsigned int id[2];
100         
101         id[0]=c->ip;
102         id[1]=c->port;
103         hash_table_remove(c->m->connections,id);
104         close(c->fd);
105         free(c);
106 }
107
108
109 /* Function:  read_connection: 
110  * Arguments: c:    the connection to read from 
111  *            dest: the buffer to read into
112  *            len:  the number of bytes to read   
113  * Returns: success as 1, or failure as 0
114  *
115  *   read_connection() reads data from the connection, continuing
116  *   to read partial results until the request is satisfied or
117  *   it errors. TODO: this read should be covered by signal protection.
118  */
119 int read_connection(connection c,
120                     unsigned char *dest,
121                     int len)
122 {
123     int offset = 0,rc;
124
125     if (len) {
126         do {
127 #ifndef __CYGWIN__
128             rc = syscall(SYS_read, c->fd, dest+offset, len-offset);
129 #else
130             rc = recv(c->fd, dest+offset, len-offset, 0);
131 #endif
132             if (rc <= 0) {
133                 if (errno == EINTR) {
134                     rc = 0;
135                 } else {
136                     remove_connection(c);
137                     return (0);
138                 }
139             }
140             offset += rc;
141         } while (offset < len);
142     }
143     return (1);
144 }
145
146 static int connection_input(void *d)
147 {
148         connection c = d;
149         return((*c->m->handler)(c->m->handler_arg,c));
150 }
151
152
153 /* Function:  allocate_connection
154  * Arguments: t:    tcpnal the allocation is occuring in the context of
155  *            dest: portal endpoint address for this connection
156  *            fd:   open file descriptor for the socket
157  * Returns: an allocated connection structure
158  *
159  * just encompasses the action common to active and passive
160  *  connections of allocation and placement in the global table
161  */
162 static connection allocate_connection(manager m,
163                                unsigned int ip,
164                                unsigned short port,
165                                int fd)
166 {
167     connection c=malloc(sizeof(struct connection));
168     unsigned int id[2];
169     c->m=m;
170     c->fd=fd;
171     c->ip=ip;
172     c->port=port;
173     id[0]=ip;
174     id[1]=port;
175     register_io_handler(fd,READ_HANDLER,connection_input,c);
176     hash_table_insert(m->connections,c,id);
177     return(c);
178 }
179
180
181 /* Function:  new_connection
182  * Arguments: t: opaque argument holding the tcpname
183  * Returns: 1 in order to reregister for new connection requests
184  *
185  *  called when the bound service socket recieves
186  *     a new connection request, it always accepts and
187  *     installs a new connection
188  */
189 static int new_connection(void *z)
190 {
191     manager m=z;
192     struct sockaddr_in s;
193     int len=sizeof(struct sockaddr_in);
194     int fd=accept(m->bound,(struct sockaddr *)&s,&len);
195     unsigned int nid=*((unsigned int *)&s.sin_addr);
196     /* cfs specific hack */
197     //unsigned short pid=s.sin_port;
198     pthread_mutex_lock(&m->conn_lock);
199     allocate_connection(m,htonl(nid),0/*pid*/,fd);
200     pthread_mutex_unlock(&m->conn_lock);
201     return(1);
202 }
203
204 /* FIXME assuming little endian, cleanup!! */
205 #define __cpu_to_le64(x) ((__u64)(x))
206 #define __le64_to_cpu(x) ((__u64)(x))
207 #define __cpu_to_le32(x) ((__u32)(x))
208 #define __le32_to_cpu(x) ((__u32)(x))
209 #define __cpu_to_le16(x) ((__u16)(x))
210 #define __le16_to_cpu(x) ((__u16)(x))
211
212 extern ptl_nid_t tcpnal_mynid;
213
214 int
215 tcpnal_hello (int sockfd, ptl_nid_t *nid, int type, __u64 incarnation)
216 {
217         int                 rc;
218         ptl_hdr_t           hdr;
219         ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
220
221         LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
222
223         memset (&hdr, 0, sizeof (hdr));
224         hmv->magic         = __cpu_to_le32 (PORTALS_PROTO_MAGIC);
225         hmv->version_major = __cpu_to_le32 (PORTALS_PROTO_VERSION_MAJOR);
226         hmv->version_minor = __cpu_to_le32 (PORTALS_PROTO_VERSION_MINOR);
227         
228         hdr.src_nid = __cpu_to_le64 (tcpnal_mynid);
229         hdr.type    = __cpu_to_le32 (PTL_MSG_HELLO);
230
231         hdr.msg.hello.type = __cpu_to_le32 (type);
232         hdr.msg.hello.incarnation = 0;
233
234         /* Assume sufficient socket buffering for this message */
235         rc = syscall(SYS_write, sockfd, &hdr, sizeof(hdr));
236         if (rc <= 0) {
237                 CERROR ("Error %d sending HELLO to "LPX64"\n", rc, *nid);
238                 return (rc);
239         }
240
241         rc = syscall(SYS_read, sockfd, hmv, sizeof(*hmv));
242         if (rc <= 0) {
243                 CERROR ("Error %d reading HELLO from "LPX64"\n", rc, *nid);
244                 return (rc);
245         }
246         
247         if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
248                 CERROR ("Bad magic %#08x (%#08x expected) from "LPX64"\n",
249                         __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, *nid);
250                 return (-EPROTO);
251         }
252
253         if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
254             hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
255                 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
256                         " from "LPX64"\n",
257                         __le16_to_cpu (hmv->version_major),
258                         __le16_to_cpu (hmv->version_minor),
259                         PORTALS_PROTO_VERSION_MAJOR,
260                         PORTALS_PROTO_VERSION_MINOR,
261                         *nid);
262                 return (-EPROTO);
263         }
264
265 #if (PORTALS_PROTO_VERSION_MAJOR != 0)
266 # error "This code only understands protocol version 0.x"
267 #endif
268         /* version 0 sends magic/version as the dest_nid of a 'hello' header,
269          * so read the rest of it in now... */
270
271         rc = syscall(SYS_read, sockfd, hmv + 1, sizeof(hdr) - sizeof(*hmv));
272         if (rc <= 0) {
273                 CERROR ("Error %d reading rest of HELLO hdr from "LPX64"\n",
274                         rc, *nid);
275                 return (rc);
276         }
277
278         /* ...and check we got what we expected */
279         if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
280             hdr.payload_length != __cpu_to_le32 (0)) {
281                 CERROR ("Expecting a HELLO hdr with 0 payload,"
282                         " but got type %d with %d payload from "LPX64"\n",
283                         __le32_to_cpu (hdr.type),
284                         __le32_to_cpu (hdr.payload_length), *nid);
285                 return (-EPROTO);
286         }
287
288         if (__le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
289                 CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n");
290                 return (-EPROTO);
291         }
292
293         if (*nid == PTL_NID_ANY) {              /* don't know peer's nid yet */
294                 *nid = __le64_to_cpu(hdr.src_nid);
295         } else if (*nid != __le64_to_cpu (hdr.src_nid)) {
296                 CERROR ("Connected to nid "LPX64", but expecting "LPX64"\n",
297                         __le64_to_cpu (hdr.src_nid), *nid);
298                 return (-EPROTO);
299         }
300
301         return (0);
302 }
303
304 /* Function:  force_tcp_connection
305  * Arguments: t: tcpnal
306  *            dest: portals endpoint for the connection
307  * Returns: an allocated connection structure, either
308  *          a pre-existing one, or a new connection
309  */
310 connection force_tcp_connection(manager m,
311                                 unsigned int ip,
312                                 unsigned short port,
313                                 procbridge pb)
314 {
315     connection conn;
316     struct sockaddr_in addr;
317     unsigned int id[2];
318
319     port = tcpnal_acceptor_port;
320
321     id[0] = ip;
322     id[1] = port;
323
324     pthread_mutex_lock(&m->conn_lock);
325
326     conn = hash_table_find(m->connections, id);
327     if (!conn) {
328         int fd;
329         int option;
330         ptl_nid_t peernid = PTL_NID_ANY;
331
332         bzero((char *) &addr, sizeof(addr));
333         addr.sin_family      = AF_INET;
334         addr.sin_addr.s_addr = htonl(ip);
335         addr.sin_port        = htons(port);
336
337         if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { 
338             perror("tcpnal socket failed");
339             exit(-1);
340         }
341         if (connect(fd, (struct sockaddr *)&addr,
342                     sizeof(struct sockaddr_in))) {
343             perror("tcpnal connect");
344             return(0);
345         }
346
347 #if 1
348         option = 1;
349         setsockopt(fd, SOL_TCP, TCP_NODELAY, &option, sizeof(option));
350         option = 1<<20;
351         setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &option, sizeof(option));
352         option = 1<<20;
353         setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &option, sizeof(option));
354 #endif
355    
356         /* say hello */
357         if (tcpnal_hello(fd, &peernid, SOCKNAL_CONN_ANY, 0))
358             exit(-1);
359
360         conn = allocate_connection(m, ip, port, fd);
361
362         /* let nal thread know this event right away */
363         if (conn)
364                 procbridge_wakeup_nal(pb);
365     }
366
367     pthread_mutex_unlock(&m->conn_lock);
368     return (conn);
369 }
370
371
372 /* Function:  bind_socket
373  * Arguments: t: the nal state for this interface
374  *            port: the port to attempt to bind to
375  * Returns: 1 on success, or 0 on error
376  *
377  * bind_socket() attempts to allocate and bind a socket to the requested
378  *  port, or dynamically assign one from the kernel should the port be
379  *  zero. Sets the bound and bound_handler elements of m.
380  *
381  *  TODO: The port should be an explicitly sized type.
382  */
383 static int bind_socket(manager m,unsigned short port)
384 {
385     struct sockaddr_in addr;
386     int alen=sizeof(struct sockaddr_in);
387     
388     if ((m->bound = socket(AF_INET, SOCK_STREAM, 0)) < 0)  
389         return(0);
390     
391     bzero((char *) &addr, sizeof(addr));
392     addr.sin_family      = AF_INET;
393     addr.sin_addr.s_addr = 0;
394     addr.sin_port        = htons(port);
395
396     if (bind(m->bound,(struct sockaddr *)&addr,alen)<0){
397         perror ("tcpnal bind"); 
398         return(0);
399     }
400     
401     getsockname(m->bound,(struct sockaddr *)&addr, &alen);
402
403     m->bound_handler=register_io_handler(m->bound,READ_HANDLER,
404                                          new_connection,m);
405     listen(m->bound,5); 
406     m->port=addr.sin_port;
407     return(1);
408 }
409
410
411 /* Function:  shutdown_connections
412  * Arguments: m: the manager structure
413  *
414  * close all connections and reclaim resources
415  */
416 void shutdown_connections(manager m)
417 {
418     close(m->bound);
419     remove_io_handler(m->bound_handler);
420     hash_destroy_table(m->connections,remove_connection);
421     free(m);
422 }
423
424
425 /* Function:  init_connections
426  * Arguments: t: the nal state for this interface
427  *            port: the port to attempt to bind to
428  * Returns: a newly allocated manager structure, or
429  *          zero if the fixed port could not be bound
430  */
431 manager init_connections(unsigned short pid,
432                          int (*input)(void *, void *),
433                          void *a)
434 {
435     manager m = (manager)malloc(sizeof(struct manager));
436     m->connections = hash_create_table(compare_connection,connection_key);
437     m->handler = input;
438     m->handler_arg = a;
439     pthread_mutex_init(&m->conn_lock, 0);
440
441     if (bind_socket(m,pid))
442         return(m);
443
444     free(m);
445     return(0);
446 }