Whamcloud - gitweb
- landing of b_hd_cleanup_merge to HEAD.
[fs/lustre-release.git] / lnet / ulnds / socklnd / connection.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002 Cray Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 /* connection.c:
23    This file provides a simple stateful connection manager which
24    builds tcp connections on demand and leaves them open for
25    future use. It also provides the machinery to allow peers
26    to connect to it
27 */
28
29 #include <stdlib.h>
30 #include <pqtimer.h>
31 #include <dispatch.h>
32 #include <table.h>
33 #include <stdio.h>
34 #include <stdarg.h>
35 #include <string.h>
36 #include <unistd.h>
37 #include <sys/types.h>
38 #include <sys/socket.h>
39 #include <netinet/in.h>
40 #include <netinet/tcp.h>
41 #include <portals/types.h>
42 #include <portals/list.h>
43 #include <portals/lib-types.h>
44 #include <portals/socknal.h>
45 #include <linux/kp30.h>
46 #include <connection.h>
47 #include <pthread.h>
48 #include <errno.h>
49 #ifndef __CYGWIN__
50 #include <syscall.h>
51 #endif
52
53 /* global variable: acceptor port */
54 unsigned short tcpnal_acceptor_port = 988;
55
56
57 /* Function:  compare_connection
58  * Arguments: connection c:      a connection in the hash table
59  *            ptl_process_id_t:  an id to verify  agains
60  * Returns: 1 if the connection is the one requested, 0 otherwise
61  *
62  *    compare_connection() tests for collisions in the hash table
63  */
64 static int compare_connection(void *arg1, void *arg2)
65 {
66     connection c = arg1;
67     unsigned int * id = arg2;
68 #if 0
69     return((c->ip==id[0]) && (c->port==id[1]));
70 #else
71     /* CFS specific hacking */
72     return (c->ip == id[0]);
73 #endif
74 }
75
76
77 /* Function:  connection_key
78  * Arguments: ptl_process_id_t id:  an id to hash
79  * Returns: a not-particularily-well-distributed hash
80  *          of the id
81  */
82 static unsigned int connection_key(unsigned int *id)
83 {
84 #if 0
85     return(id[0]^id[1]);
86 #else
87     /* CFS specific hacking */
88     return (unsigned int) id[0];
89 #endif
90 }
91
92
93 /* Function:  remove_connection
94  * Arguments: c: the connection to remove
95  */
96 void remove_connection(void *arg)
97 {
98         connection c = arg;
99         unsigned int id[2];
100         
101         id[0]=c->ip;
102         id[1]=c->port;
103         hash_table_remove(c->m->connections,id);
104         close(c->fd);
105         free(c);
106 }
107
108
109 /* Function:  read_connection: 
110  * Arguments: c:    the connection to read from 
111  *            dest: the buffer to read into
112  *            len:  the number of bytes to read   
113  * Returns: success as 1, or failure as 0
114  *
115  *   read_connection() reads data from the connection, continuing
116  *   to read partial results until the request is satisfied or
117  *   it errors. TODO: this read should be covered by signal protection.
118  */
119 int read_connection(connection c,
120                     unsigned char *dest,
121                     int len)
122 {
123     int offset = 0,rc;
124
125     if (len) {
126         do {
127 #ifndef __CYGWIN__
128             rc = syscall(SYS_read, c->fd, dest+offset, len-offset);
129 #else
130             rc = recv(c->fd, dest+offset, len-offset, 0);
131 #endif
132             if (rc <= 0) {
133                 if (errno == EINTR) {
134                     rc = 0;
135                 } else {
136                     remove_connection(c);
137                     return (0);
138                 }
139             }
140             offset += rc;
141         } while (offset < len);
142     }
143     return (1);
144 }
145
146 static int connection_input(void *d)
147 {
148         connection c = d;
149         return((*c->m->handler)(c->m->handler_arg,c));
150 }
151
152
153 /* Function:  allocate_connection
154  * Arguments: t:    tcpnal the allocation is occuring in the context of
155  *            dest: portal endpoint address for this connection
156  *            fd:   open file descriptor for the socket
157  * Returns: an allocated connection structure
158  *
159  * just encompasses the action common to active and passive
160  *  connections of allocation and placement in the global table
161  */
162 static connection allocate_connection(manager m,
163                                unsigned int ip,
164                                unsigned short port,
165                                int fd)
166 {
167     connection c=malloc(sizeof(struct connection));
168     unsigned int id[2];
169     c->m=m;
170     c->fd=fd;
171     c->ip=ip;
172     c->port=port;
173     id[0]=ip;
174     id[1]=port;
175     register_io_handler(fd,READ_HANDLER,connection_input,c);
176     hash_table_insert(m->connections,c,id);
177     return(c);
178 }
179
180
181 /* Function:  new_connection
182  * Arguments: t: opaque argument holding the tcpname
183  * Returns: 1 in order to reregister for new connection requests
184  *
185  *  called when the bound service socket recieves
186  *     a new connection request, it always accepts and
187  *     installs a new connection
188  */
189 static int new_connection(void *z)
190 {
191     manager m=z;
192     struct sockaddr_in s;
193     int len=sizeof(struct sockaddr_in);
194     int fd=accept(m->bound,(struct sockaddr *)&s,&len);
195     unsigned int nid=*((unsigned int *)&s.sin_addr);
196     /* cfs specific hack */
197     //unsigned short pid=s.sin_port;
198     pthread_mutex_lock(&m->conn_lock);
199     allocate_connection(m,htonl(nid),0/*pid*/,fd);
200     pthread_mutex_unlock(&m->conn_lock);
201     return(1);
202 }
203
204 extern ptl_nid_t tcpnal_mynid;
205
206 int
207 tcpnal_hello (int sockfd, ptl_nid_t *nid, int type, __u64 incarnation)
208 {
209         int                 rc;
210         int                 nob;
211         ptl_hdr_t           hdr;
212         ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
213
214         LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
215
216         memset (&hdr, 0, sizeof (hdr));
217         hmv->magic         = cpu_to_le32(PORTALS_PROTO_MAGIC);
218         hmv->version_major = cpu_to_le32(PORTALS_PROTO_VERSION_MAJOR);
219         hmv->version_minor = cpu_to_le32(PORTALS_PROTO_VERSION_MINOR);
220         
221         hdr.src_nid = cpu_to_le64(tcpnal_mynid);
222         hdr.type    = cpu_to_le32(PTL_MSG_HELLO);
223
224         hdr.msg.hello.type = cpu_to_le32(type);
225         hdr.msg.hello.incarnation = cpu_to_le64(incarnation);
226
227         /* I don't send any interface info */
228
229         /* Assume sufficient socket buffering for this message */
230         rc = syscall(SYS_write, sockfd, &hdr, sizeof(hdr));
231         if (rc <= 0) {
232                 CERROR ("Error %d sending HELLO to "LPX64"\n", rc, *nid);
233                 return (rc);
234         }
235
236         rc = syscall(SYS_read, sockfd, hmv, sizeof(*hmv));
237         if (rc <= 0) {
238                 CERROR ("Error %d reading HELLO from "LPX64"\n", rc, *nid);
239                 return (rc);
240         }
241         
242         if (hmv->magic != le32_to_cpu(PORTALS_PROTO_MAGIC)) {
243                 CERROR ("Bad magic %#08x (%#08x expected) from "LPX64"\n",
244                         cpu_to_le32(hmv->magic), PORTALS_PROTO_MAGIC, *nid);
245                 return (-EPROTO);
246         }
247
248         if (hmv->version_major != cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
249             hmv->version_minor != cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
250                 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
251                         " from "LPX64"\n",
252                         le16_to_cpu (hmv->version_major),
253                         le16_to_cpu (hmv->version_minor),
254                         PORTALS_PROTO_VERSION_MAJOR,
255                         PORTALS_PROTO_VERSION_MINOR,
256                         *nid);
257                 return (-EPROTO);
258         }
259
260 #if (PORTALS_PROTO_VERSION_MAJOR != 1)
261 # error "This code only understands protocol version 1.x"
262 #endif
263         /* version 1 sends magic/version as the dest_nid of a 'hello' header,
264          * so read the rest of it in now... */
265
266         rc = syscall(SYS_read, sockfd, hmv + 1, sizeof(hdr) - sizeof(*hmv));
267         if (rc <= 0) {
268                 CERROR ("Error %d reading rest of HELLO hdr from "LPX64"\n",
269                         rc, *nid);
270                 return (rc);
271         }
272
273         /* ...and check we got what we expected */
274         if (hdr.type != cpu_to_le32 (PTL_MSG_HELLO)) {
275                 CERROR ("Expecting a HELLO hdr "
276                         " but got type %d with %d payload from "LPX64"\n",
277                         le32_to_cpu (hdr.type),
278                         le32_to_cpu (hdr.payload_length), *nid);
279                 return (-EPROTO);
280         }
281
282         if (le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
283                 CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n");
284                 return (-EPROTO);
285         }
286
287         if (*nid == PTL_NID_ANY) {              /* don't know peer's nid yet */
288                 *nid = le64_to_cpu(hdr.src_nid);
289         } else if (*nid != le64_to_cpu (hdr.src_nid)) {
290                 CERROR ("Connected to nid "LPX64", but expecting "LPX64"\n",
291                         le64_to_cpu (hdr.src_nid), *nid);
292                 return (-EPROTO);
293         }
294
295         /* Ignore any interface info in the payload */
296         nob = le32_to_cpu(hdr.payload_length);
297         if (nob > getpagesize()) {
298                 CERROR("Unexpected HELLO payload %d from "LPX64"\n",
299                        nob, *nid);
300                 return (-EPROTO);
301         }
302         if (nob > 0) {
303                 char *space = (char *)malloc(nob);
304                 
305                 if (space == NULL) {
306                         CERROR("Can't allocate scratch buffer %d\n", nob);
307                         return (-ENOMEM);
308                 }
309                 
310                 rc = syscall(SYS_read, sockfd, space, nob);
311                 if (rc <= 0) {
312                         CERROR("Error %d skipping HELLO payload from "
313                                LPX64"\n", rc, *nid);
314                         return (rc);
315                 }
316         }
317
318         return (0);
319 }
320
321 /* Function:  force_tcp_connection
322  * Arguments: t: tcpnal
323  *            dest: portals endpoint for the connection
324  * Returns: an allocated connection structure, either
325  *          a pre-existing one, or a new connection
326  */
327 connection force_tcp_connection(manager m,
328                                 unsigned int ip,
329                                 unsigned short port,
330                                 procbridge pb)
331 {
332     connection conn;
333     struct sockaddr_in addr;
334     unsigned int id[2];
335     struct timeval tv;
336     __u64 incarnation;
337
338     port = tcpnal_acceptor_port;
339
340     id[0] = ip;
341     id[1] = port;
342
343     pthread_mutex_lock(&m->conn_lock);
344
345     conn = hash_table_find(m->connections, id);
346     if (!conn) {
347         int fd;
348         int option;
349         ptl_nid_t peernid = PTL_NID_ANY;
350
351         bzero((char *) &addr, sizeof(addr));
352         addr.sin_family      = AF_INET;
353         addr.sin_addr.s_addr = htonl(ip);
354         addr.sin_port        = htons(port);
355
356         if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { 
357             perror("tcpnal socket failed");
358             exit(-1);
359         }
360         if (connect(fd, (struct sockaddr *)&addr,
361                     sizeof(struct sockaddr_in))) {
362             perror("tcpnal connect");
363             return(0);
364         }
365
366 #if 1
367         option = 1;
368         setsockopt(fd, SOL_TCP, TCP_NODELAY, &option, sizeof(option));
369         option = 1<<20;
370         setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &option, sizeof(option));
371         option = 1<<20;
372         setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &option, sizeof(option));
373 #endif
374    
375         gettimeofday(&tv, NULL);
376         incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
377
378         /* say hello */
379         if (tcpnal_hello(fd, &peernid, SOCKNAL_CONN_ANY, incarnation))
380             exit(-1);
381
382         conn = allocate_connection(m, ip, port, fd);
383
384         /* let nal thread know this event right away */
385         if (conn)
386                 procbridge_wakeup_nal(pb);
387     }
388
389     pthread_mutex_unlock(&m->conn_lock);
390     return (conn);
391 }
392
393
394 /* Function:  bind_socket
395  * Arguments: t: the nal state for this interface
396  *            port: the port to attempt to bind to
397  * Returns: 1 on success, or 0 on error
398  *
399  * bind_socket() attempts to allocate and bind a socket to the requested
400  *  port, or dynamically assign one from the kernel should the port be
401  *  zero. Sets the bound and bound_handler elements of m.
402  *
403  *  TODO: The port should be an explicitly sized type.
404  */
405 static int bind_socket(manager m,unsigned short port)
406 {
407     struct sockaddr_in addr;
408     int alen=sizeof(struct sockaddr_in);
409     
410     if ((m->bound = socket(AF_INET, SOCK_STREAM, 0)) < 0)  
411         return(0);
412     
413     bzero((char *) &addr, sizeof(addr));
414     addr.sin_family      = AF_INET;
415     addr.sin_addr.s_addr = 0;
416     addr.sin_port        = htons(port);
417
418     if (bind(m->bound,(struct sockaddr *)&addr,alen)<0){
419         perror ("tcpnal bind"); 
420         return(0);
421     }
422     
423     getsockname(m->bound,(struct sockaddr *)&addr, &alen);
424
425     m->bound_handler=register_io_handler(m->bound,READ_HANDLER,
426                                          new_connection,m);
427     listen(m->bound,5); 
428     m->port=addr.sin_port;
429     return(1);
430 }
431
432
433 /* Function:  shutdown_connections
434  * Arguments: m: the manager structure
435  *
436  * close all connections and reclaim resources
437  */
438 void shutdown_connections(manager m)
439 {
440     close(m->bound);
441     remove_io_handler(m->bound_handler);
442     hash_destroy_table(m->connections,remove_connection);
443     free(m);
444 }
445
446
447 /* Function:  init_connections
448  * Arguments: t: the nal state for this interface
449  *            port: the port to attempt to bind to
450  * Returns: a newly allocated manager structure, or
451  *          zero if the fixed port could not be bound
452  */
453 manager init_connections(unsigned short pid,
454                          int (*input)(void *, void *),
455                          void *a)
456 {
457     manager m = (manager)malloc(sizeof(struct manager));
458     m->connections = hash_create_table(compare_connection,connection_key);
459     m->handler = input;
460     m->handler_arg = a;
461     pthread_mutex_init(&m->conn_lock, 0);
462
463     if (bind_socket(m,pid))
464         return(m);
465
466     free(m);
467     return(0);
468 }