Whamcloud - gitweb
[tcpnal]: rewrite the data out path.
authorericm <ericm>
Wed, 20 Aug 2003 07:22:07 +0000 (07:22 +0000)
committerericm <ericm>
Wed, 20 Aug 2003 07:22:07 +0000 (07:22 +0000)
  Get rid of pipe as a bridge between user threads and nal threads, which
  is not so efficient and don't work well in Cygwin. Now user thread might
  directly call into portals lib via lib_dispatch.

  2 issues remain:
  - sometimes during startup low level nal threads could run into endless
    waiting.
  - need more sycronous facilities to prevent race.

15 files changed:
lnet/ulnds/procapi.c
lnet/ulnds/procbridge.h
lnet/ulnds/proclib.c
lnet/ulnds/select.c
lnet/ulnds/socklnd/procapi.c
lnet/ulnds/socklnd/procbridge.h
lnet/ulnds/socklnd/proclib.c
lnet/ulnds/socklnd/select.c
lnet/ulnds/socklnd/tcplnd.c
lnet/ulnds/tcplnd.c
lustre/portals/unals/procapi.c
lustre/portals/unals/procbridge.h
lustre/portals/unals/proclib.c
lustre/portals/unals/select.c
lustre/portals/unals/tcpnal.c

index dfcd743..724ee74 100644 (file)
@@ -2,6 +2,7 @@
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
  *  Copyright (c) 2002 Cray Inc.
+ *  Copyright (c) 2003 Cluster File Systems, Inc.
  *
  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
  *
  * forwards a packaged api call from the 'api' side to the 'library'
  *   side, and collects the result
  */
-/* XXX CFS workaround:
- * when multiple threads call forward at the same time, data in the
- * pipe will be trampled. add a mutex to serialize the access, as
- * a temporary solution.
- */
-#define forward_failure(operand,fd,buffer,length)\
-       if(syscall(SYS_##operand,fd,buffer,length)!=length){\
-          lib_fini(b->nal_cb);\
-          return(PTL_SEGV);\
-       }
 static int procbridge_forward(nal_t *n, int id, void *args, ptl_size_t args_len,
                              void *ret, ptl_size_t ret_len)
 {
-    bridge b=(bridge)n->nal_data;
-    procbridge p=(procbridge)b->local;
-    int lib=p->to_lib[1];
-    static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
-    int k;
-
-    /* protect the whole access to pipe */
-    pthread_mutex_lock(&mut);
+    bridge b = (bridge) n->nal_data;
 
-    forward_failure(write,lib, &id, sizeof(id));
-    forward_failure(write,lib,&args_len, sizeof(args_len));
-    forward_failure(write,lib,&ret_len, sizeof(ret_len));
-    forward_failure(write,lib,args, args_len);
+    if (id == PTL_FINI) {
+            lib_fini(b->nal_cb);
 
-    do {
-        k=syscall(SYS_read, p->from_lib[0], ret, ret_len);
-    } while ((k!=ret_len) && (errno += EINTR));
+            if (b->shutdown)
+                (*b->shutdown)(b);
+    }
 
-    pthread_mutex_unlock(&mut);
+    lib_dispatch(b->nal_cb, NULL, id, args, ret);
 
-    if(k!=ret_len){
-        perror("nal: read return block");
-        return PTL_SEGV;
-    }
     return (PTL_OK);
 }
-#undef forward_failure
 
 
 /* Function: shutdown
@@ -101,15 +78,18 @@ static int procbridge_shutdown(nal_t *n, int ni)
 {
     bridge b=(bridge)n->nal_data;
     procbridge p=(procbridge)b->local;
-    int code=PTL_FINI;
 
-    syscall(SYS_write, p->to_lib[1],&code,sizeof(code));
-    syscall(SYS_read, p->from_lib[0],&code,sizeof(code));
+    p->nal_flags |= NAL_FLAG_STOPPING;
 
-    syscall(SYS_close, p->to_lib[0]);
-    syscall(SYS_close, p->to_lib[1]);
-    syscall(SYS_close, p->from_lib[0]);
-    syscall(SYS_close, p->from_lib[1]);
+    do {
+        pthread_mutex_lock(&p->mutex);
+        if (p->nal_flags & NAL_FLAG_STOPPED) {
+                pthread_mutex_unlock(&p->mutex);
+                break;
+        }
+        pthread_cond_wait(&p->cond, &p->mutex);
+        pthread_mutex_unlock(&p->mutex);
+    } while (1);
 
     free(p);
     return(0);
@@ -162,7 +142,9 @@ static nal_t api_nal = {
     unlock:   procbridge_unlock
 };
 
-/* Function: bridge_init
+ptl_nid_t tcpnal_mynid;
+
+/* Function: procbridge_interface
  *
  * Arguments:  pid: requested process id (port offset)
  *                  PTL_ID_ANY not supported.
@@ -176,77 +158,17 @@ static nal_t api_nal = {
  * initializes the tcp nal. we define unix_failure as an
  * error wrapper to cut down clutter.
  */
-#define unix_failure(operand,fd,buffer,length,text)\
-       if(syscall(SYS_##operand,fd,buffer,length)!=length){\
-          perror(text);\
-          return(NULL);\
-       }
-#if 0
-static nal_t *bridge_init(ptl_interface_t nal,
-                          ptl_pid_t pid_request,
-                          ptl_ni_limits_t *desired,
-                          ptl_ni_limits_t *actual,
-                          int *rc)
-{
-    procbridge p;
-    bridge b;
-    static int initialized=0;
-    ptl_ni_limits_t limits = {-1,-1,-1,-1,-1};
-
-    if(initialized) return (&api_nal);
-
-    init_unix_timer();
-
-    b=(bridge)malloc(sizeof(struct bridge));
-    p=(procbridge)malloc(sizeof(struct procbridge));
-    api_nal.nal_data=b;
-    b->local=p;
-
-    if(pipe(p->to_lib) || pipe(p->from_lib)) {
-        perror("nal_init: pipe");
-        return(NULL);
-    }
-
-    if (desired) limits = *desired;
-    unix_failure(write,p->to_lib[1], &pid_request, sizeof(pid_request),
-                       "nal_init: write");
-    unix_failure(write,p->to_lib[1], &limits, sizeof(ptl_ni_limits_t),
-                       "nal_init: write");
-    unix_failure(write,p->to_lib[1], &nal, sizeof(ptl_interface_t),
-                       "nal_init: write");
-
-    if(pthread_create(&p->t, NULL, nal_thread, b)) {
-        perror("nal_init: pthread_create");
-        return(NULL);
-    }
-
-    unix_failure(read,p->from_lib[0], actual, sizeof(ptl_ni_limits_t),
-                 "tcp_init: read");
-    unix_failure(read,p->from_lib[0], rc, sizeof(rc),
-                 "nal_init: read");
-
-    if(*rc) return(NULL);
-
-    initialized = 1;
-    pthread_mutex_init(&p->mutex,0);
-    pthread_cond_init(&p->cond, 0);
-
-    return (&api_nal);
-}
-#endif
-
-ptl_nid_t tcpnal_mynid;
-
 nal_t *procbridge_interface(int num_interface,
                             ptl_pt_index_t ptl_size,
                             ptl_ac_index_t acl_size,
                             ptl_pid_t requested_pid)
 {
+    nal_init_args_t args;
     procbridge p;
     bridge b;
     static int initialized=0;
     ptl_ni_limits_t limits = {-1,-1,-1,-1,-1};
-    int rc, nal_type = PTL_IFACE_TCP;/* PTL_IFACE_DEFAULT FIXME hack */
+    int nal_type = PTL_IFACE_TCP;/* PTL_IFACE_DEFAULT FIXME hack */
 
     if(initialized) return (&api_nal);
 
@@ -257,38 +179,40 @@ nal_t *procbridge_interface(int num_interface,
     api_nal.nal_data=b;
     b->local=p;
 
-    if(pipe(p->to_lib) || pipe(p->from_lib)) {
-        perror("nal_init: pipe");
-        return(NULL);
-    }
-
     if (ptl_size)
            limits.max_ptable_index = ptl_size;
     if (acl_size)
            limits.max_atable_index = acl_size;
 
-    unix_failure(write,p->to_lib[1], &requested_pid, sizeof(requested_pid),
-                       "nal_init: write");
-    unix_failure(write,p->to_lib[1], &limits, sizeof(ptl_ni_limits_t),
-                       "nal_init: write");
-    unix_failure(write,p->to_lib[1], &nal_type, sizeof(nal_type),
-                       "nal_init: write");
+    args.nia_requested_pid = requested_pid;
+    args.nia_limits = &limits;
+    args.nia_nal_type = nal_type;
+    args.nia_bridge = b;
 
-    if(pthread_create(&p->t, NULL, nal_thread, b)) {
+    pthread_mutex_init(&p->mutex,0);
+    pthread_cond_init(&p->cond, 0);
+    p->nal_flags = 0;
+
+    if (pthread_create(&p->t, NULL, nal_thread, &args)) {
         perror("nal_init: pthread_create");
         return(NULL);
     }
 
-    unix_failure(read,p->from_lib[0], &rc, sizeof(rc),
-                 "nal_init: read");
-
-    if(rc) return(NULL);
+    do {
+        pthread_mutex_lock(&p->mutex);
+        if (p->nal_flags & (NAL_FLAG_RUNNING | NAL_FLAG_STOPPED)) {
+                pthread_mutex_unlock(&p->mutex);
+                break;
+        }
+        pthread_cond_wait(&p->cond, &p->mutex);
+        pthread_mutex_unlock(&p->mutex);
+    } while (1);
+
+    if (p->nal_flags & NAL_FLAG_STOPPED)
+        return (NULL);
 
     b->nal_cb->ni.nid = tcpnal_mynid;
     initialized = 1;
-    pthread_mutex_init(&p->mutex,0);
-    pthread_cond_init(&p->cond, 0);
 
     return (&api_nal);
 }
-#undef unix_failure
index 060ae7b..f65b3bf 100644 (file)
@@ -2,6 +2,7 @@
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
  *  Copyright (c) 2002 Cray Inc.
+ *  Copyright (c) 2003 Cluster File Systems, Inc.
  *
  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
  */
 #include <ipmap.h>
 
 
+#define NAL_FLAG_RUNNING        1
+#define NAL_FLAG_STOPPING       2
+#define NAL_FLAG_STOPPED        4
+
 typedef struct procbridge {
     pthread_t t;
     pthread_cond_t cond;
     pthread_mutex_t mutex;
-    int to_lib[2];
-    int from_lib[2];
+
+    int nal_flags;
 } *procbridge;
 
+typedef struct nal_init_args {
+    ptl_pid_t        nia_requested_pid;
+    ptl_ni_limits_t *nia_limits;
+    int              nia_nal_type;
+    bridge           nia_bridge;
+} nal_init_args_t;
+
 extern void *nal_thread(void *);
 
 
@@ -33,8 +45,8 @@ extern void *nal_thread(void *);
 
 extern void set_address(bridge t,ptl_pid_t pidrequest);
 extern nal_t *procbridge_interface(int num_interface,
-                            ptl_pt_index_t ptl_size,
-                            ptl_ac_index_t acl_size,
-                            ptl_pid_t requested_pid);
+                                   ptl_pt_index_t ptl_size,
+                                   ptl_ac_index_t acl_size,
+                                   ptl_pid_t requested_pid);
 
 #endif
index c3ee103..38bbf28 100644 (file)
@@ -2,6 +2,7 @@
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
  *  Copyright (c) 2002 Cray Inc.
+ *  Copyright (c) 2003 Cluster File Systems, Inc.
  *
  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
  *
@@ -38,7 +39,6 @@
 #include <netdb.h>
 #include <errno.h>
 #include <timer.h>
-//#include <util/pqtimer.h>
 #include <dispatch.h>
 
 /* the following functions are stubs to satisfy the nal definition
@@ -108,69 +108,22 @@ static int nal_dist(nal_cb_t *nal,
 {
     return 0;
 }
-    
-
-
-/* Function:  data_from_api
- * Arguments: t: the nal state for this interface
- * Returns: whether to continue reading from the pipe
- *
- *   data_from_api() reads data from the api side in response
- *   to a select.
- *
- *   We define data_failure() for syntactic convenience
- *   of unix error reporting.
- */
-
-#define data_failure(operand,fd,buffer,length)\
-       if(syscall(SYS_##operand,fd,buffer,length)!=length){\
-          lib_fini(b->nal_cb);\
-          return(0);\
-       }
-static int data_from_api(void *arg)
-{
-        bridge b = arg;
-    procbridge p=(procbridge)b->local;
-    /* where are these two sizes derived from ??*/
-    char arg_block[ 256 ];
-    char ret_block[ 128 ];
-    ptl_size_t arg_len,ret_len;
-    int fd=p->to_lib[0];
-    int index;
-
-    data_failure(read,fd, &index, sizeof(index));
-
-    if (index==PTL_FINI) {
-        lib_fini(b->nal_cb);
-        if (b->shutdown) (*b->shutdown)(b);
-        syscall(SYS_write, p->from_lib[1],&b->alive,sizeof(b->alive));
-
-        /* a heavy-handed but convenient way of shutting down
-           the lower side thread */
-        pthread_exit(0);
-    }
-
-    data_failure(read,fd, &arg_len, sizeof(arg_len));
-    data_failure(read,fd, &ret_len, sizeof(ret_len));
-    data_failure(read,fd, arg_block, arg_len);
-
-    lib_dispatch(b->nal_cb, NULL, index, arg_block, ret_block);
-
-    data_failure(write,p->from_lib[1],ret_block, ret_len);
-    return(1);
-}
-#undef data_failure
-
-
 
 static void wakeup_topside(void *z)
 {
-    bridge b=z;
-    procbridge p=b->local;
+    bridge b = z;
+    procbridge p = b->local;
+    int stop;
 
     pthread_mutex_lock(&p->mutex);
+    stop = p->nal_flags & NAL_FLAG_STOPPING;
+    if (stop)
+        p->nal_flags |= NAL_FLAG_STOPPED;
     pthread_cond_broadcast(&p->cond);
     pthread_mutex_unlock(&p->mutex);
+
+    if (stop)
+        pthread_exit(0);
 }
 
 
@@ -195,7 +148,8 @@ nal_initialize nal_table[PTL_IFACE_MAX]={0,tcpnal_init,0};
 
 void *nal_thread(void *z)
 {
-    bridge b=z;
+    nal_init_args_t *args = (nal_init_args_t *) z;
+    bridge b = args->nia_bridge;
     procbridge p=b->local;
     int rc;
     ptl_pid_t pid_request;
@@ -216,15 +170,9 @@ void *nal_thread(void *z)
     b->nal_cb->cb_sti=nal_sti;
     b->nal_cb->cb_dist=nal_dist;
 
-
-    register_io_handler(p->to_lib[0],READ_HANDLER,data_from_api,(void *)b);
-
-    if(!(rc = syscall(SYS_read, p->to_lib[0], &pid_request, sizeof(pid_request))))
-        perror("procbridge read from api");
-    if(!(rc = syscall(SYS_read, p->to_lib[0], &desired, sizeof(ptl_ni_limits_t))))
-        perror("procbridge read from api");
-    if(!(rc = syscall(SYS_read, p->to_lib[0], &nal_type, sizeof(nal_type))))
-        perror("procbridge read from api");
+    pid_request = args->nia_requested_pid;
+    desired = *args->nia_limits;
+    nal_type = args->nia_nal_type;
 
     actual = desired;
     LIMIT(desired.max_match_entries,actual.max_match_entries,MAX_MES);
@@ -251,12 +199,12 @@ void *nal_thread(void *z)
      * it is non-zero since something went wrong.
      */
     /* this should perform error checking */
-#if 0
-    write(p->from_lib[1], &actual, sizeof(ptl_ni_limits_t));
-#endif
-    syscall(SYS_write, p->from_lib[1], &rc, sizeof(rc));
-    
-    if(!rc) {
+    pthread_mutex_lock(&p->mutex);
+    p->nal_flags |= rc ? NAL_FLAG_STOPPED : NAL_FLAG_RUNNING;
+    pthread_cond_broadcast(&p->cond);
+    pthread_mutex_unlock(&p->mutex);
+
+    if (!rc) {
         /* the thunk function is called each time the timer loop
            performs an operation and returns to blocking mode. we
            overload this function to inform the api side that
@@ -267,4 +215,3 @@ void *nal_thread(void *z)
     return(0);
 }
 #undef LIMIT
-
index c4f84f4..bcfba02 100644 (file)
@@ -97,9 +97,9 @@ void remove_io_handler (io_handler i)
 
 static void set_flag(io_handler n,fd_set *fds)
 {
-    if (n->type & READ_HANDLER) FD_SET(n->fd,fds);
-    if (n->type & WRITE_HANDLER) FD_SET(n->fd,fds+1);
-    if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd,fds+2);
+    if (n->type & READ_HANDLER) FD_SET(n->fd, &fds[0]);
+    if (n->type & WRITE_HANDLER) FD_SET(n->fd,&fds[1]);
+    if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd, &fds[2]);
 }
 
 
@@ -126,9 +126,9 @@ void select_timer_block(when until)
         timeout_pointer=&timeout;
     } else timeout_pointer=0;
 
-    FD_ZERO(fds);
-    FD_ZERO(fds+1);
-    FD_ZERO(fds+2);
+    FD_ZERO(&fds[0]);
+    FD_ZERO(&fds[1]);
+    FD_ZERO(&fds[2]);
     for (k=&io_handlers;*k;){
         if ((*k)->disabled){
             j=*k;
@@ -140,14 +140,15 @@ void select_timer_block(when until)
            k=&(*k)->next;
        }
     }
-    result=select(FD_SETSIZE,fds,fds+1,fds+2,timeout_pointer);
+
+    result=select(FD_SETSIZE, &fds[0], &fds[1], &fds[2], timeout_pointer);
 
     if (result > 0)
         for (j=io_handlers;j;j=j->next){
             if (!(j->disabled) && 
-                ((FD_ISSET(j->fd,fds) && (j->type & READ_HANDLER)) ||
-                 (FD_ISSET(j->fd,fds+1) && (j->type & WRITE_HANDLER)) ||
-                 (FD_ISSET(j->fd,fds+2) && (j->type & EXCEPTION_HANDLER)))){
+                ((FD_ISSET(j->fd, &fds[0]) && (j->type & READ_HANDLER)) ||
+                 (FD_ISSET(j->fd, &fds[1]) && (j->type & WRITE_HANDLER)) ||
+                 (FD_ISSET(j->fd, &fds[2]) && (j->type & EXCEPTION_HANDLER)))){
                 if (!(*j->function)(j->argument))
                     j->disabled=1;
             }
index dfcd743..724ee74 100644 (file)
@@ -2,6 +2,7 @@
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
  *  Copyright (c) 2002 Cray Inc.
+ *  Copyright (c) 2003 Cluster File Systems, Inc.
  *
  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
  *
  * forwards a packaged api call from the 'api' side to the 'library'
  *   side, and collects the result
  */
-/* XXX CFS workaround:
- * when multiple threads call forward at the same time, data in the
- * pipe will be trampled. add a mutex to serialize the access, as
- * a temporary solution.
- */
-#define forward_failure(operand,fd,buffer,length)\
-       if(syscall(SYS_##operand,fd,buffer,length)!=length){\
-          lib_fini(b->nal_cb);\
-          return(PTL_SEGV);\
-       }
 static int procbridge_forward(nal_t *n, int id, void *args, ptl_size_t args_len,
                              void *ret, ptl_size_t ret_len)
 {
-    bridge b=(bridge)n->nal_data;
-    procbridge p=(procbridge)b->local;
-    int lib=p->to_lib[1];
-    static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
-    int k;
-
-    /* protect the whole access to pipe */
-    pthread_mutex_lock(&mut);
+    bridge b = (bridge) n->nal_data;
 
-    forward_failure(write,lib, &id, sizeof(id));
-    forward_failure(write,lib,&args_len, sizeof(args_len));
-    forward_failure(write,lib,&ret_len, sizeof(ret_len));
-    forward_failure(write,lib,args, args_len);
+    if (id == PTL_FINI) {
+            lib_fini(b->nal_cb);
 
-    do {
-        k=syscall(SYS_read, p->from_lib[0], ret, ret_len);
-    } while ((k!=ret_len) && (errno += EINTR));
+            if (b->shutdown)
+                (*b->shutdown)(b);
+    }
 
-    pthread_mutex_unlock(&mut);
+    lib_dispatch(b->nal_cb, NULL, id, args, ret);
 
-    if(k!=ret_len){
-        perror("nal: read return block");
-        return PTL_SEGV;
-    }
     return (PTL_OK);
 }
-#undef forward_failure
 
 
 /* Function: shutdown
@@ -101,15 +78,18 @@ static int procbridge_shutdown(nal_t *n, int ni)
 {
     bridge b=(bridge)n->nal_data;
     procbridge p=(procbridge)b->local;
-    int code=PTL_FINI;
 
-    syscall(SYS_write, p->to_lib[1],&code,sizeof(code));
-    syscall(SYS_read, p->from_lib[0],&code,sizeof(code));
+    p->nal_flags |= NAL_FLAG_STOPPING;
 
-    syscall(SYS_close, p->to_lib[0]);
-    syscall(SYS_close, p->to_lib[1]);
-    syscall(SYS_close, p->from_lib[0]);
-    syscall(SYS_close, p->from_lib[1]);
+    do {
+        pthread_mutex_lock(&p->mutex);
+        if (p->nal_flags & NAL_FLAG_STOPPED) {
+                pthread_mutex_unlock(&p->mutex);
+                break;
+        }
+        pthread_cond_wait(&p->cond, &p->mutex);
+        pthread_mutex_unlock(&p->mutex);
+    } while (1);
 
     free(p);
     return(0);
@@ -162,7 +142,9 @@ static nal_t api_nal = {
     unlock:   procbridge_unlock
 };
 
-/* Function: bridge_init
+ptl_nid_t tcpnal_mynid;
+
+/* Function: procbridge_interface
  *
  * Arguments:  pid: requested process id (port offset)
  *                  PTL_ID_ANY not supported.
@@ -176,77 +158,17 @@ static nal_t api_nal = {
  * initializes the tcp nal. we define unix_failure as an
  * error wrapper to cut down clutter.
  */
-#define unix_failure(operand,fd,buffer,length,text)\
-       if(syscall(SYS_##operand,fd,buffer,length)!=length){\
-          perror(text);\
-          return(NULL);\
-       }
-#if 0
-static nal_t *bridge_init(ptl_interface_t nal,
-                          ptl_pid_t pid_request,
-                          ptl_ni_limits_t *desired,
-                          ptl_ni_limits_t *actual,
-                          int *rc)
-{
-    procbridge p;
-    bridge b;
-    static int initialized=0;
-    ptl_ni_limits_t limits = {-1,-1,-1,-1,-1};
-
-    if(initialized) return (&api_nal);
-
-    init_unix_timer();
-
-    b=(bridge)malloc(sizeof(struct bridge));
-    p=(procbridge)malloc(sizeof(struct procbridge));
-    api_nal.nal_data=b;
-    b->local=p;
-
-    if(pipe(p->to_lib) || pipe(p->from_lib)) {
-        perror("nal_init: pipe");
-        return(NULL);
-    }
-
-    if (desired) limits = *desired;
-    unix_failure(write,p->to_lib[1], &pid_request, sizeof(pid_request),
-                       "nal_init: write");
-    unix_failure(write,p->to_lib[1], &limits, sizeof(ptl_ni_limits_t),
-                       "nal_init: write");
-    unix_failure(write,p->to_lib[1], &nal, sizeof(ptl_interface_t),
-                       "nal_init: write");
-
-    if(pthread_create(&p->t, NULL, nal_thread, b)) {
-        perror("nal_init: pthread_create");
-        return(NULL);
-    }
-
-    unix_failure(read,p->from_lib[0], actual, sizeof(ptl_ni_limits_t),
-                 "tcp_init: read");
-    unix_failure(read,p->from_lib[0], rc, sizeof(rc),
-                 "nal_init: read");
-
-    if(*rc) return(NULL);
-
-    initialized = 1;
-    pthread_mutex_init(&p->mutex,0);
-    pthread_cond_init(&p->cond, 0);
-
-    return (&api_nal);
-}
-#endif
-
-ptl_nid_t tcpnal_mynid;
-
 nal_t *procbridge_interface(int num_interface,
                             ptl_pt_index_t ptl_size,
                             ptl_ac_index_t acl_size,
                             ptl_pid_t requested_pid)
 {
+    nal_init_args_t args;
     procbridge p;
     bridge b;
     static int initialized=0;
     ptl_ni_limits_t limits = {-1,-1,-1,-1,-1};
-    int rc, nal_type = PTL_IFACE_TCP;/* PTL_IFACE_DEFAULT FIXME hack */
+    int nal_type = PTL_IFACE_TCP;/* PTL_IFACE_DEFAULT FIXME hack */
 
     if(initialized) return (&api_nal);
 
@@ -257,38 +179,40 @@ nal_t *procbridge_interface(int num_interface,
     api_nal.nal_data=b;
     b->local=p;
 
-    if(pipe(p->to_lib) || pipe(p->from_lib)) {
-        perror("nal_init: pipe");
-        return(NULL);
-    }
-
     if (ptl_size)
            limits.max_ptable_index = ptl_size;
     if (acl_size)
            limits.max_atable_index = acl_size;
 
-    unix_failure(write,p->to_lib[1], &requested_pid, sizeof(requested_pid),
-                       "nal_init: write");
-    unix_failure(write,p->to_lib[1], &limits, sizeof(ptl_ni_limits_t),
-                       "nal_init: write");
-    unix_failure(write,p->to_lib[1], &nal_type, sizeof(nal_type),
-                       "nal_init: write");
+    args.nia_requested_pid = requested_pid;
+    args.nia_limits = &limits;
+    args.nia_nal_type = nal_type;
+    args.nia_bridge = b;
 
-    if(pthread_create(&p->t, NULL, nal_thread, b)) {
+    pthread_mutex_init(&p->mutex,0);
+    pthread_cond_init(&p->cond, 0);
+    p->nal_flags = 0;
+
+    if (pthread_create(&p->t, NULL, nal_thread, &args)) {
         perror("nal_init: pthread_create");
         return(NULL);
     }
 
-    unix_failure(read,p->from_lib[0], &rc, sizeof(rc),
-                 "nal_init: read");
-
-    if(rc) return(NULL);
+    do {
+        pthread_mutex_lock(&p->mutex);
+        if (p->nal_flags & (NAL_FLAG_RUNNING | NAL_FLAG_STOPPED)) {
+                pthread_mutex_unlock(&p->mutex);
+                break;
+        }
+        pthread_cond_wait(&p->cond, &p->mutex);
+        pthread_mutex_unlock(&p->mutex);
+    } while (1);
+
+    if (p->nal_flags & NAL_FLAG_STOPPED)
+        return (NULL);
 
     b->nal_cb->ni.nid = tcpnal_mynid;
     initialized = 1;
-    pthread_mutex_init(&p->mutex,0);
-    pthread_cond_init(&p->cond, 0);
 
     return (&api_nal);
 }
-#undef unix_failure
index 060ae7b..f65b3bf 100644 (file)
@@ -2,6 +2,7 @@
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
  *  Copyright (c) 2002 Cray Inc.
+ *  Copyright (c) 2003 Cluster File Systems, Inc.
  *
  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
  */
 #include <ipmap.h>
 
 
+#define NAL_FLAG_RUNNING        1
+#define NAL_FLAG_STOPPING       2
+#define NAL_FLAG_STOPPED        4
+
 typedef struct procbridge {
     pthread_t t;
     pthread_cond_t cond;
     pthread_mutex_t mutex;
-    int to_lib[2];
-    int from_lib[2];
+
+    int nal_flags;
 } *procbridge;
 
+typedef struct nal_init_args {
+    ptl_pid_t        nia_requested_pid;
+    ptl_ni_limits_t *nia_limits;
+    int              nia_nal_type;
+    bridge           nia_bridge;
+} nal_init_args_t;
+
 extern void *nal_thread(void *);
 
 
@@ -33,8 +45,8 @@ extern void *nal_thread(void *);
 
 extern void set_address(bridge t,ptl_pid_t pidrequest);
 extern nal_t *procbridge_interface(int num_interface,
-                            ptl_pt_index_t ptl_size,
-                            ptl_ac_index_t acl_size,
-                            ptl_pid_t requested_pid);
+                                   ptl_pt_index_t ptl_size,
+                                   ptl_ac_index_t acl_size,
+                                   ptl_pid_t requested_pid);
 
 #endif
index c3ee103..38bbf28 100644 (file)
@@ -2,6 +2,7 @@
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
  *  Copyright (c) 2002 Cray Inc.
+ *  Copyright (c) 2003 Cluster File Systems, Inc.
  *
  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
  *
@@ -38,7 +39,6 @@
 #include <netdb.h>
 #include <errno.h>
 #include <timer.h>
-//#include <util/pqtimer.h>
 #include <dispatch.h>
 
 /* the following functions are stubs to satisfy the nal definition
@@ -108,69 +108,22 @@ static int nal_dist(nal_cb_t *nal,
 {
     return 0;
 }
-    
-
-
-/* Function:  data_from_api
- * Arguments: t: the nal state for this interface
- * Returns: whether to continue reading from the pipe
- *
- *   data_from_api() reads data from the api side in response
- *   to a select.
- *
- *   We define data_failure() for syntactic convenience
- *   of unix error reporting.
- */
-
-#define data_failure(operand,fd,buffer,length)\
-       if(syscall(SYS_##operand,fd,buffer,length)!=length){\
-          lib_fini(b->nal_cb);\
-          return(0);\
-       }
-static int data_from_api(void *arg)
-{
-        bridge b = arg;
-    procbridge p=(procbridge)b->local;
-    /* where are these two sizes derived from ??*/
-    char arg_block[ 256 ];
-    char ret_block[ 128 ];
-    ptl_size_t arg_len,ret_len;
-    int fd=p->to_lib[0];
-    int index;
-
-    data_failure(read,fd, &index, sizeof(index));
-
-    if (index==PTL_FINI) {
-        lib_fini(b->nal_cb);
-        if (b->shutdown) (*b->shutdown)(b);
-        syscall(SYS_write, p->from_lib[1],&b->alive,sizeof(b->alive));
-
-        /* a heavy-handed but convenient way of shutting down
-           the lower side thread */
-        pthread_exit(0);
-    }
-
-    data_failure(read,fd, &arg_len, sizeof(arg_len));
-    data_failure(read,fd, &ret_len, sizeof(ret_len));
-    data_failure(read,fd, arg_block, arg_len);
-
-    lib_dispatch(b->nal_cb, NULL, index, arg_block, ret_block);
-
-    data_failure(write,p->from_lib[1],ret_block, ret_len);
-    return(1);
-}
-#undef data_failure
-
-
 
 static void wakeup_topside(void *z)
 {
-    bridge b=z;
-    procbridge p=b->local;
+    bridge b = z;
+    procbridge p = b->local;
+    int stop;
 
     pthread_mutex_lock(&p->mutex);
+    stop = p->nal_flags & NAL_FLAG_STOPPING;
+    if (stop)
+        p->nal_flags |= NAL_FLAG_STOPPED;
     pthread_cond_broadcast(&p->cond);
     pthread_mutex_unlock(&p->mutex);
+
+    if (stop)
+        pthread_exit(0);
 }
 
 
@@ -195,7 +148,8 @@ nal_initialize nal_table[PTL_IFACE_MAX]={0,tcpnal_init,0};
 
 void *nal_thread(void *z)
 {
-    bridge b=z;
+    nal_init_args_t *args = (nal_init_args_t *) z;
+    bridge b = args->nia_bridge;
     procbridge p=b->local;
     int rc;
     ptl_pid_t pid_request;
@@ -216,15 +170,9 @@ void *nal_thread(void *z)
     b->nal_cb->cb_sti=nal_sti;
     b->nal_cb->cb_dist=nal_dist;
 
-
-    register_io_handler(p->to_lib[0],READ_HANDLER,data_from_api,(void *)b);
-
-    if(!(rc = syscall(SYS_read, p->to_lib[0], &pid_request, sizeof(pid_request))))
-        perror("procbridge read from api");
-    if(!(rc = syscall(SYS_read, p->to_lib[0], &desired, sizeof(ptl_ni_limits_t))))
-        perror("procbridge read from api");
-    if(!(rc = syscall(SYS_read, p->to_lib[0], &nal_type, sizeof(nal_type))))
-        perror("procbridge read from api");
+    pid_request = args->nia_requested_pid;
+    desired = *args->nia_limits;
+    nal_type = args->nia_nal_type;
 
     actual = desired;
     LIMIT(desired.max_match_entries,actual.max_match_entries,MAX_MES);
@@ -251,12 +199,12 @@ void *nal_thread(void *z)
      * it is non-zero since something went wrong.
      */
     /* this should perform error checking */
-#if 0
-    write(p->from_lib[1], &actual, sizeof(ptl_ni_limits_t));
-#endif
-    syscall(SYS_write, p->from_lib[1], &rc, sizeof(rc));
-    
-    if(!rc) {
+    pthread_mutex_lock(&p->mutex);
+    p->nal_flags |= rc ? NAL_FLAG_STOPPED : NAL_FLAG_RUNNING;
+    pthread_cond_broadcast(&p->cond);
+    pthread_mutex_unlock(&p->mutex);
+
+    if (!rc) {
         /* the thunk function is called each time the timer loop
            performs an operation and returns to blocking mode. we
            overload this function to inform the api side that
@@ -267,4 +215,3 @@ void *nal_thread(void *z)
     return(0);
 }
 #undef LIMIT
-
index c4f84f4..bcfba02 100644 (file)
@@ -97,9 +97,9 @@ void remove_io_handler (io_handler i)
 
 static void set_flag(io_handler n,fd_set *fds)
 {
-    if (n->type & READ_HANDLER) FD_SET(n->fd,fds);
-    if (n->type & WRITE_HANDLER) FD_SET(n->fd,fds+1);
-    if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd,fds+2);
+    if (n->type & READ_HANDLER) FD_SET(n->fd, &fds[0]);
+    if (n->type & WRITE_HANDLER) FD_SET(n->fd,&fds[1]);
+    if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd, &fds[2]);
 }
 
 
@@ -126,9 +126,9 @@ void select_timer_block(when until)
         timeout_pointer=&timeout;
     } else timeout_pointer=0;
 
-    FD_ZERO(fds);
-    FD_ZERO(fds+1);
-    FD_ZERO(fds+2);
+    FD_ZERO(&fds[0]);
+    FD_ZERO(&fds[1]);
+    FD_ZERO(&fds[2]);
     for (k=&io_handlers;*k;){
         if ((*k)->disabled){
             j=*k;
@@ -140,14 +140,15 @@ void select_timer_block(when until)
            k=&(*k)->next;
        }
     }
-    result=select(FD_SETSIZE,fds,fds+1,fds+2,timeout_pointer);
+
+    result=select(FD_SETSIZE, &fds[0], &fds[1], &fds[2], timeout_pointer);
 
     if (result > 0)
         for (j=io_handlers;j;j=j->next){
             if (!(j->disabled) && 
-                ((FD_ISSET(j->fd,fds) && (j->type & READ_HANDLER)) ||
-                 (FD_ISSET(j->fd,fds+1) && (j->type & WRITE_HANDLER)) ||
-                 (FD_ISSET(j->fd,fds+2) && (j->type & EXCEPTION_HANDLER)))){
+                ((FD_ISSET(j->fd, &fds[0]) && (j->type & READ_HANDLER)) ||
+                 (FD_ISSET(j->fd, &fds[1]) && (j->type & WRITE_HANDLER)) ||
+                 (FD_ISSET(j->fd, &fds[2]) && (j->type & EXCEPTION_HANDLER)))){
                 if (!(*j->function)(j->argument))
                     j->disabled=1;
             }
index 7c7c94d..ae97f92 100644 (file)
@@ -2,6 +2,7 @@
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
  *  Copyright (c) 2002 Cray Inc.
+ *  Copyright (c) 2003 Cluster File Systems, Inc.
  *
  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
  *
@@ -50,7 +51,6 @@
  *
  * sends a packet to the peer, after insuring that a connection exists
  */
-#warning FIXME: "param 'type' is newly added, make use of it!!"
 int tcpnal_send(nal_cb_t *n,
                void *private,
                lib_msg_t *cookie,
@@ -163,15 +163,15 @@ finalize:
  */
 static int from_connection(void *a, void *d)
 {
-        connection c = d;
-        bridge b=a;
-        ptl_hdr_t hdr;
-
-        if (read_connection(c, (unsigned char *)&hdr, sizeof(hdr))){
-                lib_parse(b->nal_cb, &hdr, c);
-                return(1);
-        }
-        return(0);
+    connection c = d;
+    bridge b = a;
+    ptl_hdr_t hdr;
+
+    if (read_connection(c, (unsigned char *)&hdr, sizeof(hdr))){
+        lib_parse(b->nal_cb, &hdr, c);
+        return(1);
+    }
+    return(0);
 }
 
 
index 7c7c94d..ae97f92 100644 (file)
@@ -2,6 +2,7 @@
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
  *  Copyright (c) 2002 Cray Inc.
+ *  Copyright (c) 2003 Cluster File Systems, Inc.
  *
  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
  *
@@ -50,7 +51,6 @@
  *
  * sends a packet to the peer, after insuring that a connection exists
  */
-#warning FIXME: "param 'type' is newly added, make use of it!!"
 int tcpnal_send(nal_cb_t *n,
                void *private,
                lib_msg_t *cookie,
@@ -163,15 +163,15 @@ finalize:
  */
 static int from_connection(void *a, void *d)
 {
-        connection c = d;
-        bridge b=a;
-        ptl_hdr_t hdr;
-
-        if (read_connection(c, (unsigned char *)&hdr, sizeof(hdr))){
-                lib_parse(b->nal_cb, &hdr, c);
-                return(1);
-        }
-        return(0);
+    connection c = d;
+    bridge b = a;
+    ptl_hdr_t hdr;
+
+    if (read_connection(c, (unsigned char *)&hdr, sizeof(hdr))){
+        lib_parse(b->nal_cb, &hdr, c);
+        return(1);
+    }
+    return(0);
 }
 
 
index dfcd743..724ee74 100644 (file)
@@ -2,6 +2,7 @@
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
  *  Copyright (c) 2002 Cray Inc.
+ *  Copyright (c) 2003 Cluster File Systems, Inc.
  *
  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
  *
  * forwards a packaged api call from the 'api' side to the 'library'
  *   side, and collects the result
  */
-/* XXX CFS workaround:
- * when multiple threads call forward at the same time, data in the
- * pipe will be trampled. add a mutex to serialize the access, as
- * a temporary solution.
- */
-#define forward_failure(operand,fd,buffer,length)\
-       if(syscall(SYS_##operand,fd,buffer,length)!=length){\
-          lib_fini(b->nal_cb);\
-          return(PTL_SEGV);\
-       }
 static int procbridge_forward(nal_t *n, int id, void *args, ptl_size_t args_len,
                              void *ret, ptl_size_t ret_len)
 {
-    bridge b=(bridge)n->nal_data;
-    procbridge p=(procbridge)b->local;
-    int lib=p->to_lib[1];
-    static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
-    int k;
-
-    /* protect the whole access to pipe */
-    pthread_mutex_lock(&mut);
+    bridge b = (bridge) n->nal_data;
 
-    forward_failure(write,lib, &id, sizeof(id));
-    forward_failure(write,lib,&args_len, sizeof(args_len));
-    forward_failure(write,lib,&ret_len, sizeof(ret_len));
-    forward_failure(write,lib,args, args_len);
+    if (id == PTL_FINI) {
+            lib_fini(b->nal_cb);
 
-    do {
-        k=syscall(SYS_read, p->from_lib[0], ret, ret_len);
-    } while ((k!=ret_len) && (errno += EINTR));
+            if (b->shutdown)
+                (*b->shutdown)(b);
+    }
 
-    pthread_mutex_unlock(&mut);
+    lib_dispatch(b->nal_cb, NULL, id, args, ret);
 
-    if(k!=ret_len){
-        perror("nal: read return block");
-        return PTL_SEGV;
-    }
     return (PTL_OK);
 }
-#undef forward_failure
 
 
 /* Function: shutdown
@@ -101,15 +78,18 @@ static int procbridge_shutdown(nal_t *n, int ni)
 {
     bridge b=(bridge)n->nal_data;
     procbridge p=(procbridge)b->local;
-    int code=PTL_FINI;
 
-    syscall(SYS_write, p->to_lib[1],&code,sizeof(code));
-    syscall(SYS_read, p->from_lib[0],&code,sizeof(code));
+    p->nal_flags |= NAL_FLAG_STOPPING;
 
-    syscall(SYS_close, p->to_lib[0]);
-    syscall(SYS_close, p->to_lib[1]);
-    syscall(SYS_close, p->from_lib[0]);
-    syscall(SYS_close, p->from_lib[1]);
+    do {
+        pthread_mutex_lock(&p->mutex);
+        if (p->nal_flags & NAL_FLAG_STOPPED) {
+                pthread_mutex_unlock(&p->mutex);
+                break;
+        }
+        pthread_cond_wait(&p->cond, &p->mutex);
+        pthread_mutex_unlock(&p->mutex);
+    } while (1);
 
     free(p);
     return(0);
@@ -162,7 +142,9 @@ static nal_t api_nal = {
     unlock:   procbridge_unlock
 };
 
-/* Function: bridge_init
+ptl_nid_t tcpnal_mynid;
+
+/* Function: procbridge_interface
  *
  * Arguments:  pid: requested process id (port offset)
  *                  PTL_ID_ANY not supported.
@@ -176,77 +158,17 @@ static nal_t api_nal = {
  * initializes the tcp nal. we define unix_failure as an
  * error wrapper to cut down clutter.
  */
-#define unix_failure(operand,fd,buffer,length,text)\
-       if(syscall(SYS_##operand,fd,buffer,length)!=length){\
-          perror(text);\
-          return(NULL);\
-       }
-#if 0
-static nal_t *bridge_init(ptl_interface_t nal,
-                          ptl_pid_t pid_request,
-                          ptl_ni_limits_t *desired,
-                          ptl_ni_limits_t *actual,
-                          int *rc)
-{
-    procbridge p;
-    bridge b;
-    static int initialized=0;
-    ptl_ni_limits_t limits = {-1,-1,-1,-1,-1};
-
-    if(initialized) return (&api_nal);
-
-    init_unix_timer();
-
-    b=(bridge)malloc(sizeof(struct bridge));
-    p=(procbridge)malloc(sizeof(struct procbridge));
-    api_nal.nal_data=b;
-    b->local=p;
-
-    if(pipe(p->to_lib) || pipe(p->from_lib)) {
-        perror("nal_init: pipe");
-        return(NULL);
-    }
-
-    if (desired) limits = *desired;
-    unix_failure(write,p->to_lib[1], &pid_request, sizeof(pid_request),
-                       "nal_init: write");
-    unix_failure(write,p->to_lib[1], &limits, sizeof(ptl_ni_limits_t),
-                       "nal_init: write");
-    unix_failure(write,p->to_lib[1], &nal, sizeof(ptl_interface_t),
-                       "nal_init: write");
-
-    if(pthread_create(&p->t, NULL, nal_thread, b)) {
-        perror("nal_init: pthread_create");
-        return(NULL);
-    }
-
-    unix_failure(read,p->from_lib[0], actual, sizeof(ptl_ni_limits_t),
-                 "tcp_init: read");
-    unix_failure(read,p->from_lib[0], rc, sizeof(rc),
-                 "nal_init: read");
-
-    if(*rc) return(NULL);
-
-    initialized = 1;
-    pthread_mutex_init(&p->mutex,0);
-    pthread_cond_init(&p->cond, 0);
-
-    return (&api_nal);
-}
-#endif
-
-ptl_nid_t tcpnal_mynid;
-
 nal_t *procbridge_interface(int num_interface,
                             ptl_pt_index_t ptl_size,
                             ptl_ac_index_t acl_size,
                             ptl_pid_t requested_pid)
 {
+    nal_init_args_t args;
     procbridge p;
     bridge b;
     static int initialized=0;
     ptl_ni_limits_t limits = {-1,-1,-1,-1,-1};
-    int rc, nal_type = PTL_IFACE_TCP;/* PTL_IFACE_DEFAULT FIXME hack */
+    int nal_type = PTL_IFACE_TCP;/* PTL_IFACE_DEFAULT FIXME hack */
 
     if(initialized) return (&api_nal);
 
@@ -257,38 +179,40 @@ nal_t *procbridge_interface(int num_interface,
     api_nal.nal_data=b;
     b->local=p;
 
-    if(pipe(p->to_lib) || pipe(p->from_lib)) {
-        perror("nal_init: pipe");
-        return(NULL);
-    }
-
     if (ptl_size)
            limits.max_ptable_index = ptl_size;
     if (acl_size)
            limits.max_atable_index = acl_size;
 
-    unix_failure(write,p->to_lib[1], &requested_pid, sizeof(requested_pid),
-                       "nal_init: write");
-    unix_failure(write,p->to_lib[1], &limits, sizeof(ptl_ni_limits_t),
-                       "nal_init: write");
-    unix_failure(write,p->to_lib[1], &nal_type, sizeof(nal_type),
-                       "nal_init: write");
+    args.nia_requested_pid = requested_pid;
+    args.nia_limits = &limits;
+    args.nia_nal_type = nal_type;
+    args.nia_bridge = b;
 
-    if(pthread_create(&p->t, NULL, nal_thread, b)) {
+    pthread_mutex_init(&p->mutex,0);
+    pthread_cond_init(&p->cond, 0);
+    p->nal_flags = 0;
+
+    if (pthread_create(&p->t, NULL, nal_thread, &args)) {
         perror("nal_init: pthread_create");
         return(NULL);
     }
 
-    unix_failure(read,p->from_lib[0], &rc, sizeof(rc),
-                 "nal_init: read");
-
-    if(rc) return(NULL);
+    do {
+        pthread_mutex_lock(&p->mutex);
+        if (p->nal_flags & (NAL_FLAG_RUNNING | NAL_FLAG_STOPPED)) {
+                pthread_mutex_unlock(&p->mutex);
+                break;
+        }
+        pthread_cond_wait(&p->cond, &p->mutex);
+        pthread_mutex_unlock(&p->mutex);
+    } while (1);
+
+    if (p->nal_flags & NAL_FLAG_STOPPED)
+        return (NULL);
 
     b->nal_cb->ni.nid = tcpnal_mynid;
     initialized = 1;
-    pthread_mutex_init(&p->mutex,0);
-    pthread_cond_init(&p->cond, 0);
 
     return (&api_nal);
 }
-#undef unix_failure
index 060ae7b..f65b3bf 100644 (file)
@@ -2,6 +2,7 @@
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
  *  Copyright (c) 2002 Cray Inc.
+ *  Copyright (c) 2003 Cluster File Systems, Inc.
  *
  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
  */
 #include <ipmap.h>
 
 
+#define NAL_FLAG_RUNNING        1
+#define NAL_FLAG_STOPPING       2
+#define NAL_FLAG_STOPPED        4
+
 typedef struct procbridge {
     pthread_t t;
     pthread_cond_t cond;
     pthread_mutex_t mutex;
-    int to_lib[2];
-    int from_lib[2];
+
+    int nal_flags;
 } *procbridge;
 
+typedef struct nal_init_args {
+    ptl_pid_t        nia_requested_pid;
+    ptl_ni_limits_t *nia_limits;
+    int              nia_nal_type;
+    bridge           nia_bridge;
+} nal_init_args_t;
+
 extern void *nal_thread(void *);
 
 
@@ -33,8 +45,8 @@ extern void *nal_thread(void *);
 
 extern void set_address(bridge t,ptl_pid_t pidrequest);
 extern nal_t *procbridge_interface(int num_interface,
-                            ptl_pt_index_t ptl_size,
-                            ptl_ac_index_t acl_size,
-                            ptl_pid_t requested_pid);
+                                   ptl_pt_index_t ptl_size,
+                                   ptl_ac_index_t acl_size,
+                                   ptl_pid_t requested_pid);
 
 #endif
index c3ee103..38bbf28 100644 (file)
@@ -2,6 +2,7 @@
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
  *  Copyright (c) 2002 Cray Inc.
+ *  Copyright (c) 2003 Cluster File Systems, Inc.
  *
  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
  *
@@ -38,7 +39,6 @@
 #include <netdb.h>
 #include <errno.h>
 #include <timer.h>
-//#include <util/pqtimer.h>
 #include <dispatch.h>
 
 /* the following functions are stubs to satisfy the nal definition
@@ -108,69 +108,22 @@ static int nal_dist(nal_cb_t *nal,
 {
     return 0;
 }
-    
-
-
-/* Function:  data_from_api
- * Arguments: t: the nal state for this interface
- * Returns: whether to continue reading from the pipe
- *
- *   data_from_api() reads data from the api side in response
- *   to a select.
- *
- *   We define data_failure() for syntactic convenience
- *   of unix error reporting.
- */
-
-#define data_failure(operand,fd,buffer,length)\
-       if(syscall(SYS_##operand,fd,buffer,length)!=length){\
-          lib_fini(b->nal_cb);\
-          return(0);\
-       }
-static int data_from_api(void *arg)
-{
-        bridge b = arg;
-    procbridge p=(procbridge)b->local;
-    /* where are these two sizes derived from ??*/
-    char arg_block[ 256 ];
-    char ret_block[ 128 ];
-    ptl_size_t arg_len,ret_len;
-    int fd=p->to_lib[0];
-    int index;
-
-    data_failure(read,fd, &index, sizeof(index));
-
-    if (index==PTL_FINI) {
-        lib_fini(b->nal_cb);
-        if (b->shutdown) (*b->shutdown)(b);
-        syscall(SYS_write, p->from_lib[1],&b->alive,sizeof(b->alive));
-
-        /* a heavy-handed but convenient way of shutting down
-           the lower side thread */
-        pthread_exit(0);
-    }
-
-    data_failure(read,fd, &arg_len, sizeof(arg_len));
-    data_failure(read,fd, &ret_len, sizeof(ret_len));
-    data_failure(read,fd, arg_block, arg_len);
-
-    lib_dispatch(b->nal_cb, NULL, index, arg_block, ret_block);
-
-    data_failure(write,p->from_lib[1],ret_block, ret_len);
-    return(1);
-}
-#undef data_failure
-
-
 
 static void wakeup_topside(void *z)
 {
-    bridge b=z;
-    procbridge p=b->local;
+    bridge b = z;
+    procbridge p = b->local;
+    int stop;
 
     pthread_mutex_lock(&p->mutex);
+    stop = p->nal_flags & NAL_FLAG_STOPPING;
+    if (stop)
+        p->nal_flags |= NAL_FLAG_STOPPED;
     pthread_cond_broadcast(&p->cond);
     pthread_mutex_unlock(&p->mutex);
+
+    if (stop)
+        pthread_exit(0);
 }
 
 
@@ -195,7 +148,8 @@ nal_initialize nal_table[PTL_IFACE_MAX]={0,tcpnal_init,0};
 
 void *nal_thread(void *z)
 {
-    bridge b=z;
+    nal_init_args_t *args = (nal_init_args_t *) z;
+    bridge b = args->nia_bridge;
     procbridge p=b->local;
     int rc;
     ptl_pid_t pid_request;
@@ -216,15 +170,9 @@ void *nal_thread(void *z)
     b->nal_cb->cb_sti=nal_sti;
     b->nal_cb->cb_dist=nal_dist;
 
-
-    register_io_handler(p->to_lib[0],READ_HANDLER,data_from_api,(void *)b);
-
-    if(!(rc = syscall(SYS_read, p->to_lib[0], &pid_request, sizeof(pid_request))))
-        perror("procbridge read from api");
-    if(!(rc = syscall(SYS_read, p->to_lib[0], &desired, sizeof(ptl_ni_limits_t))))
-        perror("procbridge read from api");
-    if(!(rc = syscall(SYS_read, p->to_lib[0], &nal_type, sizeof(nal_type))))
-        perror("procbridge read from api");
+    pid_request = args->nia_requested_pid;
+    desired = *args->nia_limits;
+    nal_type = args->nia_nal_type;
 
     actual = desired;
     LIMIT(desired.max_match_entries,actual.max_match_entries,MAX_MES);
@@ -251,12 +199,12 @@ void *nal_thread(void *z)
      * it is non-zero since something went wrong.
      */
     /* this should perform error checking */
-#if 0
-    write(p->from_lib[1], &actual, sizeof(ptl_ni_limits_t));
-#endif
-    syscall(SYS_write, p->from_lib[1], &rc, sizeof(rc));
-    
-    if(!rc) {
+    pthread_mutex_lock(&p->mutex);
+    p->nal_flags |= rc ? NAL_FLAG_STOPPED : NAL_FLAG_RUNNING;
+    pthread_cond_broadcast(&p->cond);
+    pthread_mutex_unlock(&p->mutex);
+
+    if (!rc) {
         /* the thunk function is called each time the timer loop
            performs an operation and returns to blocking mode. we
            overload this function to inform the api side that
@@ -267,4 +215,3 @@ void *nal_thread(void *z)
     return(0);
 }
 #undef LIMIT
-
index c4f84f4..bcfba02 100644 (file)
@@ -97,9 +97,9 @@ void remove_io_handler (io_handler i)
 
 static void set_flag(io_handler n,fd_set *fds)
 {
-    if (n->type & READ_HANDLER) FD_SET(n->fd,fds);
-    if (n->type & WRITE_HANDLER) FD_SET(n->fd,fds+1);
-    if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd,fds+2);
+    if (n->type & READ_HANDLER) FD_SET(n->fd, &fds[0]);
+    if (n->type & WRITE_HANDLER) FD_SET(n->fd,&fds[1]);
+    if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd, &fds[2]);
 }
 
 
@@ -126,9 +126,9 @@ void select_timer_block(when until)
         timeout_pointer=&timeout;
     } else timeout_pointer=0;
 
-    FD_ZERO(fds);
-    FD_ZERO(fds+1);
-    FD_ZERO(fds+2);
+    FD_ZERO(&fds[0]);
+    FD_ZERO(&fds[1]);
+    FD_ZERO(&fds[2]);
     for (k=&io_handlers;*k;){
         if ((*k)->disabled){
             j=*k;
@@ -140,14 +140,15 @@ void select_timer_block(when until)
            k=&(*k)->next;
        }
     }
-    result=select(FD_SETSIZE,fds,fds+1,fds+2,timeout_pointer);
+
+    result=select(FD_SETSIZE, &fds[0], &fds[1], &fds[2], timeout_pointer);
 
     if (result > 0)
         for (j=io_handlers;j;j=j->next){
             if (!(j->disabled) && 
-                ((FD_ISSET(j->fd,fds) && (j->type & READ_HANDLER)) ||
-                 (FD_ISSET(j->fd,fds+1) && (j->type & WRITE_HANDLER)) ||
-                 (FD_ISSET(j->fd,fds+2) && (j->type & EXCEPTION_HANDLER)))){
+                ((FD_ISSET(j->fd, &fds[0]) && (j->type & READ_HANDLER)) ||
+                 (FD_ISSET(j->fd, &fds[1]) && (j->type & WRITE_HANDLER)) ||
+                 (FD_ISSET(j->fd, &fds[2]) && (j->type & EXCEPTION_HANDLER)))){
                 if (!(*j->function)(j->argument))
                     j->disabled=1;
             }
index 7c7c94d..ae97f92 100644 (file)
@@ -2,6 +2,7 @@
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
  *  Copyright (c) 2002 Cray Inc.
+ *  Copyright (c) 2003 Cluster File Systems, Inc.
  *
  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
  *
@@ -50,7 +51,6 @@
  *
  * sends a packet to the peer, after insuring that a connection exists
  */
-#warning FIXME: "param 'type' is newly added, make use of it!!"
 int tcpnal_send(nal_cb_t *n,
                void *private,
                lib_msg_t *cookie,
@@ -163,15 +163,15 @@ finalize:
  */
 static int from_connection(void *a, void *d)
 {
-        connection c = d;
-        bridge b=a;
-        ptl_hdr_t hdr;
-
-        if (read_connection(c, (unsigned char *)&hdr, sizeof(hdr))){
-                lib_parse(b->nal_cb, &hdr, c);
-                return(1);
-        }
-        return(0);
+    connection c = d;
+    bridge b = a;
+    ptl_hdr_t hdr;
+
+    if (read_connection(c, (unsigned char *)&hdr, sizeof(hdr))){
+        lib_parse(b->nal_cb, &hdr, c);
+        return(1);
+    }
+    return(0);
 }