Whamcloud - gitweb
landing b_cmobd_merge on HEAD
[fs/lustre-release.git] / lnet / ulnds / socklnd / connection.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002 Cray Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 /* connection.c:
23    This file provides a simple stateful connection manager which
24    builds tcp connections on demand and leaves them open for
25    future use. It also provides the machinery to allow peers
26    to connect to it
27 */
28
29 #include <stdlib.h>
30 #include <pqtimer.h>
31 #include <dispatch.h>
32 #include <table.h>
33 #include <stdio.h>
34 #include <stdarg.h>
35 #include <string.h>
36 #include <unistd.h>
37 #include <sys/types.h>
38 #include <sys/socket.h>
39 #include <netinet/in.h>
40 #include <netinet/tcp.h>
41 #include <portals/types.h>
42 #include <portals/list.h>
43 #include <portals/lib-types.h>
44 #include <portals/socknal.h>
45 #include <linux/kp30.h>
46 #include <connection.h>
47 #include <pthread.h>
48 #include <errno.h>
49 #ifndef __CYGWIN__
50 #include <syscall.h>
51 #endif
52
53 /* global variable: acceptor port */
54 unsigned short tcpnal_acceptor_port = 988;
55
56
57 /* Function:  compare_connection
58  * Arguments: connection c:      a connection in the hash table
59  *            ptl_process_id_t:  an id to verify  agains
60  * Returns: 1 if the connection is the one requested, 0 otherwise
61  *
62  *    compare_connection() tests for collisions in the hash table
63  */
64 static int compare_connection(void *arg1, void *arg2)
65 {
66     connection c = arg1;
67     unsigned int * id = arg2;
68 #if 0
69     return((c->ip==id[0]) && (c->port==id[1]));
70 #else
71     /* CFS specific hacking */
72     return (c->ip == id[0]);
73 #endif
74 }
75
76
77 /* Function:  connection_key
78  * Arguments: ptl_process_id_t id:  an id to hash
79  * Returns: a not-particularily-well-distributed hash
80  *          of the id
81  */
82 static unsigned int connection_key(unsigned int *id)
83 {
84 #if 0
85     return(id[0]^id[1]);
86 #else
87     /* CFS specific hacking */
88     return (unsigned int) id[0];
89 #endif
90 }
91
92
93 /* Function:  remove_connection
94  * Arguments: c: the connection to remove
95  */
96 void remove_connection(void *arg)
97 {
98         connection c = arg;
99         unsigned int id[2];
100         
101         id[0]=c->ip;
102         id[1]=c->port;
103         hash_table_remove(c->m->connections,id);
104         close(c->fd);
105         free(c);
106 }
107
108
109 /* Function:  read_connection: 
110  * Arguments: c:    the connection to read from 
111  *            dest: the buffer to read into
112  *            len:  the number of bytes to read   
113  * Returns: success as 1, or failure as 0
114  *
115  *   read_connection() reads data from the connection, continuing
116  *   to read partial results until the request is satisfied or
117  *   it errors. TODO: this read should be covered by signal protection.
118  */
119 int read_connection(connection c,
120                     unsigned char *dest,
121                     int len)
122 {
123     int offset = 0,rc;
124
125     if (len) {
126         do {
127 #ifndef __CYGWIN__
128             rc = syscall(SYS_read, c->fd, dest+offset, len-offset);
129 #else
130             rc = recv(c->fd, dest+offset, len-offset, 0);
131 #endif
132             if (rc <= 0) {
133                 if (errno == EINTR) {
134                     rc = 0;
135                 } else {
136                     remove_connection(c);
137                     return (0);
138                 }
139             }
140             offset += rc;
141         } while (offset < len);
142     }
143     return (1);
144 }
145
146 static int connection_input(void *d)
147 {
148         connection c = d;
149         return((*c->m->handler)(c->m->handler_arg,c));
150 }
151
152
153 /* Function:  allocate_connection
154  * Arguments: t:    tcpnal the allocation is occuring in the context of
155  *            dest: portal endpoint address for this connection
156  *            fd:   open file descriptor for the socket
157  * Returns: an allocated connection structure
158  *
159  * just encompasses the action common to active and passive
160  *  connections of allocation and placement in the global table
161  */
162 static connection allocate_connection(manager m,
163                                unsigned int ip,
164                                unsigned short port,
165                                int fd)
166 {
167     connection c=malloc(sizeof(struct connection));
168     unsigned int id[2];
169     c->m=m;
170     c->fd=fd;
171     c->ip=ip;
172     c->port=port;
173     id[0]=ip;
174     id[1]=port;
175     register_io_handler(fd,READ_HANDLER,connection_input,c);
176     hash_table_insert(m->connections,c,id);
177     return(c);
178 }
179
180
181 /* Function:  new_connection
182  * Arguments: t: opaque argument holding the tcpname
183  * Returns: 1 in order to reregister for new connection requests
184  *
185  *  called when the bound service socket recieves
186  *     a new connection request, it always accepts and
187  *     installs a new connection
188  */
189 static int new_connection(void *z)
190 {
191     manager m=z;
192     struct sockaddr_in s;
193     int len=sizeof(struct sockaddr_in);
194     int fd=accept(m->bound,(struct sockaddr *)&s,&len);
195     unsigned int nid=*((unsigned int *)&s.sin_addr);
196     /* cfs specific hack */
197     //unsigned short pid=s.sin_port;
198     pthread_mutex_lock(&m->conn_lock);
199     allocate_connection(m,htonl(nid),0/*pid*/,fd);
200     pthread_mutex_unlock(&m->conn_lock);
201     return(1);
202 }
203
204 /* FIXME assuming little endian, cleanup!! */
205 #define __cpu_to_le64(x) ((__u64)(x))
206 #define __le64_to_cpu(x) ((__u64)(x))
207 #define __cpu_to_le32(x) ((__u32)(x))
208 #define __le32_to_cpu(x) ((__u32)(x))
209 #define __cpu_to_le16(x) ((__u16)(x))
210 #define __le16_to_cpu(x) ((__u16)(x))
211
212 extern ptl_nid_t tcpnal_mynid;
213
214 int
215 tcpnal_hello (int sockfd, ptl_nid_t *nid, int type, __u64 incarnation)
216 {
217         int                 rc;
218         ptl_hdr_t           hdr;
219         ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
220
221         LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
222
223         memset (&hdr, 0, sizeof (hdr));
224         hmv->magic         = __cpu_to_le32 (PORTALS_PROTO_MAGIC);
225         hmv->version_major = __cpu_to_le32 (PORTALS_PROTO_VERSION_MAJOR);
226         hmv->version_minor = __cpu_to_le32 (PORTALS_PROTO_VERSION_MINOR);
227         
228         hdr.src_nid = __cpu_to_le64 (tcpnal_mynid);
229         hdr.type    = __cpu_to_le32 (PTL_MSG_HELLO);
230
231         hdr.msg.hello.type = __cpu_to_le32 (type);
232         hdr.msg.hello.incarnation = __cpu_to_le64(incarnation);
233
234         /* Assume sufficient socket buffering for this message */
235         rc = syscall(SYS_write, sockfd, &hdr, sizeof(hdr));
236         if (rc <= 0) {
237                 CERROR ("Error %d sending HELLO to "LPX64"\n", rc, *nid);
238                 return (rc);
239         }
240
241         rc = syscall(SYS_read, sockfd, hmv, sizeof(*hmv));
242         if (rc <= 0) {
243                 CERROR ("Error %d reading HELLO from "LPX64"\n", rc, *nid);
244                 return (rc);
245         }
246         
247         if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
248                 CERROR ("Bad magic %#08x (%#08x expected) from "LPX64"\n",
249                         __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, *nid);
250                 return (-EPROTO);
251         }
252
253         if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
254             hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
255                 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
256                         " from "LPX64"\n",
257                         __le16_to_cpu (hmv->version_major),
258                         __le16_to_cpu (hmv->version_minor),
259                         PORTALS_PROTO_VERSION_MAJOR,
260                         PORTALS_PROTO_VERSION_MINOR,
261                         *nid);
262                 return (-EPROTO);
263         }
264
265 #if (PORTALS_PROTO_VERSION_MAJOR != 0)
266 # error "This code only understands protocol version 0.x"
267 #endif
268         /* version 0 sends magic/version as the dest_nid of a 'hello' header,
269          * so read the rest of it in now... */
270
271         rc = syscall(SYS_read, sockfd, hmv + 1, sizeof(hdr) - sizeof(*hmv));
272         if (rc <= 0) {
273                 CERROR ("Error %d reading rest of HELLO hdr from "LPX64"\n",
274                         rc, *nid);
275                 return (rc);
276         }
277
278         /* ...and check we got what we expected */
279         if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
280             hdr.payload_length != __cpu_to_le32 (0)) {
281                 CERROR ("Expecting a HELLO hdr with 0 payload,"
282                         " but got type %d with %d payload from "LPX64"\n",
283                         __le32_to_cpu (hdr.type),
284                         __le32_to_cpu (hdr.payload_length), *nid);
285                 return (-EPROTO);
286         }
287
288         if (__le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
289                 CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n");
290                 return (-EPROTO);
291         }
292
293         if (*nid == PTL_NID_ANY) {              /* don't know peer's nid yet */
294                 *nid = __le64_to_cpu(hdr.src_nid);
295         } else if (*nid != __le64_to_cpu (hdr.src_nid)) {
296                 CERROR ("Connected to nid "LPX64", but expecting "LPX64"\n",
297                         __le64_to_cpu (hdr.src_nid), *nid);
298                 return (-EPROTO);
299         }
300
301         return (0);
302 }
303
304 /* Function:  force_tcp_connection
305  * Arguments: t: tcpnal
306  *            dest: portals endpoint for the connection
307  * Returns: an allocated connection structure, either
308  *          a pre-existing one, or a new connection
309  */
310 connection force_tcp_connection(manager m,
311                                 unsigned int ip,
312                                 unsigned short port,
313                                 procbridge pb)
314 {
315     connection conn;
316     struct sockaddr_in addr;
317     unsigned int id[2];
318     struct timeval tv;
319     __u64 incarnation;
320
321     port = tcpnal_acceptor_port;
322
323     id[0] = ip;
324     id[1] = port;
325
326     pthread_mutex_lock(&m->conn_lock);
327
328     conn = hash_table_find(m->connections, id);
329     if (!conn) {
330         int fd;
331         int option;
332         ptl_nid_t peernid = PTL_NID_ANY;
333
334         bzero((char *) &addr, sizeof(addr));
335         addr.sin_family      = AF_INET;
336         addr.sin_addr.s_addr = htonl(ip);
337         addr.sin_port        = htons(port);
338
339         if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { 
340             perror("tcpnal socket failed");
341             exit(-1);
342         }
343         if (connect(fd, (struct sockaddr *)&addr,
344                     sizeof(struct sockaddr_in))) {
345             perror("tcpnal connect");
346             return(0);
347         }
348
349 #if 1
350         option = 1;
351         setsockopt(fd, SOL_TCP, TCP_NODELAY, &option, sizeof(option));
352         option = 1<<20;
353         setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &option, sizeof(option));
354         option = 1<<20;
355         setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &option, sizeof(option));
356 #endif
357    
358         gettimeofday(&tv, NULL);
359         incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
360
361         /* say hello */
362         if (tcpnal_hello(fd, &peernid, SOCKNAL_CONN_ANY, incarnation))
363             exit(-1);
364
365         conn = allocate_connection(m, ip, port, fd);
366
367         /* let nal thread know this event right away */
368         if (conn)
369                 procbridge_wakeup_nal(pb);
370     }
371
372     pthread_mutex_unlock(&m->conn_lock);
373     return (conn);
374 }
375
376
377 /* Function:  bind_socket
378  * Arguments: t: the nal state for this interface
379  *            port: the port to attempt to bind to
380  * Returns: 1 on success, or 0 on error
381  *
382  * bind_socket() attempts to allocate and bind a socket to the requested
383  *  port, or dynamically assign one from the kernel should the port be
384  *  zero. Sets the bound and bound_handler elements of m.
385  *
386  *  TODO: The port should be an explicitly sized type.
387  */
388 static int bind_socket(manager m,unsigned short port)
389 {
390     struct sockaddr_in addr;
391     int alen=sizeof(struct sockaddr_in);
392     
393     if ((m->bound = socket(AF_INET, SOCK_STREAM, 0)) < 0)  
394         return(0);
395     
396     bzero((char *) &addr, sizeof(addr));
397     addr.sin_family      = AF_INET;
398     addr.sin_addr.s_addr = 0;
399     addr.sin_port        = htons(port);
400
401     if (bind(m->bound,(struct sockaddr *)&addr,alen)<0){
402         perror ("tcpnal bind"); 
403         return(0);
404     }
405     
406     getsockname(m->bound,(struct sockaddr *)&addr, &alen);
407
408     m->bound_handler=register_io_handler(m->bound,READ_HANDLER,
409                                          new_connection,m);
410     listen(m->bound,5); 
411     m->port=addr.sin_port;
412     return(1);
413 }
414
415
416 /* Function:  shutdown_connections
417  * Arguments: m: the manager structure
418  *
419  * close all connections and reclaim resources
420  */
421 void shutdown_connections(manager m)
422 {
423     close(m->bound);
424     remove_io_handler(m->bound_handler);
425     hash_destroy_table(m->connections,remove_connection);
426     free(m);
427 }
428
429
430 /* Function:  init_connections
431  * Arguments: t: the nal state for this interface
432  *            port: the port to attempt to bind to
433  * Returns: a newly allocated manager structure, or
434  *          zero if the fixed port could not be bound
435  */
436 manager init_connections(unsigned short pid,
437                          int (*input)(void *, void *),
438                          void *a)
439 {
440     manager m = (manager)malloc(sizeof(struct manager));
441     m->connections = hash_create_table(compare_connection,connection_key);
442     m->handler = input;
443     m->handler_arg = a;
444     pthread_mutex_init(&m->conn_lock, 0);
445
446     if (bind_socket(m,pid))
447         return(m);
448
449     free(m);
450     return(0);
451 }