Whamcloud - gitweb
9a88146b5e248056ee99fb79828e944076e36fbc
[fs/lustre-release.git] / lnet / ulnds / socklnd / select.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002 Cray Inc.
5  *  Copyright (c) 2002 Eric Hoffman
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23 /* select.c:
24  *  Provides a general mechanism for registering and dispatching
25  *  io events through the select system call.
26  */
27
28 #define DEBUG_SUBSYSTEM S_LND
29
30 #ifdef sun
31 #include <sys/filio.h>
32 #else
33 #include <sys/ioctl.h>
34 #endif
35
36 #include <sys/time.h>
37 #include <sys/types.h>
38 #include <stdlib.h>
39 #include <syscall.h>
40 #include <pthread.h>
41 #include <errno.h>
42 #include <pqtimer.h>
43 #include <dispatch.h>
44 #include <procbridge.h>
45
46
47 static struct timeval beginning_of_epoch;
48 static io_handler io_handlers;
49
50 /* Function: now
51  *
52  * Return: the current time in canonical units: a 64 bit number
53  *   where the most significant 32 bits contains the number
54  *   of seconds, and the least signficant a count of (1/(2^32))ths
55  *   of a second.
56  */
57 when now()
58 {
59     struct timeval result;
60
61     gettimeofday(&result,0);
62     return((((unsigned long long)result.tv_sec)<<32)|
63            (((unsigned long long)result.tv_usec)<<32)/1000000);
64 }
65
66
67 /* Function: register_io_handler
68  * Arguments: fd: the file descriptor of interest
69  *            type: a mask of READ_HANDLER, WRITE_HANDLER, EXCEPTION_HANDLER
70  *            function: a function to call when io is available on fd
71  *            arg: an opaque correlator to return to the handler
72  * Returns: a pointer to the io_handler structure
73  */
74 io_handler register_io_handler(int fd,
75                                int type,
76                                int (*function)(void *),
77                                void *arg)
78 {
79     io_handler i=(io_handler)malloc(sizeof(struct io_handler));
80     if ((i->fd=fd)>=0){
81         i->type=type;
82         i->function=function;
83         i->argument=arg;
84         i->disabled=0;
85         i->last=&io_handlers;
86         if ((i->next=io_handlers)) i->next->last=&i->next;
87         io_handlers=i;
88     }
89     return(i);
90 }
91
92 /* Function: remove_io_handler
93  * Arguments: i: a pointer to the handler to stop servicing
94  *
95  * remove_io_handler() doesn't actually free the handler, due
96  * to reentrancy problems. it just marks the handler for
97  * later cleanup by the blocking function.
98  */
99 void remove_io_handler (io_handler i)
100 {
101     i->disabled=1;
102 }
103
104 static void set_flag(io_handler n,fd_set *r, fd_set *w, fd_set *e)
105 {
106     if (n->type & READ_HANDLER) FD_SET(n->fd, r);
107     if (n->type & WRITE_HANDLER) FD_SET(n->fd, w);
108     if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd, e);
109 }
110
111 static int prepare_fd_sets(fd_set *r, fd_set *w, fd_set *e)
112 {
113     io_handler j;
114     io_handler *k;
115     int max = 0;
116
117     FD_ZERO(r);
118     FD_ZERO(w);
119     FD_ZERO(e);
120     for (k=&io_handlers;*k;){
121         if ((*k)->disabled){
122             j=*k;
123             *k=(*k)->next;
124             free(j);
125         }
126         if (*k) {
127             set_flag(*k,r,w,e);
128             if ((*k)->fd > max)
129                 max = (*k)->fd;
130             k=&(*k)->next;
131         }
132     }
133     return max + 1;
134 }
135
136 static int execute_callbacks(fd_set *r, fd_set *w, fd_set *e)
137 {
138     io_handler j;
139     int n = 0, t;
140
141     for (j = io_handlers; j; j = j->next) {
142         if (j->disabled)
143             continue;
144
145         t = 0;
146         if (FD_ISSET(j->fd, r) && (j->type & READ_HANDLER)) {
147             FD_CLR(j->fd, r);
148             t++;
149         }
150         if (FD_ISSET(j->fd, w) && (j->type & WRITE_HANDLER)) {
151             FD_CLR(j->fd, w);
152             t++;
153         }
154         if (FD_ISSET(j->fd, e) && (j->type & EXCEPTION_HANDLER)) {
155             FD_CLR(j->fd, e);
156             t++;
157         }
158         if (t == 0)
159             continue;
160
161         if (!(*j->function)(j->argument))
162             j->disabled = 1;
163
164         n += t;
165     }
166
167     return n;
168 }
169
170 #ifdef ENABLE_SELECT_DISPATCH
171
172 static struct {
173     pthread_mutex_t mutex;
174     pthread_cond_t  cond;
175     int             submitted;
176     int             nready;
177     int             maxfd;
178     fd_set         *rset;
179     fd_set         *wset;
180     fd_set         *eset;
181     struct timeval *timeout;
182     struct timeval  submit_time;
183 } fd_extra = {
184     PTHREAD_MUTEX_INITIALIZER,
185     PTHREAD_COND_INITIALIZER,
186     0, 0, 0,
187     NULL, NULL, NULL, NULL,
188 };
189
190 extern int liblustre_wait_event(int timeout);
191 extern procbridge __global_procbridge;
192
193 /*
194  * this will intercept syscall select() of user apps
195  * such as MPI libs.
196  */
197 int select(int n, fd_set *rset, fd_set *wset, fd_set *eset,
198            struct timeval *timeout)
199 {
200     LASSERT(fd_extra.submitted == 0);
201
202     fd_extra.nready = 0;
203     fd_extra.maxfd = n;
204     fd_extra.rset = rset;
205     fd_extra.wset = wset;
206     fd_extra.eset = eset;
207     fd_extra.timeout = timeout;
208
209     liblustre_wait_event(0);
210     pthread_mutex_lock(&fd_extra.mutex);
211     gettimeofday(&fd_extra.submit_time, NULL);
212     fd_extra.submitted = 1;
213     LASSERT(__global_procbridge);
214     procbridge_wakeup_nal(__global_procbridge);
215
216 again:
217     if (fd_extra.submitted)
218         pthread_cond_wait(&fd_extra.cond, &fd_extra.mutex);
219     pthread_mutex_unlock(&fd_extra.mutex);
220
221     liblustre_wait_event(0);
222
223     pthread_mutex_lock(&fd_extra.mutex);
224     if (fd_extra.submitted)
225         goto again;
226     pthread_mutex_unlock(&fd_extra.mutex);
227
228     LASSERT(fd_extra.nready >= 0);
229     LASSERT(fd_extra.submitted == 0);
230     return fd_extra.nready;
231 }
232
233 static int merge_fds(int max, fd_set *rset, fd_set *wset, fd_set *eset)
234 {
235     int i;
236
237     LASSERT(rset);
238     LASSERT(wset);
239     LASSERT(eset);
240
241     for (i = 0; i < __FD_SETSIZE/__NFDBITS; i++) {
242         LASSERT(!fd_extra.rset ||
243                 !(__FDS_BITS(rset)[i] & __FDS_BITS(fd_extra.rset)[i]));
244         LASSERT(!fd_extra.wset ||
245                 !(__FDS_BITS(wset)[i] & __FDS_BITS(fd_extra.wset)[i]));
246         LASSERT(!fd_extra.eset ||
247                 !(__FDS_BITS(eset)[i] & __FDS_BITS(fd_extra.eset)[i]));
248
249         if (fd_extra.rset && __FDS_BITS(fd_extra.rset)[i])
250             __FDS_BITS(rset)[i] |= __FDS_BITS(fd_extra.rset)[i];
251         if (fd_extra.wset && __FDS_BITS(fd_extra.wset)[i])
252             __FDS_BITS(wset)[i] |= __FDS_BITS(fd_extra.wset)[i];
253         if (fd_extra.eset && __FDS_BITS(fd_extra.eset)[i])
254             __FDS_BITS(eset)[i] |= __FDS_BITS(fd_extra.eset)[i];
255     }
256
257     return (fd_extra.maxfd > max ? fd_extra.maxfd : max);
258 }
259
260 static inline
261 int timeval_ge(struct timeval *tv1, struct timeval *tv2)
262 {
263     LASSERT(tv1 && tv2);
264     return ((tv1->tv_sec - tv2->tv_sec) * 1000000 +
265             (tv1->tv_usec - tv2->tv_usec) >= 0);
266 }
267
268 /*
269  * choose the most recent timeout value
270  */
271 static struct timeval *choose_timeout(struct timeval *tv1,
272                                       struct timeval *tv2)
273 {
274     if (!tv1)
275         return tv2;
276     else if (!tv2)
277         return tv1;
278
279     if (timeval_ge(tv1, tv2))
280         return tv2;
281     else
282         return tv1;
283 }
284
285 /* Function: select_timer_block
286  * Arguments: until: an absolute time when the select should return
287  *
288  *   This function dispatches the various file descriptors' handler
289  *   functions, if the kernel indicates there is io available.
290  */
291 void select_timer_block(when until)
292 {
293     fd_set fds[3];
294     struct timeval timeout;
295     struct timeval *timeout_pointer, *select_timeout;
296     int max, nready, nexec;
297     int fd_handling;
298
299 again:
300     if (until) {
301         when interval;
302
303         interval = until - now();
304         timeout.tv_sec = (interval >> 32);
305         timeout.tv_usec = ((interval << 32) / 1000000) >> 32;
306         timeout_pointer = &timeout;
307     } else
308         timeout_pointer = NULL;
309
310     fd_handling = 0;
311     max = prepare_fd_sets(&fds[0], &fds[1], &fds[2]);
312     select_timeout = timeout_pointer;
313
314     pthread_mutex_lock(&fd_extra.mutex);
315     fd_handling = fd_extra.submitted;
316     pthread_mutex_unlock(&fd_extra.mutex);
317     if (fd_handling) {
318         max = merge_fds(max, &fds[0], &fds[1], &fds[2]);
319         select_timeout = choose_timeout(timeout_pointer, fd_extra.timeout);
320     }
321
322     /* XXX only compile for linux */
323 #if (__WORDSIZE == 64) && !defined(__mips64__)
324     nready = syscall(SYS_select, max, &fds[0], &fds[1], &fds[2],
325                      select_timeout);
326 #else
327     nready = syscall(SYS__newselect, max, &fds[0], &fds[1], &fds[2],
328                      select_timeout);
329 #endif
330     if (nready < 0) {
331         CERROR("select return err %d, errno %d\n", nready, errno);
332         return;
333     }
334
335     if (nready) {
336         nexec = execute_callbacks(&fds[0], &fds[1], &fds[2]);
337         nready -= nexec;
338     } else
339         nexec = 0;
340
341     /* even both nready & nexec are 0, we still need try to wakeup
342      * upper thread since it may have timed out
343      */
344     if (fd_handling) {
345         LASSERT(nready >= 0);
346
347         pthread_mutex_lock(&fd_extra.mutex);
348         if (nready) {
349             if (fd_extra.rset)
350                 *fd_extra.rset = fds[0];
351             if (fd_extra.wset)
352                 *fd_extra.wset = fds[1];
353             if (fd_extra.eset)
354                 *fd_extra.eset = fds[2];
355             fd_extra.nready = nready;
356             fd_extra.submitted = 0;
357         } else {
358             struct timeval t;
359
360             fd_extra.nready = 0;
361             if (fd_extra.timeout) {
362                 gettimeofday(&t, NULL);
363                 if (timeval_ge(&t, &fd_extra.submit_time))
364                     fd_extra.submitted = 0;
365             }
366         }
367
368         pthread_cond_signal(&fd_extra.cond);
369         pthread_mutex_unlock(&fd_extra.mutex);
370     }
371
372     /* haven't found portals event, go back to loop if time
373      * is not expired */
374     if (!nexec) {
375         if (timeout_pointer == NULL || now() >= until)
376             goto again;
377     }
378 }
379
380 #else /* !ENABLE_SELECT_DISPATCH */
381
382 /* Function: select_timer_block
383  * Arguments: until: an absolute time when the select should return
384  *
385  *   This function dispatches the various file descriptors' handler
386  *   functions, if the kernel indicates there is io available.
387  */
388 void select_timer_block(when until)
389 {
390     fd_set fds[3];
391     struct timeval timeout;
392     struct timeval *timeout_pointer;
393     int max, nready;
394
395     if (until) {
396         when interval;
397         interval = until - now();
398         timeout.tv_sec = (interval >> 32);
399         timeout.tv_usec = ((interval << 32) / 1000000) >> 32;
400         timeout_pointer = &timeout;
401     } else
402         timeout_pointer = NULL;
403
404     max = prepare_fd_sets(&fds[0], &fds[1], &fds[2]);
405
406     nready = select(max, &fds[0], &fds[1], &fds[2], timeout_pointer);
407     if (nready > 0)
408         execute_callbacks(&fds[0], &fds[1], &fds[2]);
409 }
410 #endif /* ENABLE_SELECT_DISPATCH */
411
412 /* Function: init_unix_timer()
413  *   is called to initialize the library
414  */
415 void init_unix_timer()
416 {
417     io_handlers=0;
418     gettimeofday(&beginning_of_epoch, 0);
419     initialize_timer(select_timer_block);
420 }