Whamcloud - gitweb
invoking section 3 of the GNU LGPL, to instead apply the terms of the GPL
[fs/lustre-release.git] / lnet / ulnds / connection.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002 Cray Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 /* connection.c:
23    This file provides a simple stateful connection manager which
24    builds tcp connections on demand and leaves them open for
25    future use. It also provides the machinery to allow peers
26    to connect to it
27 */
28
29 #include <stdlib.h>
30 #include <pqtimer.h>
31 #include <dispatch.h>
32 #include <table.h>
33 #include <stdio.h>
34 #include <stdarg.h>
35 #include <string.h>
36 #include <unistd.h>
37 #include <syscall.h>
38 #include <sys/types.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <connection.h>
42 #include <errno.h>
43
44
45 /* global variable: acceptor port */
46 unsigned short tcpnal_acceptor_port = 988;
47
48
49 /* Function:  compare_connection
50  * Arguments: connection c:      a connection in the hash table
51  *            ptl_process_id_t:  an id to verify  agains
52  * Returns: 1 if the connection is the one requested, 0 otherwise
53  *
54  *    compare_connection() tests for collisions in the hash table
55  */
56 static int compare_connection(void *arg1, void *arg2)
57 {
58         connection c = arg1;
59         unsigned int * id = arg2;
60         return((c->ip==id[0]) && (c->port==id[1]));
61 }
62
63
64 /* Function:  connection_key
65  * Arguments: ptl_process_id_t id:  an id to hash
66  * Returns: a not-particularily-well-distributed hash
67  *          of the id
68  */
69 static unsigned int connection_key(unsigned int *id)
70 {
71     return(id[0]^id[1]);
72 }
73
74
75 /* Function:  remove_connection
76  * Arguments: c: the connection to remove
77  */
78 void remove_connection(void *arg)
79 {
80         connection c = arg;
81         unsigned int id[2];
82         
83         id[0]=c->ip;
84         id[1]=c->port;
85         hash_table_remove(c->m->connections,id);
86         close(c->fd);
87         free(c);
88 }
89
90
91 /* Function:  read_connection: 
92  * Arguments: c:    the connection to read from 
93  *            dest: the buffer to read into
94  *            len:  the number of bytes to read   
95  * Returns: success as 1, or failure as 0
96  *
97  *   read_connection() reads data from the connection, continuing
98  *   to read partial results until the request is satisfied or
99  *   it errors. TODO: this read should be covered by signal protection.
100  */
101 int read_connection(connection c,
102                     unsigned char *dest,
103                     int len)
104 {
105     int offset=0,rc;
106
107     if (len){
108         do {
109             if((rc=syscall(SYS_read, c->fd, dest+offset, len-offset))<=0){
110                 if (errno==EINTR) {
111                     rc=0;
112                 } else {
113                     remove_connection(c);
114                     return(0);
115                 }
116             }
117             offset+=rc;
118         } while (offset<len);
119     }
120     return(1);
121 }
122
123 static int connection_input(void *d)
124 {
125         connection c = d;
126         return((*c->m->handler)(c->m->handler_arg,c));
127 }
128
129
130 /* Function:  allocate_connection
131  * Arguments: t:    tcpnal the allocation is occuring in the context of
132  *            dest: portal endpoint address for this connection
133  *            fd:   open file descriptor for the socket
134  * Returns: an allocated connection structure
135  *
136  * just encompasses the action common to active and passive
137  *  connections of allocation and placement in the global table
138  */
139 static connection allocate_connection(manager m,
140                                unsigned int ip,
141                                unsigned short port,
142                                int fd)
143 {
144     connection c=malloc(sizeof(struct connection));
145     unsigned int id[2];
146     c->m=m;
147     c->fd=fd;
148     c->ip=ip;
149     c->port=port;
150     id[0]=ip;
151     id[1]=port;
152     register_io_handler(fd,READ_HANDLER,connection_input,c);
153     hash_table_insert(m->connections,c,id);
154     return(c);
155 }
156
157
158 /* Function:  new_connection
159  * Arguments: t: opaque argument holding the tcpname
160  * Returns: 1 in order to reregister for new connection requests
161  *
162  *  called when the bound service socket recieves
163  *     a new connection request, it always accepts and
164  *     installs a new connection
165  */
166 static int new_connection(void *z)
167 {
168     manager m=z;
169     struct sockaddr_in s;
170     int len=sizeof(struct sockaddr_in);
171     int fd=accept(m->bound,(struct sockaddr *)&s,&len);
172     unsigned int nid=*((unsigned int *)&s.sin_addr);
173     /* cfs specific hack */
174     //unsigned short pid=s.sin_port;
175     allocate_connection(m,htonl(nid),0/*pid*/,fd);
176     return(1);
177 }
178
179
180 /* Function:  force_tcp_connection
181  * Arguments: t: tcpnal
182  *            dest: portals endpoint for the connection
183  * Returns: an allocated connection structure, either
184  *          a pre-existing one, or a new connection
185  */
186 connection force_tcp_connection(manager m,
187                                 unsigned int ip,
188                                 unsigned short port)
189 {
190     connection c;
191     struct sockaddr_in addr;
192     unsigned int id[2];
193
194     port = tcpnal_acceptor_port;
195
196     id[0]=ip;
197     id[1]=port;
198
199     if (!(c=hash_table_find(m->connections,id))){
200         int fd;
201
202         bzero((char *) &addr, sizeof(addr));
203         addr.sin_family      = AF_INET;
204         addr.sin_addr.s_addr = htonl(ip);
205         addr.sin_port        = htons(port);
206
207         if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { 
208             perror("tcpnal socket failed");
209             exit(-1);
210         }
211         if (connect(fd,
212                     (struct sockaddr *)&addr,
213                     sizeof(struct sockaddr_in)))
214             {
215                 perror("tcpnal connect");
216                 return(0);
217             }
218         return(allocate_connection(m,ip,port,fd));
219     }
220     return(c);
221 }
222
223
224 /* Function:  bind_socket
225  * Arguments: t: the nal state for this interface
226  *            port: the port to attempt to bind to
227  * Returns: 1 on success, or 0 on error
228  *
229  * bind_socket() attempts to allocate and bind a socket to the requested
230  *  port, or dynamically assign one from the kernel should the port be
231  *  zero. Sets the bound and bound_handler elements of m.
232  *
233  *  TODO: The port should be an explicitly sized type.
234  */
235 static int bind_socket(manager m,unsigned short port)
236 {
237     struct sockaddr_in addr;
238     int alen=sizeof(struct sockaddr_in);
239     
240     if ((m->bound = socket(AF_INET, SOCK_STREAM, 0)) < 0)  
241         return(0);
242     
243     bzero((char *) &addr, sizeof(addr));
244     addr.sin_family      = AF_INET;
245     addr.sin_addr.s_addr = 0;
246     addr.sin_port        = port; 
247     
248     if (bind(m->bound,(struct sockaddr *)&addr,alen)<0){
249         perror ("tcpnal bind"); 
250         return(0);
251     }
252     
253     getsockname(m->bound,(struct sockaddr *)&addr, &alen);
254
255     m->bound_handler=register_io_handler(m->bound,READ_HANDLER,
256                                          new_connection,m);
257     listen(m->bound,5); 
258     m->port=addr.sin_port;
259     return(1);
260 }
261
262
263 /* Function:  shutdown_connections
264  * Arguments: m: the manager structure
265  *
266  * close all connections and reclaim resources
267  */
268 void shutdown_connections(manager m)
269 {
270     close(m->bound);
271     remove_io_handler(m->bound_handler);
272     hash_destroy_table(m->connections,remove_connection);
273     free(m);
274 }
275
276
277 /* Function:  init_connections
278  * Arguments: t: the nal state for this interface
279  *            port: the port to attempt to bind to
280  * Returns: a newly allocated manager structure, or
281  *          zero if the fixed port could not be bound
282  */
283 manager init_connections(unsigned short pid,
284                          int (*input)(void *, void *),
285                          void *a)
286 {
287     manager m=(manager)malloc(sizeof(struct manager));
288     m->connections=hash_create_table(compare_connection,connection_key);
289     m->handler=input;
290     m->handler_arg=a;
291     if (bind_socket(m,pid)) return(m);
292     free(m);
293     return(0);
294 }