Whamcloud - gitweb
* landed unified portals (b_hd_cleanup_merge_singleportals) on HEAD
[fs/lustre-release.git] / lnet / ulnds / socklnd / select.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002 Cray Inc.
5  *  Copyright (c) 2002 Eric Hoffman
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23 /* select.c:
24  *  Provides a general mechanism for registering and dispatching
25  *  io events through the select system call.
26  */
27
28 #ifdef sun
29 #include <sys/filio.h>
30 #else
31 #include <sys/ioctl.h>
32 #endif
33
34 #include <sys/time.h>
35 #include <sys/types.h>
36 #include <stdlib.h>
37 #include <syscall.h>
38 #include <pthread.h>
39 #include <errno.h>
40 #include <pqtimer.h>
41 #include <dispatch.h>
42 #include <procbridge.h>
43
44
45 static struct timeval beginning_of_epoch;
46 static io_handler io_handlers;
47
48 /* Function: now
49  *
50  * Return: the current time in canonical units: a 64 bit number
51  *   where the most significant 32 bits contains the number
52  *   of seconds, and the least signficant a count of (1/(2^32))ths
53  *   of a second.
54  */
55 when now()
56 {
57     struct timeval result;
58   
59     gettimeofday(&result,0);
60     return((((unsigned long long)result.tv_sec)<<32)|
61            (((unsigned long long)result.tv_usec)<<32)/1000000);
62 }
63
64
65 /* Function: register_io_handler
66  * Arguments: fd: the file descriptor of interest
67  *            type: a mask of READ_HANDLER, WRITE_HANDLER, EXCEPTION_HANDLER
68  *            function: a function to call when io is available on fd
69  *            arg: an opaque correlator to return to the handler
70  * Returns: a pointer to the io_handler structure
71  */
72 io_handler register_io_handler(int fd,
73                                int type,
74                                int (*function)(void *),
75                                void *arg)
76 {
77     io_handler i=(io_handler)malloc(sizeof(struct io_handler));
78     if ((i->fd=fd)>=0){
79         i->type=type;
80         i->function=function;
81         i->argument=arg;
82         i->disabled=0;
83         i->last=&io_handlers;
84         if ((i->next=io_handlers)) i->next->last=&i->next;
85         io_handlers=i;
86     }
87     return(i);
88 }
89
90 /* Function: remove_io_handler
91  * Arguments: i: a pointer to the handler to stop servicing
92  *
93  * remove_io_handler() doesn't actually free the handler, due
94  * to reentrancy problems. it just marks the handler for 
95  * later cleanup by the blocking function.
96  */
97 void remove_io_handler (io_handler i)
98 {
99     i->disabled=1;
100 }
101
102 static void set_flag(io_handler n,fd_set *r, fd_set *w, fd_set *e)
103 {
104     if (n->type & READ_HANDLER) FD_SET(n->fd, r);
105     if (n->type & WRITE_HANDLER) FD_SET(n->fd, w);
106     if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd, e);
107 }
108
109 static int prepare_fd_sets(fd_set *r, fd_set *w, fd_set *e)
110 {
111     io_handler j;
112     io_handler *k;
113     int max = 0;
114
115     FD_ZERO(r);
116     FD_ZERO(w);
117     FD_ZERO(e);
118     for (k=&io_handlers;*k;){
119         if ((*k)->disabled){
120             j=*k;
121             *k=(*k)->next;
122             free(j);
123         }
124         if (*k) {
125             set_flag(*k,r,w,e);
126             if ((*k)->fd > max)
127                 max = (*k)->fd;
128             k=&(*k)->next;
129         }
130     }
131     return max + 1;
132 }
133
134 static int execute_callbacks(fd_set *r, fd_set *w, fd_set *e)
135 {
136     io_handler j;
137     int n = 0, t;
138
139     for (j = io_handlers; j; j = j->next) {
140         if (j->disabled)
141             continue;
142
143         t = 0;
144         if (FD_ISSET(j->fd, r) && (j->type & READ_HANDLER)) {
145             FD_CLR(j->fd, r);
146             t++;
147         }
148         if (FD_ISSET(j->fd, w) && (j->type & WRITE_HANDLER)) {
149             FD_CLR(j->fd, w);
150             t++;
151         }
152         if (FD_ISSET(j->fd, e) && (j->type & EXCEPTION_HANDLER)) {
153             FD_CLR(j->fd, e);
154             t++;
155         }
156         if (t == 0)
157             continue;
158
159         if (!(*j->function)(j->argument))
160             j->disabled = 1;
161
162         n += t;
163     }
164
165     return n;
166 }
167
168 #ifdef ENABLE_SELECT_DISPATCH
169
170 static struct {
171     pthread_mutex_t mutex;
172     pthread_cond_t  cond;
173     int             submitted;
174     int             nready;
175     int             maxfd;
176     fd_set         *rset;
177     fd_set         *wset;
178     fd_set         *eset;
179     struct timeval *timeout;
180     struct timeval  submit_time;
181 } fd_extra = {
182     PTHREAD_MUTEX_INITIALIZER,
183     PTHREAD_COND_INITIALIZER,
184     0, 0, 0,
185     NULL, NULL, NULL, NULL,
186 };
187
188 extern int liblustre_wait_event(int timeout);
189 extern procbridge __global_procbridge;
190
191 /*
192  * this will intercept syscall select() of user apps
193  * such as MPI libs.
194  */
195 int select(int n, fd_set *rset, fd_set *wset, fd_set *eset,
196            struct timeval *timeout)
197 {
198     LASSERT(fd_extra.submitted == 0);
199
200     fd_extra.nready = 0;
201     fd_extra.maxfd = n;
202     fd_extra.rset = rset;
203     fd_extra.wset = wset;
204     fd_extra.eset = eset;
205     fd_extra.timeout = timeout;
206
207     liblustre_wait_event(0);
208     pthread_mutex_lock(&fd_extra.mutex);
209     gettimeofday(&fd_extra.submit_time, NULL);
210     fd_extra.submitted = 1;
211     LASSERT(__global_procbridge);
212     procbridge_wakeup_nal(__global_procbridge);
213
214 again:
215     if (fd_extra.submitted)
216         pthread_cond_wait(&fd_extra.cond, &fd_extra.mutex);
217     pthread_mutex_unlock(&fd_extra.mutex);
218
219     liblustre_wait_event(0);
220
221     pthread_mutex_lock(&fd_extra.mutex);
222     if (fd_extra.submitted)
223         goto again;
224     pthread_mutex_unlock(&fd_extra.mutex);
225
226     LASSERT(fd_extra.nready >= 0);
227     LASSERT(fd_extra.submitted == 0);
228     return fd_extra.nready;
229 }
230
231 static int merge_fds(int max, fd_set *rset, fd_set *wset, fd_set *eset)
232 {
233     int i;
234
235     LASSERT(rset);
236     LASSERT(wset);
237     LASSERT(eset);
238
239     for (i = 0; i < __FD_SETSIZE/__NFDBITS; i++) {
240         LASSERT(!fd_extra.rset ||
241                 !(__FDS_BITS(rset)[i] & __FDS_BITS(fd_extra.rset)[i]));
242         LASSERT(!fd_extra.wset ||
243                 !(__FDS_BITS(wset)[i] & __FDS_BITS(fd_extra.wset)[i]));
244         LASSERT(!fd_extra.eset ||
245                 !(__FDS_BITS(eset)[i] & __FDS_BITS(fd_extra.eset)[i]));
246
247         if (fd_extra.rset && __FDS_BITS(fd_extra.rset)[i])
248             __FDS_BITS(rset)[i] |= __FDS_BITS(fd_extra.rset)[i];
249         if (fd_extra.wset && __FDS_BITS(fd_extra.wset)[i])
250             __FDS_BITS(wset)[i] |= __FDS_BITS(fd_extra.wset)[i];
251         if (fd_extra.eset && __FDS_BITS(fd_extra.eset)[i])
252             __FDS_BITS(eset)[i] |= __FDS_BITS(fd_extra.eset)[i];
253     }
254
255     return (fd_extra.maxfd > max ? fd_extra.maxfd : max);
256 }
257
258 static inline
259 int timeval_ge(struct timeval *tv1, struct timeval *tv2)
260 {
261     LASSERT(tv1 && tv2);
262     return ((tv1->tv_sec - tv2->tv_sec) * 1000000 +
263             (tv1->tv_usec - tv2->tv_usec) >= 0);
264 }
265
266 /*
267  * choose the most recent timeout value
268  */
269 static struct timeval *choose_timeout(struct timeval *tv1,
270                                       struct timeval *tv2)
271 {
272     if (!tv1)
273         return tv2;
274     else if (!tv2)
275         return tv1;
276
277     if (timeval_ge(tv1, tv2))
278         return tv2;
279     else
280         return tv1;
281 }
282
283 /* Function: select_timer_block
284  * Arguments: until: an absolute time when the select should return
285  * 
286  *   This function dispatches the various file descriptors' handler
287  *   functions, if the kernel indicates there is io available.
288  */
289 void select_timer_block(when until)
290 {
291     fd_set fds[3];
292     struct timeval timeout;
293     struct timeval *timeout_pointer, *select_timeout;
294     int max, nready, nexec;
295     int fd_handling;
296
297 again:
298     if (until) {
299         when interval;
300
301         interval = until - now();
302         timeout.tv_sec = (interval >> 32);
303         timeout.tv_usec = ((interval << 32) / 1000000) >> 32;
304         timeout_pointer = &timeout;
305     } else
306         timeout_pointer = NULL;
307
308     fd_handling = 0;
309     max = prepare_fd_sets(&fds[0], &fds[1], &fds[2]);
310     select_timeout = timeout_pointer;
311
312     pthread_mutex_lock(&fd_extra.mutex);
313     fd_handling = fd_extra.submitted;
314     pthread_mutex_unlock(&fd_extra.mutex);
315     if (fd_handling) {
316         max = merge_fds(max, &fds[0], &fds[1], &fds[2]);
317         select_timeout = choose_timeout(timeout_pointer, fd_extra.timeout);
318     }
319
320     /* XXX only compile for linux */
321 #if __WORDSIZE == 64
322     nready = syscall(SYS_select, max, &fds[0], &fds[1], &fds[2],
323                      select_timeout);
324 #else
325     nready = syscall(SYS__newselect, max, &fds[0], &fds[1], &fds[2],
326                      select_timeout);
327 #endif
328     if (nready < 0) {
329         CERROR("select return err %d, errno %d\n", nready, errno);
330         return;
331     }
332
333     if (nready) {
334         nexec = execute_callbacks(&fds[0], &fds[1], &fds[2]);
335         nready -= nexec;
336     } else
337         nexec = 0;
338
339     /* even both nready & nexec are 0, we still need try to wakeup
340      * upper thread since it may have timed out
341      */
342     if (fd_handling) {
343         LASSERT(nready >= 0);
344
345         pthread_mutex_lock(&fd_extra.mutex);
346         if (nready) {
347             if (fd_extra.rset)
348                 *fd_extra.rset = fds[0];
349             if (fd_extra.wset)
350                 *fd_extra.wset = fds[1];
351             if (fd_extra.eset)
352                 *fd_extra.eset = fds[2];
353             fd_extra.nready = nready;
354             fd_extra.submitted = 0;
355         } else {
356             struct timeval t;
357
358             fd_extra.nready = 0;
359             if (fd_extra.timeout) {
360                 gettimeofday(&t, NULL);
361                 if (timeval_ge(&t, &fd_extra.submit_time))
362                     fd_extra.submitted = 0;
363             }
364         }
365
366         pthread_cond_signal(&fd_extra.cond);
367         pthread_mutex_unlock(&fd_extra.mutex);
368     }
369
370     /* haven't found portals event, go back to loop if time
371      * is not expired */
372     if (!nexec) {
373         if (timeout_pointer == NULL || now() >= until)
374             goto again;
375     }
376 }
377
378 #else /* !ENABLE_SELECT_DISPATCH */
379
380 /* Function: select_timer_block
381  * Arguments: until: an absolute time when the select should return
382  * 
383  *   This function dispatches the various file descriptors' handler
384  *   functions, if the kernel indicates there is io available.
385  */
386 void select_timer_block(when until)
387 {
388     fd_set fds[3];
389     struct timeval timeout;
390     struct timeval *timeout_pointer;
391     int max, nready;
392
393 again:
394     if (until) {
395         when interval;
396         interval = until - now();
397         timeout.tv_sec = (interval >> 32);
398         timeout.tv_usec = ((interval << 32) / 1000000) >> 32;
399         timeout_pointer = &timeout;
400     } else
401         timeout_pointer = NULL;
402
403     max = prepare_fd_sets(&fds[0], &fds[1], &fds[2]);
404
405     nready = select(max, &fds[0], &fds[1], &fds[2], timeout_pointer);
406     if (nready > 0)
407         execute_callbacks(&fds[0], &fds[1], &fds[2]);
408 }
409 #endif /* ENABLE_SELECT_DISPATCH */
410
411 /* Function: init_unix_timer()
412  *   is called to initialize the library 
413  */
414 void init_unix_timer()
415 {
416     io_handlers=0;
417     gettimeofday(&beginning_of_epoch, 0);
418     initialize_timer(select_timer_block);
419 }