Whamcloud - gitweb
* landed unified portals (b_hd_cleanup_merge_singleportals) on HEAD
[fs/lustre-release.git] / lustre / portals / unals / select.c
index c4ccae1..09e1542 100644 (file)
 #include <sys/time.h>
 #include <sys/types.h>
 #include <stdlib.h>
+#include <syscall.h>
+#include <pthread.h>
+#include <errno.h>
 #include <pqtimer.h>
 #include <dispatch.h>
+#include <procbridge.h>
 
 
 static struct timeval beginning_of_epoch;
@@ -95,40 +99,22 @@ void remove_io_handler (io_handler i)
     i->disabled=1;
 }
 
-static void set_flag(io_handler n,fd_set *fds)
+static void set_flag(io_handler n,fd_set *r, fd_set *w, fd_set *e)
 {
-    if (n->type & READ_HANDLER) FD_SET(n->fd, &fds[0]);
-    if (n->type & WRITE_HANDLER) FD_SET(n->fd,&fds[1]);
-    if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd, &fds[2]);
+    if (n->type & READ_HANDLER) FD_SET(n->fd, r);
+    if (n->type & WRITE_HANDLER) FD_SET(n->fd, w);
+    if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd, e);
 }
 
-
-/* Function: select_timer_block
- * Arguments: until: an absolute time when the select should return
- * 
- *   This function dispatches the various file descriptors' handler
- *   functions, if the kernel indicates there is io available.
- */
-void select_timer_block(when until)
+static int prepare_fd_sets(fd_set *r, fd_set *w, fd_set *e)
 {
-    fd_set fds[3];
-    struct timeval timeout;
-    struct timeval *timeout_pointer;
-    int result;
     io_handler j;
     io_handler *k;
+    int max = 0;
 
-    /* TODO: loop until the entire interval is expired*/
-    if (until){
-       when interval=until-now();
-        timeout.tv_sec=(interval>>32);
-        timeout.tv_usec=((interval<<32)/1000000)>>32;
-        timeout_pointer=&timeout;
-    } else timeout_pointer=0;
-
-    FD_ZERO(&fds[0]);
-    FD_ZERO(&fds[1]);
-    FD_ZERO(&fds[2]);
+    FD_ZERO(r);
+    FD_ZERO(w);
+    FD_ZERO(e);
     for (k=&io_handlers;*k;){
         if ((*k)->disabled){
             j=*k;
@@ -136,24 +122,291 @@ void select_timer_block(when until)
             free(j);
         }
         if (*k) {
-           set_flag(*k,fds);
+           set_flag(*k,r,w,e);
+            if ((*k)->fd > max)
+                max = (*k)->fd;
            k=&(*k)->next;
        }
     }
+    return max + 1;
+}
+
+static int execute_callbacks(fd_set *r, fd_set *w, fd_set *e)
+{
+    io_handler j;
+    int n = 0, t;
+
+    for (j = io_handlers; j; j = j->next) {
+        if (j->disabled)
+            continue;
+
+        t = 0;
+        if (FD_ISSET(j->fd, r) && (j->type & READ_HANDLER)) {
+            FD_CLR(j->fd, r);
+            t++;
+        }
+        if (FD_ISSET(j->fd, w) && (j->type & WRITE_HANDLER)) {
+            FD_CLR(j->fd, w);
+            t++;
+        }
+        if (FD_ISSET(j->fd, e) && (j->type & EXCEPTION_HANDLER)) {
+            FD_CLR(j->fd, e);
+            t++;
+        }
+        if (t == 0)
+            continue;
+
+        if (!(*j->function)(j->argument))
+            j->disabled = 1;
+
+        n += t;
+    }
+
+    return n;
+}
 
-    result=select(FD_SETSIZE, &fds[0], &fds[1], &fds[2], timeout_pointer);
+#ifdef ENABLE_SELECT_DISPATCH
 
-    if (result > 0)
-        for (j=io_handlers;j;j=j->next){
-            if (!(j->disabled) && 
-                ((FD_ISSET(j->fd, &fds[0]) && (j->type & READ_HANDLER)) ||
-                 (FD_ISSET(j->fd, &fds[1]) && (j->type & WRITE_HANDLER)) ||
-                 (FD_ISSET(j->fd, &fds[2]) && (j->type & EXCEPTION_HANDLER)))){
-                if (!(*j->function)(j->argument))
-                    j->disabled=1;
+static struct {
+    pthread_mutex_t mutex;
+    pthread_cond_t  cond;
+    int             submitted;
+    int             nready;
+    int             maxfd;
+    fd_set         *rset;
+    fd_set         *wset;
+    fd_set         *eset;
+    struct timeval *timeout;
+    struct timeval  submit_time;
+} fd_extra = {
+    PTHREAD_MUTEX_INITIALIZER,
+    PTHREAD_COND_INITIALIZER,
+    0, 0, 0,
+    NULL, NULL, NULL, NULL,
+};
+
+extern int liblustre_wait_event(int timeout);
+extern procbridge __global_procbridge;
+
+/*
+ * this will intercept syscall select() of user apps
+ * such as MPI libs.
+ */
+int select(int n, fd_set *rset, fd_set *wset, fd_set *eset,
+           struct timeval *timeout)
+{
+    LASSERT(fd_extra.submitted == 0);
+
+    fd_extra.nready = 0;
+    fd_extra.maxfd = n;
+    fd_extra.rset = rset;
+    fd_extra.wset = wset;
+    fd_extra.eset = eset;
+    fd_extra.timeout = timeout;
+
+    liblustre_wait_event(0);
+    pthread_mutex_lock(&fd_extra.mutex);
+    gettimeofday(&fd_extra.submit_time, NULL);
+    fd_extra.submitted = 1;
+    LASSERT(__global_procbridge);
+    procbridge_wakeup_nal(__global_procbridge);
+
+again:
+    if (fd_extra.submitted)
+        pthread_cond_wait(&fd_extra.cond, &fd_extra.mutex);
+    pthread_mutex_unlock(&fd_extra.mutex);
+
+    liblustre_wait_event(0);
+
+    pthread_mutex_lock(&fd_extra.mutex);
+    if (fd_extra.submitted)
+        goto again;
+    pthread_mutex_unlock(&fd_extra.mutex);
+
+    LASSERT(fd_extra.nready >= 0);
+    LASSERT(fd_extra.submitted == 0);
+    return fd_extra.nready;
+}
+
+static int merge_fds(int max, fd_set *rset, fd_set *wset, fd_set *eset)
+{
+    int i;
+
+    LASSERT(rset);
+    LASSERT(wset);
+    LASSERT(eset);
+
+    for (i = 0; i < __FD_SETSIZE/__NFDBITS; i++) {
+        LASSERT(!fd_extra.rset ||
+                !(__FDS_BITS(rset)[i] & __FDS_BITS(fd_extra.rset)[i]));
+        LASSERT(!fd_extra.wset ||
+                !(__FDS_BITS(wset)[i] & __FDS_BITS(fd_extra.wset)[i]));
+        LASSERT(!fd_extra.eset ||
+                !(__FDS_BITS(eset)[i] & __FDS_BITS(fd_extra.eset)[i]));
+
+        if (fd_extra.rset && __FDS_BITS(fd_extra.rset)[i])
+            __FDS_BITS(rset)[i] |= __FDS_BITS(fd_extra.rset)[i];
+        if (fd_extra.wset && __FDS_BITS(fd_extra.wset)[i])
+            __FDS_BITS(wset)[i] |= __FDS_BITS(fd_extra.wset)[i];
+        if (fd_extra.eset && __FDS_BITS(fd_extra.eset)[i])
+            __FDS_BITS(eset)[i] |= __FDS_BITS(fd_extra.eset)[i];
+    }
+
+    return (fd_extra.maxfd > max ? fd_extra.maxfd : max);
+}
+
+static inline
+int timeval_ge(struct timeval *tv1, struct timeval *tv2)
+{
+    LASSERT(tv1 && tv2);
+    return ((tv1->tv_sec - tv2->tv_sec) * 1000000 +
+            (tv1->tv_usec - tv2->tv_usec) >= 0);
+}
+
+/*
+ * choose the most recent timeout value
+ */
+static struct timeval *choose_timeout(struct timeval *tv1,
+                                      struct timeval *tv2)
+{
+    if (!tv1)
+        return tv2;
+    else if (!tv2)
+        return tv1;
+
+    if (timeval_ge(tv1, tv2))
+        return tv2;
+    else
+        return tv1;
+}
+
+/* Function: select_timer_block
+ * Arguments: until: an absolute time when the select should return
+ * 
+ *   This function dispatches the various file descriptors' handler
+ *   functions, if the kernel indicates there is io available.
+ */
+void select_timer_block(when until)
+{
+    fd_set fds[3];
+    struct timeval timeout;
+    struct timeval *timeout_pointer, *select_timeout;
+    int max, nready, nexec;
+    int fd_handling;
+
+again:
+    if (until) {
+        when interval;
+
+        interval = until - now();
+        timeout.tv_sec = (interval >> 32);
+        timeout.tv_usec = ((interval << 32) / 1000000) >> 32;
+        timeout_pointer = &timeout;
+    } else
+        timeout_pointer = NULL;
+
+    fd_handling = 0;
+    max = prepare_fd_sets(&fds[0], &fds[1], &fds[2]);
+    select_timeout = timeout_pointer;
+
+    pthread_mutex_lock(&fd_extra.mutex);
+    fd_handling = fd_extra.submitted;
+    pthread_mutex_unlock(&fd_extra.mutex);
+    if (fd_handling) {
+        max = merge_fds(max, &fds[0], &fds[1], &fds[2]);
+        select_timeout = choose_timeout(timeout_pointer, fd_extra.timeout);
+    }
+
+    /* XXX only compile for linux */
+#if __WORDSIZE == 64
+    nready = syscall(SYS_select, max, &fds[0], &fds[1], &fds[2],
+                     select_timeout);
+#else
+    nready = syscall(SYS__newselect, max, &fds[0], &fds[1], &fds[2],
+                     select_timeout);
+#endif
+    if (nready < 0) {
+        CERROR("select return err %d, errno %d\n", nready, errno);
+        return;
+    }
+
+    if (nready) {
+        nexec = execute_callbacks(&fds[0], &fds[1], &fds[2]);
+        nready -= nexec;
+    } else
+        nexec = 0;
+
+    /* even both nready & nexec are 0, we still need try to wakeup
+     * upper thread since it may have timed out
+     */
+    if (fd_handling) {
+        LASSERT(nready >= 0);
+
+        pthread_mutex_lock(&fd_extra.mutex);
+        if (nready) {
+            if (fd_extra.rset)
+                *fd_extra.rset = fds[0];
+            if (fd_extra.wset)
+                *fd_extra.wset = fds[1];
+            if (fd_extra.eset)
+                *fd_extra.eset = fds[2];
+            fd_extra.nready = nready;
+            fd_extra.submitted = 0;
+        } else {
+            struct timeval t;
+
+            fd_extra.nready = 0;
+            if (fd_extra.timeout) {
+                gettimeofday(&t, NULL);
+                if (timeval_ge(&t, &fd_extra.submit_time))
+                    fd_extra.submitted = 0;
             }
         }
+
+        pthread_cond_signal(&fd_extra.cond);
+        pthread_mutex_unlock(&fd_extra.mutex);
+    }
+
+    /* haven't found portals event, go back to loop if time
+     * is not expired */
+    if (!nexec) {
+        if (timeout_pointer == NULL || now() >= until)
+            goto again;
+    }
+}
+
+#else /* !ENABLE_SELECT_DISPATCH */
+
+/* Function: select_timer_block
+ * Arguments: until: an absolute time when the select should return
+ * 
+ *   This function dispatches the various file descriptors' handler
+ *   functions, if the kernel indicates there is io available.
+ */
+void select_timer_block(when until)
+{
+    fd_set fds[3];
+    struct timeval timeout;
+    struct timeval *timeout_pointer;
+    int max, nready;
+
+again:
+    if (until) {
+        when interval;
+        interval = until - now();
+        timeout.tv_sec = (interval >> 32);
+        timeout.tv_usec = ((interval << 32) / 1000000) >> 32;
+        timeout_pointer = &timeout;
+    } else
+        timeout_pointer = NULL;
+
+    max = prepare_fd_sets(&fds[0], &fds[1], &fds[2]);
+
+    nready = select(max, &fds[0], &fds[1], &fds[2], timeout_pointer);
+    if (nready > 0)
+        execute_callbacks(&fds[0], &fds[1], &fds[2]);
 }
+#endif /* ENABLE_SELECT_DISPATCH */
 
 /* Function: init_unix_timer()
  *   is called to initialize the library