From 158710368a02f079ae552eee413021fef4733963 Mon Sep 17 00:00:00 2001 From: ericm Date: Wed, 20 Aug 2003 07:22:07 +0000 Subject: [PATCH] [tcpnal]: rewrite the data out path. Get rid of pipe as a bridge between user threads and nal threads, which is not so efficient and doesn't work well in Cygwin. Now user thread might directly call into portals lib via lib_dispatch. 2 issues remain: - sometimes during startup low level nal threads could run into endless waiting. - need more synchronization facilities to prevent race. --- lnet/ulnds/procapi.c | 164 ++++++++++---------------------------- lnet/ulnds/procbridge.h | 22 +++-- lnet/ulnds/proclib.c | 95 +++++----------------- lnet/ulnds/select.c | 21 ++--- lnet/ulnds/socklnd/procapi.c | 164 ++++++++++---------------------------- lnet/ulnds/socklnd/procbridge.h | 22 +++-- lnet/ulnds/socklnd/proclib.c | 95 +++++----------------- lnet/ulnds/socklnd/select.c | 21 ++--- lnet/ulnds/socklnd/tcplnd.c | 20 ++--- lnet/ulnds/tcplnd.c | 20 ++--- lustre/portals/unals/procapi.c | 164 ++++++++++---------------------------- lustre/portals/unals/procbridge.h | 22 +++-- lustre/portals/unals/proclib.c | 95 +++++----------------- lustre/portals/unals/select.c | 21 ++--- lustre/portals/unals/tcpnal.c | 20 ++--- 15 files changed, 309 insertions(+), 657 deletions(-) diff --git a/lnet/ulnds/procapi.c b/lnet/ulnds/procapi.c index dfcd743..724ee74 100644 --- a/lnet/ulnds/procapi.c +++ b/lnet/ulnds/procapi.c @@ -2,6 +2,7 @@ * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (c) 2002 Cray Inc. + * Copyright (c) 2003 Cluster File Systems, Inc. * * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ * @@ -48,46 +49,22 @@ * forwards a packaged api call from the 'api' side to the 'library' * side, and collects the result */ -/* XXX CFS workaround: - * when multiple threads call forward at the same time, data in the - * pipe will be trampled. add a mutex to serialize the access, as - * a temporary solution. 
- */ -#define forward_failure(operand,fd,buffer,length)\ - if(syscall(SYS_##operand,fd,buffer,length)!=length){\ - lib_fini(b->nal_cb);\ - return(PTL_SEGV);\ - } static int procbridge_forward(nal_t *n, int id, void *args, ptl_size_t args_len, void *ret, ptl_size_t ret_len) { - bridge b=(bridge)n->nal_data; - procbridge p=(procbridge)b->local; - int lib=p->to_lib[1]; - static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER; - int k; - - /* protect the whole access to pipe */ - pthread_mutex_lock(&mut); + bridge b = (bridge) n->nal_data; - forward_failure(write,lib, &id, sizeof(id)); - forward_failure(write,lib,&args_len, sizeof(args_len)); - forward_failure(write,lib,&ret_len, sizeof(ret_len)); - forward_failure(write,lib,args, args_len); + if (id == PTL_FINI) { + lib_fini(b->nal_cb); - do { - k=syscall(SYS_read, p->from_lib[0], ret, ret_len); - } while ((k!=ret_len) && (errno += EINTR)); + if (b->shutdown) + (*b->shutdown)(b); + } - pthread_mutex_unlock(&mut); + lib_dispatch(b->nal_cb, NULL, id, args, ret); - if(k!=ret_len){ - perror("nal: read return block"); - return PTL_SEGV; - } return (PTL_OK); } -#undef forward_failure /* Function: shutdown @@ -101,15 +78,18 @@ static int procbridge_shutdown(nal_t *n, int ni) { bridge b=(bridge)n->nal_data; procbridge p=(procbridge)b->local; - int code=PTL_FINI; - syscall(SYS_write, p->to_lib[1],&code,sizeof(code)); - syscall(SYS_read, p->from_lib[0],&code,sizeof(code)); + p->nal_flags |= NAL_FLAG_STOPPING; - syscall(SYS_close, p->to_lib[0]); - syscall(SYS_close, p->to_lib[1]); - syscall(SYS_close, p->from_lib[0]); - syscall(SYS_close, p->from_lib[1]); + do { + pthread_mutex_lock(&p->mutex); + if (p->nal_flags & NAL_FLAG_STOPPED) { + pthread_mutex_unlock(&p->mutex); + break; + } + pthread_cond_wait(&p->cond, &p->mutex); + pthread_mutex_unlock(&p->mutex); + } while (1); free(p); return(0); @@ -162,7 +142,9 @@ static nal_t api_nal = { unlock: procbridge_unlock }; -/* Function: bridge_init +ptl_nid_t tcpnal_mynid; + +/* 
Function: procbridge_interface * * Arguments: pid: requested process id (port offset) * PTL_ID_ANY not supported. @@ -176,77 +158,17 @@ static nal_t api_nal = { * initializes the tcp nal. we define unix_failure as an * error wrapper to cut down clutter. */ -#define unix_failure(operand,fd,buffer,length,text)\ - if(syscall(SYS_##operand,fd,buffer,length)!=length){\ - perror(text);\ - return(NULL);\ - } -#if 0 -static nal_t *bridge_init(ptl_interface_t nal, - ptl_pid_t pid_request, - ptl_ni_limits_t *desired, - ptl_ni_limits_t *actual, - int *rc) -{ - procbridge p; - bridge b; - static int initialized=0; - ptl_ni_limits_t limits = {-1,-1,-1,-1,-1}; - - if(initialized) return (&api_nal); - - init_unix_timer(); - - b=(bridge)malloc(sizeof(struct bridge)); - p=(procbridge)malloc(sizeof(struct procbridge)); - api_nal.nal_data=b; - b->local=p; - - if(pipe(p->to_lib) || pipe(p->from_lib)) { - perror("nal_init: pipe"); - return(NULL); - } - - if (desired) limits = *desired; - unix_failure(write,p->to_lib[1], &pid_request, sizeof(pid_request), - "nal_init: write"); - unix_failure(write,p->to_lib[1], &limits, sizeof(ptl_ni_limits_t), - "nal_init: write"); - unix_failure(write,p->to_lib[1], &nal, sizeof(ptl_interface_t), - "nal_init: write"); - - if(pthread_create(&p->t, NULL, nal_thread, b)) { - perror("nal_init: pthread_create"); - return(NULL); - } - - unix_failure(read,p->from_lib[0], actual, sizeof(ptl_ni_limits_t), - "tcp_init: read"); - unix_failure(read,p->from_lib[0], rc, sizeof(rc), - "nal_init: read"); - - if(*rc) return(NULL); - - initialized = 1; - pthread_mutex_init(&p->mutex,0); - pthread_cond_init(&p->cond, 0); - - return (&api_nal); -} -#endif - -ptl_nid_t tcpnal_mynid; - nal_t *procbridge_interface(int num_interface, ptl_pt_index_t ptl_size, ptl_ac_index_t acl_size, ptl_pid_t requested_pid) { + nal_init_args_t args; procbridge p; bridge b; static int initialized=0; ptl_ni_limits_t limits = {-1,-1,-1,-1,-1}; - int rc, nal_type = PTL_IFACE_TCP;/* 
PTL_IFACE_DEFAULT FIXME hack */ + int nal_type = PTL_IFACE_TCP;/* PTL_IFACE_DEFAULT FIXME hack */ if(initialized) return (&api_nal); @@ -257,38 +179,40 @@ nal_t *procbridge_interface(int num_interface, api_nal.nal_data=b; b->local=p; - if(pipe(p->to_lib) || pipe(p->from_lib)) { - perror("nal_init: pipe"); - return(NULL); - } - if (ptl_size) limits.max_ptable_index = ptl_size; if (acl_size) limits.max_atable_index = acl_size; - unix_failure(write,p->to_lib[1], &requested_pid, sizeof(requested_pid), - "nal_init: write"); - unix_failure(write,p->to_lib[1], &limits, sizeof(ptl_ni_limits_t), - "nal_init: write"); - unix_failure(write,p->to_lib[1], &nal_type, sizeof(nal_type), - "nal_init: write"); + args.nia_requested_pid = requested_pid; + args.nia_limits = &limits; + args.nia_nal_type = nal_type; + args.nia_bridge = b; - if(pthread_create(&p->t, NULL, nal_thread, b)) { + pthread_mutex_init(&p->mutex,0); + pthread_cond_init(&p->cond, 0); + p->nal_flags = 0; + + if (pthread_create(&p->t, NULL, nal_thread, &args)) { perror("nal_init: pthread_create"); return(NULL); } - unix_failure(read,p->from_lib[0], &rc, sizeof(rc), - "nal_init: read"); - - if(rc) return(NULL); + do { + pthread_mutex_lock(&p->mutex); + if (p->nal_flags & (NAL_FLAG_RUNNING | NAL_FLAG_STOPPED)) { + pthread_mutex_unlock(&p->mutex); + break; + } + pthread_cond_wait(&p->cond, &p->mutex); + pthread_mutex_unlock(&p->mutex); + } while (1); + + if (p->nal_flags & NAL_FLAG_STOPPED) + return (NULL); b->nal_cb->ni.nid = tcpnal_mynid; initialized = 1; - pthread_mutex_init(&p->mutex,0); - pthread_cond_init(&p->cond, 0); return (&api_nal); } -#undef unix_failure diff --git a/lnet/ulnds/procbridge.h b/lnet/ulnds/procbridge.h index 060ae7b..f65b3bf 100644 --- a/lnet/ulnds/procbridge.h +++ b/lnet/ulnds/procbridge.h @@ -2,6 +2,7 @@ * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (c) 2002 Cray Inc. + * Copyright (c) 2003 Cluster File Systems, Inc. 
* * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ */ @@ -14,14 +15,25 @@ #include +#define NAL_FLAG_RUNNING 1 +#define NAL_FLAG_STOPPING 2 +#define NAL_FLAG_STOPPED 4 + typedef struct procbridge { pthread_t t; pthread_cond_t cond; pthread_mutex_t mutex; - int to_lib[2]; - int from_lib[2]; + + int nal_flags; } *procbridge; +typedef struct nal_init_args { + ptl_pid_t nia_requested_pid; + ptl_ni_limits_t *nia_limits; + int nia_nal_type; + bridge nia_bridge; +} nal_init_args_t; + extern void *nal_thread(void *); @@ -33,8 +45,8 @@ extern void *nal_thread(void *); extern void set_address(bridge t,ptl_pid_t pidrequest); extern nal_t *procbridge_interface(int num_interface, - ptl_pt_index_t ptl_size, - ptl_ac_index_t acl_size, - ptl_pid_t requested_pid); + ptl_pt_index_t ptl_size, + ptl_ac_index_t acl_size, + ptl_pid_t requested_pid); #endif diff --git a/lnet/ulnds/proclib.c b/lnet/ulnds/proclib.c index c3ee103..38bbf28 100644 --- a/lnet/ulnds/proclib.c +++ b/lnet/ulnds/proclib.c @@ -2,6 +2,7 @@ * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (c) 2002 Cray Inc. + * Copyright (c) 2003 Cluster File Systems, Inc. * * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ * @@ -38,7 +39,6 @@ #include #include #include -//#include #include /* the following functions are stubs to satisfy the nal definition @@ -108,69 +108,22 @@ static int nal_dist(nal_cb_t *nal, { return 0; } - - - -/* Function: data_from_api - * Arguments: t: the nal state for this interface - * Returns: whether to continue reading from the pipe - * - * data_from_api() reads data from the api side in response - * to a select. - * - * We define data_failure() for syntactic convenience - * of unix error reporting. 
- */ - -#define data_failure(operand,fd,buffer,length)\ - if(syscall(SYS_##operand,fd,buffer,length)!=length){\ - lib_fini(b->nal_cb);\ - return(0);\ - } -static int data_from_api(void *arg) -{ - bridge b = arg; - procbridge p=(procbridge)b->local; - /* where are these two sizes derived from ??*/ - char arg_block[ 256 ]; - char ret_block[ 128 ]; - ptl_size_t arg_len,ret_len; - int fd=p->to_lib[0]; - int index; - - data_failure(read,fd, &index, sizeof(index)); - - if (index==PTL_FINI) { - lib_fini(b->nal_cb); - if (b->shutdown) (*b->shutdown)(b); - syscall(SYS_write, p->from_lib[1],&b->alive,sizeof(b->alive)); - - /* a heavy-handed but convenient way of shutting down - the lower side thread */ - pthread_exit(0); - } - - data_failure(read,fd, &arg_len, sizeof(arg_len)); - data_failure(read,fd, &ret_len, sizeof(ret_len)); - data_failure(read,fd, arg_block, arg_len); - - lib_dispatch(b->nal_cb, NULL, index, arg_block, ret_block); - - data_failure(write,p->from_lib[1],ret_block, ret_len); - return(1); -} -#undef data_failure - - static void wakeup_topside(void *z) { - bridge b=z; - procbridge p=b->local; + bridge b = z; + procbridge p = b->local; + int stop; pthread_mutex_lock(&p->mutex); + stop = p->nal_flags & NAL_FLAG_STOPPING; + if (stop) + p->nal_flags |= NAL_FLAG_STOPPED; pthread_cond_broadcast(&p->cond); pthread_mutex_unlock(&p->mutex); + + if (stop) + pthread_exit(0); } @@ -195,7 +148,8 @@ nal_initialize nal_table[PTL_IFACE_MAX]={0,tcpnal_init,0}; void *nal_thread(void *z) { - bridge b=z; + nal_init_args_t *args = (nal_init_args_t *) z; + bridge b = args->nia_bridge; procbridge p=b->local; int rc; ptl_pid_t pid_request; @@ -216,15 +170,9 @@ void *nal_thread(void *z) b->nal_cb->cb_sti=nal_sti; b->nal_cb->cb_dist=nal_dist; - - register_io_handler(p->to_lib[0],READ_HANDLER,data_from_api,(void *)b); - - if(!(rc = syscall(SYS_read, p->to_lib[0], &pid_request, sizeof(pid_request)))) - perror("procbridge read from api"); - if(!(rc = syscall(SYS_read, p->to_lib[0], 
&desired, sizeof(ptl_ni_limits_t)))) - perror("procbridge read from api"); - if(!(rc = syscall(SYS_read, p->to_lib[0], &nal_type, sizeof(nal_type)))) - perror("procbridge read from api"); + pid_request = args->nia_requested_pid; + desired = *args->nia_limits; + nal_type = args->nia_nal_type; actual = desired; LIMIT(desired.max_match_entries,actual.max_match_entries,MAX_MES); @@ -251,12 +199,12 @@ void *nal_thread(void *z) * it is non-zero since something went wrong. */ /* this should perform error checking */ -#if 0 - write(p->from_lib[1], &actual, sizeof(ptl_ni_limits_t)); -#endif - syscall(SYS_write, p->from_lib[1], &rc, sizeof(rc)); - - if(!rc) { + pthread_mutex_lock(&p->mutex); + p->nal_flags |= rc ? NAL_FLAG_STOPPED : NAL_FLAG_RUNNING; + pthread_cond_broadcast(&p->cond); + pthread_mutex_unlock(&p->mutex); + + if (!rc) { /* the thunk function is called each time the timer loop performs an operation and returns to blocking mode. we overload this function to inform the api side that @@ -267,4 +215,3 @@ void *nal_thread(void *z) return(0); } #undef LIMIT - diff --git a/lnet/ulnds/select.c b/lnet/ulnds/select.c index c4f84f4..bcfba02 100644 --- a/lnet/ulnds/select.c +++ b/lnet/ulnds/select.c @@ -97,9 +97,9 @@ void remove_io_handler (io_handler i) static void set_flag(io_handler n,fd_set *fds) { - if (n->type & READ_HANDLER) FD_SET(n->fd,fds); - if (n->type & WRITE_HANDLER) FD_SET(n->fd,fds+1); - if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd,fds+2); + if (n->type & READ_HANDLER) FD_SET(n->fd, &fds[0]); + if (n->type & WRITE_HANDLER) FD_SET(n->fd,&fds[1]); + if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd, &fds[2]); } @@ -126,9 +126,9 @@ void select_timer_block(when until) timeout_pointer=&timeout; } else timeout_pointer=0; - FD_ZERO(fds); - FD_ZERO(fds+1); - FD_ZERO(fds+2); + FD_ZERO(&fds[0]); + FD_ZERO(&fds[1]); + FD_ZERO(&fds[2]); for (k=&io_handlers;*k;){ if ((*k)->disabled){ j=*k; @@ -140,14 +140,15 @@ void select_timer_block(when until) k=&(*k)->next; } } - 
result=select(FD_SETSIZE,fds,fds+1,fds+2,timeout_pointer); + + result=select(FD_SETSIZE, &fds[0], &fds[1], &fds[2], timeout_pointer); if (result > 0) for (j=io_handlers;j;j=j->next){ if (!(j->disabled) && - ((FD_ISSET(j->fd,fds) && (j->type & READ_HANDLER)) || - (FD_ISSET(j->fd,fds+1) && (j->type & WRITE_HANDLER)) || - (FD_ISSET(j->fd,fds+2) && (j->type & EXCEPTION_HANDLER)))){ + ((FD_ISSET(j->fd, &fds[0]) && (j->type & READ_HANDLER)) || + (FD_ISSET(j->fd, &fds[1]) && (j->type & WRITE_HANDLER)) || + (FD_ISSET(j->fd, &fds[2]) && (j->type & EXCEPTION_HANDLER)))){ if (!(*j->function)(j->argument)) j->disabled=1; } diff --git a/lnet/ulnds/socklnd/procapi.c b/lnet/ulnds/socklnd/procapi.c index dfcd743..724ee74 100644 --- a/lnet/ulnds/socklnd/procapi.c +++ b/lnet/ulnds/socklnd/procapi.c @@ -2,6 +2,7 @@ * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (c) 2002 Cray Inc. + * Copyright (c) 2003 Cluster File Systems, Inc. * * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ * @@ -48,46 +49,22 @@ * forwards a packaged api call from the 'api' side to the 'library' * side, and collects the result */ -/* XXX CFS workaround: - * when multiple threads call forward at the same time, data in the - * pipe will be trampled. add a mutex to serialize the access, as - * a temporary solution. 
- */ -#define forward_failure(operand,fd,buffer,length)\ - if(syscall(SYS_##operand,fd,buffer,length)!=length){\ - lib_fini(b->nal_cb);\ - return(PTL_SEGV);\ - } static int procbridge_forward(nal_t *n, int id, void *args, ptl_size_t args_len, void *ret, ptl_size_t ret_len) { - bridge b=(bridge)n->nal_data; - procbridge p=(procbridge)b->local; - int lib=p->to_lib[1]; - static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER; - int k; - - /* protect the whole access to pipe */ - pthread_mutex_lock(&mut); + bridge b = (bridge) n->nal_data; - forward_failure(write,lib, &id, sizeof(id)); - forward_failure(write,lib,&args_len, sizeof(args_len)); - forward_failure(write,lib,&ret_len, sizeof(ret_len)); - forward_failure(write,lib,args, args_len); + if (id == PTL_FINI) { + lib_fini(b->nal_cb); - do { - k=syscall(SYS_read, p->from_lib[0], ret, ret_len); - } while ((k!=ret_len) && (errno += EINTR)); + if (b->shutdown) + (*b->shutdown)(b); + } - pthread_mutex_unlock(&mut); + lib_dispatch(b->nal_cb, NULL, id, args, ret); - if(k!=ret_len){ - perror("nal: read return block"); - return PTL_SEGV; - } return (PTL_OK); } -#undef forward_failure /* Function: shutdown @@ -101,15 +78,18 @@ static int procbridge_shutdown(nal_t *n, int ni) { bridge b=(bridge)n->nal_data; procbridge p=(procbridge)b->local; - int code=PTL_FINI; - syscall(SYS_write, p->to_lib[1],&code,sizeof(code)); - syscall(SYS_read, p->from_lib[0],&code,sizeof(code)); + p->nal_flags |= NAL_FLAG_STOPPING; - syscall(SYS_close, p->to_lib[0]); - syscall(SYS_close, p->to_lib[1]); - syscall(SYS_close, p->from_lib[0]); - syscall(SYS_close, p->from_lib[1]); + do { + pthread_mutex_lock(&p->mutex); + if (p->nal_flags & NAL_FLAG_STOPPED) { + pthread_mutex_unlock(&p->mutex); + break; + } + pthread_cond_wait(&p->cond, &p->mutex); + pthread_mutex_unlock(&p->mutex); + } while (1); free(p); return(0); @@ -162,7 +142,9 @@ static nal_t api_nal = { unlock: procbridge_unlock }; -/* Function: bridge_init +ptl_nid_t tcpnal_mynid; + +/* 
Function: procbridge_interface * * Arguments: pid: requested process id (port offset) * PTL_ID_ANY not supported. @@ -176,77 +158,17 @@ static nal_t api_nal = { * initializes the tcp nal. we define unix_failure as an * error wrapper to cut down clutter. */ -#define unix_failure(operand,fd,buffer,length,text)\ - if(syscall(SYS_##operand,fd,buffer,length)!=length){\ - perror(text);\ - return(NULL);\ - } -#if 0 -static nal_t *bridge_init(ptl_interface_t nal, - ptl_pid_t pid_request, - ptl_ni_limits_t *desired, - ptl_ni_limits_t *actual, - int *rc) -{ - procbridge p; - bridge b; - static int initialized=0; - ptl_ni_limits_t limits = {-1,-1,-1,-1,-1}; - - if(initialized) return (&api_nal); - - init_unix_timer(); - - b=(bridge)malloc(sizeof(struct bridge)); - p=(procbridge)malloc(sizeof(struct procbridge)); - api_nal.nal_data=b; - b->local=p; - - if(pipe(p->to_lib) || pipe(p->from_lib)) { - perror("nal_init: pipe"); - return(NULL); - } - - if (desired) limits = *desired; - unix_failure(write,p->to_lib[1], &pid_request, sizeof(pid_request), - "nal_init: write"); - unix_failure(write,p->to_lib[1], &limits, sizeof(ptl_ni_limits_t), - "nal_init: write"); - unix_failure(write,p->to_lib[1], &nal, sizeof(ptl_interface_t), - "nal_init: write"); - - if(pthread_create(&p->t, NULL, nal_thread, b)) { - perror("nal_init: pthread_create"); - return(NULL); - } - - unix_failure(read,p->from_lib[0], actual, sizeof(ptl_ni_limits_t), - "tcp_init: read"); - unix_failure(read,p->from_lib[0], rc, sizeof(rc), - "nal_init: read"); - - if(*rc) return(NULL); - - initialized = 1; - pthread_mutex_init(&p->mutex,0); - pthread_cond_init(&p->cond, 0); - - return (&api_nal); -} -#endif - -ptl_nid_t tcpnal_mynid; - nal_t *procbridge_interface(int num_interface, ptl_pt_index_t ptl_size, ptl_ac_index_t acl_size, ptl_pid_t requested_pid) { + nal_init_args_t args; procbridge p; bridge b; static int initialized=0; ptl_ni_limits_t limits = {-1,-1,-1,-1,-1}; - int rc, nal_type = PTL_IFACE_TCP;/* 
PTL_IFACE_DEFAULT FIXME hack */ + int nal_type = PTL_IFACE_TCP;/* PTL_IFACE_DEFAULT FIXME hack */ if(initialized) return (&api_nal); @@ -257,38 +179,40 @@ nal_t *procbridge_interface(int num_interface, api_nal.nal_data=b; b->local=p; - if(pipe(p->to_lib) || pipe(p->from_lib)) { - perror("nal_init: pipe"); - return(NULL); - } - if (ptl_size) limits.max_ptable_index = ptl_size; if (acl_size) limits.max_atable_index = acl_size; - unix_failure(write,p->to_lib[1], &requested_pid, sizeof(requested_pid), - "nal_init: write"); - unix_failure(write,p->to_lib[1], &limits, sizeof(ptl_ni_limits_t), - "nal_init: write"); - unix_failure(write,p->to_lib[1], &nal_type, sizeof(nal_type), - "nal_init: write"); + args.nia_requested_pid = requested_pid; + args.nia_limits = &limits; + args.nia_nal_type = nal_type; + args.nia_bridge = b; - if(pthread_create(&p->t, NULL, nal_thread, b)) { + pthread_mutex_init(&p->mutex,0); + pthread_cond_init(&p->cond, 0); + p->nal_flags = 0; + + if (pthread_create(&p->t, NULL, nal_thread, &args)) { perror("nal_init: pthread_create"); return(NULL); } - unix_failure(read,p->from_lib[0], &rc, sizeof(rc), - "nal_init: read"); - - if(rc) return(NULL); + do { + pthread_mutex_lock(&p->mutex); + if (p->nal_flags & (NAL_FLAG_RUNNING | NAL_FLAG_STOPPED)) { + pthread_mutex_unlock(&p->mutex); + break; + } + pthread_cond_wait(&p->cond, &p->mutex); + pthread_mutex_unlock(&p->mutex); + } while (1); + + if (p->nal_flags & NAL_FLAG_STOPPED) + return (NULL); b->nal_cb->ni.nid = tcpnal_mynid; initialized = 1; - pthread_mutex_init(&p->mutex,0); - pthread_cond_init(&p->cond, 0); return (&api_nal); } -#undef unix_failure diff --git a/lnet/ulnds/socklnd/procbridge.h b/lnet/ulnds/socklnd/procbridge.h index 060ae7b..f65b3bf 100644 --- a/lnet/ulnds/socklnd/procbridge.h +++ b/lnet/ulnds/socklnd/procbridge.h @@ -2,6 +2,7 @@ * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (c) 2002 Cray Inc. + * Copyright (c) 2003 Cluster File Systems, Inc. 
* * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ */ @@ -14,14 +15,25 @@ #include +#define NAL_FLAG_RUNNING 1 +#define NAL_FLAG_STOPPING 2 +#define NAL_FLAG_STOPPED 4 + typedef struct procbridge { pthread_t t; pthread_cond_t cond; pthread_mutex_t mutex; - int to_lib[2]; - int from_lib[2]; + + int nal_flags; } *procbridge; +typedef struct nal_init_args { + ptl_pid_t nia_requested_pid; + ptl_ni_limits_t *nia_limits; + int nia_nal_type; + bridge nia_bridge; +} nal_init_args_t; + extern void *nal_thread(void *); @@ -33,8 +45,8 @@ extern void *nal_thread(void *); extern void set_address(bridge t,ptl_pid_t pidrequest); extern nal_t *procbridge_interface(int num_interface, - ptl_pt_index_t ptl_size, - ptl_ac_index_t acl_size, - ptl_pid_t requested_pid); + ptl_pt_index_t ptl_size, + ptl_ac_index_t acl_size, + ptl_pid_t requested_pid); #endif diff --git a/lnet/ulnds/socklnd/proclib.c b/lnet/ulnds/socklnd/proclib.c index c3ee103..38bbf28 100644 --- a/lnet/ulnds/socklnd/proclib.c +++ b/lnet/ulnds/socklnd/proclib.c @@ -2,6 +2,7 @@ * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (c) 2002 Cray Inc. + * Copyright (c) 2003 Cluster File Systems, Inc. * * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ * @@ -38,7 +39,6 @@ #include #include #include -//#include #include /* the following functions are stubs to satisfy the nal definition @@ -108,69 +108,22 @@ static int nal_dist(nal_cb_t *nal, { return 0; } - - - -/* Function: data_from_api - * Arguments: t: the nal state for this interface - * Returns: whether to continue reading from the pipe - * - * data_from_api() reads data from the api side in response - * to a select. - * - * We define data_failure() for syntactic convenience - * of unix error reporting. 
- */ - -#define data_failure(operand,fd,buffer,length)\ - if(syscall(SYS_##operand,fd,buffer,length)!=length){\ - lib_fini(b->nal_cb);\ - return(0);\ - } -static int data_from_api(void *arg) -{ - bridge b = arg; - procbridge p=(procbridge)b->local; - /* where are these two sizes derived from ??*/ - char arg_block[ 256 ]; - char ret_block[ 128 ]; - ptl_size_t arg_len,ret_len; - int fd=p->to_lib[0]; - int index; - - data_failure(read,fd, &index, sizeof(index)); - - if (index==PTL_FINI) { - lib_fini(b->nal_cb); - if (b->shutdown) (*b->shutdown)(b); - syscall(SYS_write, p->from_lib[1],&b->alive,sizeof(b->alive)); - - /* a heavy-handed but convenient way of shutting down - the lower side thread */ - pthread_exit(0); - } - - data_failure(read,fd, &arg_len, sizeof(arg_len)); - data_failure(read,fd, &ret_len, sizeof(ret_len)); - data_failure(read,fd, arg_block, arg_len); - - lib_dispatch(b->nal_cb, NULL, index, arg_block, ret_block); - - data_failure(write,p->from_lib[1],ret_block, ret_len); - return(1); -} -#undef data_failure - - static void wakeup_topside(void *z) { - bridge b=z; - procbridge p=b->local; + bridge b = z; + procbridge p = b->local; + int stop; pthread_mutex_lock(&p->mutex); + stop = p->nal_flags & NAL_FLAG_STOPPING; + if (stop) + p->nal_flags |= NAL_FLAG_STOPPED; pthread_cond_broadcast(&p->cond); pthread_mutex_unlock(&p->mutex); + + if (stop) + pthread_exit(0); } @@ -195,7 +148,8 @@ nal_initialize nal_table[PTL_IFACE_MAX]={0,tcpnal_init,0}; void *nal_thread(void *z) { - bridge b=z; + nal_init_args_t *args = (nal_init_args_t *) z; + bridge b = args->nia_bridge; procbridge p=b->local; int rc; ptl_pid_t pid_request; @@ -216,15 +170,9 @@ void *nal_thread(void *z) b->nal_cb->cb_sti=nal_sti; b->nal_cb->cb_dist=nal_dist; - - register_io_handler(p->to_lib[0],READ_HANDLER,data_from_api,(void *)b); - - if(!(rc = syscall(SYS_read, p->to_lib[0], &pid_request, sizeof(pid_request)))) - perror("procbridge read from api"); - if(!(rc = syscall(SYS_read, p->to_lib[0], 
&desired, sizeof(ptl_ni_limits_t)))) - perror("procbridge read from api"); - if(!(rc = syscall(SYS_read, p->to_lib[0], &nal_type, sizeof(nal_type)))) - perror("procbridge read from api"); + pid_request = args->nia_requested_pid; + desired = *args->nia_limits; + nal_type = args->nia_nal_type; actual = desired; LIMIT(desired.max_match_entries,actual.max_match_entries,MAX_MES); @@ -251,12 +199,12 @@ void *nal_thread(void *z) * it is non-zero since something went wrong. */ /* this should perform error checking */ -#if 0 - write(p->from_lib[1], &actual, sizeof(ptl_ni_limits_t)); -#endif - syscall(SYS_write, p->from_lib[1], &rc, sizeof(rc)); - - if(!rc) { + pthread_mutex_lock(&p->mutex); + p->nal_flags |= rc ? NAL_FLAG_STOPPED : NAL_FLAG_RUNNING; + pthread_cond_broadcast(&p->cond); + pthread_mutex_unlock(&p->mutex); + + if (!rc) { /* the thunk function is called each time the timer loop performs an operation and returns to blocking mode. we overload this function to inform the api side that @@ -267,4 +215,3 @@ void *nal_thread(void *z) return(0); } #undef LIMIT - diff --git a/lnet/ulnds/socklnd/select.c b/lnet/ulnds/socklnd/select.c index c4f84f4..bcfba02 100644 --- a/lnet/ulnds/socklnd/select.c +++ b/lnet/ulnds/socklnd/select.c @@ -97,9 +97,9 @@ void remove_io_handler (io_handler i) static void set_flag(io_handler n,fd_set *fds) { - if (n->type & READ_HANDLER) FD_SET(n->fd,fds); - if (n->type & WRITE_HANDLER) FD_SET(n->fd,fds+1); - if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd,fds+2); + if (n->type & READ_HANDLER) FD_SET(n->fd, &fds[0]); + if (n->type & WRITE_HANDLER) FD_SET(n->fd,&fds[1]); + if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd, &fds[2]); } @@ -126,9 +126,9 @@ void select_timer_block(when until) timeout_pointer=&timeout; } else timeout_pointer=0; - FD_ZERO(fds); - FD_ZERO(fds+1); - FD_ZERO(fds+2); + FD_ZERO(&fds[0]); + FD_ZERO(&fds[1]); + FD_ZERO(&fds[2]); for (k=&io_handlers;*k;){ if ((*k)->disabled){ j=*k; @@ -140,14 +140,15 @@ void select_timer_block(when 
until) k=&(*k)->next; } } - result=select(FD_SETSIZE,fds,fds+1,fds+2,timeout_pointer); + + result=select(FD_SETSIZE, &fds[0], &fds[1], &fds[2], timeout_pointer); if (result > 0) for (j=io_handlers;j;j=j->next){ if (!(j->disabled) && - ((FD_ISSET(j->fd,fds) && (j->type & READ_HANDLER)) || - (FD_ISSET(j->fd,fds+1) && (j->type & WRITE_HANDLER)) || - (FD_ISSET(j->fd,fds+2) && (j->type & EXCEPTION_HANDLER)))){ + ((FD_ISSET(j->fd, &fds[0]) && (j->type & READ_HANDLER)) || + (FD_ISSET(j->fd, &fds[1]) && (j->type & WRITE_HANDLER)) || + (FD_ISSET(j->fd, &fds[2]) && (j->type & EXCEPTION_HANDLER)))){ if (!(*j->function)(j->argument)) j->disabled=1; } diff --git a/lnet/ulnds/socklnd/tcplnd.c b/lnet/ulnds/socklnd/tcplnd.c index 7c7c94d..ae97f92 100644 --- a/lnet/ulnds/socklnd/tcplnd.c +++ b/lnet/ulnds/socklnd/tcplnd.c @@ -2,6 +2,7 @@ * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (c) 2002 Cray Inc. + * Copyright (c) 2003 Cluster File Systems, Inc. * * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ * @@ -50,7 +51,6 @@ * * sends a packet to the peer, after insuring that a connection exists */ -#warning FIXME: "param 'type' is newly added, make use of it!!" int tcpnal_send(nal_cb_t *n, void *private, lib_msg_t *cookie, @@ -163,15 +163,15 @@ finalize: */ static int from_connection(void *a, void *d) { - connection c = d; - bridge b=a; - ptl_hdr_t hdr; - - if (read_connection(c, (unsigned char *)&hdr, sizeof(hdr))){ - lib_parse(b->nal_cb, &hdr, c); - return(1); - } - return(0); + connection c = d; + bridge b = a; + ptl_hdr_t hdr; + + if (read_connection(c, (unsigned char *)&hdr, sizeof(hdr))){ + lib_parse(b->nal_cb, &hdr, c); + return(1); + } + return(0); } diff --git a/lnet/ulnds/tcplnd.c b/lnet/ulnds/tcplnd.c index 7c7c94d..ae97f92 100644 --- a/lnet/ulnds/tcplnd.c +++ b/lnet/ulnds/tcplnd.c @@ -2,6 +2,7 @@ * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (c) 2002 Cray Inc. + * Copyright (c) 2003 Cluster File Systems, Inc. 
* * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ * @@ -50,7 +51,6 @@ * * sends a packet to the peer, after insuring that a connection exists */ -#warning FIXME: "param 'type' is newly added, make use of it!!" int tcpnal_send(nal_cb_t *n, void *private, lib_msg_t *cookie, @@ -163,15 +163,15 @@ finalize: */ static int from_connection(void *a, void *d) { - connection c = d; - bridge b=a; - ptl_hdr_t hdr; - - if (read_connection(c, (unsigned char *)&hdr, sizeof(hdr))){ - lib_parse(b->nal_cb, &hdr, c); - return(1); - } - return(0); + connection c = d; + bridge b = a; + ptl_hdr_t hdr; + + if (read_connection(c, (unsigned char *)&hdr, sizeof(hdr))){ + lib_parse(b->nal_cb, &hdr, c); + return(1); + } + return(0); } diff --git a/lustre/portals/unals/procapi.c b/lustre/portals/unals/procapi.c index dfcd743..724ee74 100644 --- a/lustre/portals/unals/procapi.c +++ b/lustre/portals/unals/procapi.c @@ -2,6 +2,7 @@ * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (c) 2002 Cray Inc. + * Copyright (c) 2003 Cluster File Systems, Inc. * * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ * @@ -48,46 +49,22 @@ * forwards a packaged api call from the 'api' side to the 'library' * side, and collects the result */ -/* XXX CFS workaround: - * when multiple threads call forward at the same time, data in the - * pipe will be trampled. add a mutex to serialize the access, as - * a temporary solution. 
- */ -#define forward_failure(operand,fd,buffer,length)\ - if(syscall(SYS_##operand,fd,buffer,length)!=length){\ - lib_fini(b->nal_cb);\ - return(PTL_SEGV);\ - } static int procbridge_forward(nal_t *n, int id, void *args, ptl_size_t args_len, void *ret, ptl_size_t ret_len) { - bridge b=(bridge)n->nal_data; - procbridge p=(procbridge)b->local; - int lib=p->to_lib[1]; - static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER; - int k; - - /* protect the whole access to pipe */ - pthread_mutex_lock(&mut); + bridge b = (bridge) n->nal_data; - forward_failure(write,lib, &id, sizeof(id)); - forward_failure(write,lib,&args_len, sizeof(args_len)); - forward_failure(write,lib,&ret_len, sizeof(ret_len)); - forward_failure(write,lib,args, args_len); + if (id == PTL_FINI) { + lib_fini(b->nal_cb); - do { - k=syscall(SYS_read, p->from_lib[0], ret, ret_len); - } while ((k!=ret_len) && (errno += EINTR)); + if (b->shutdown) + (*b->shutdown)(b); + } - pthread_mutex_unlock(&mut); + lib_dispatch(b->nal_cb, NULL, id, args, ret); - if(k!=ret_len){ - perror("nal: read return block"); - return PTL_SEGV; - } return (PTL_OK); } -#undef forward_failure /* Function: shutdown @@ -101,15 +78,18 @@ static int procbridge_shutdown(nal_t *n, int ni) { bridge b=(bridge)n->nal_data; procbridge p=(procbridge)b->local; - int code=PTL_FINI; - syscall(SYS_write, p->to_lib[1],&code,sizeof(code)); - syscall(SYS_read, p->from_lib[0],&code,sizeof(code)); + p->nal_flags |= NAL_FLAG_STOPPING; - syscall(SYS_close, p->to_lib[0]); - syscall(SYS_close, p->to_lib[1]); - syscall(SYS_close, p->from_lib[0]); - syscall(SYS_close, p->from_lib[1]); + do { + pthread_mutex_lock(&p->mutex); + if (p->nal_flags & NAL_FLAG_STOPPED) { + pthread_mutex_unlock(&p->mutex); + break; + } + pthread_cond_wait(&p->cond, &p->mutex); + pthread_mutex_unlock(&p->mutex); + } while (1); free(p); return(0); @@ -162,7 +142,9 @@ static nal_t api_nal = { unlock: procbridge_unlock }; -/* Function: bridge_init +ptl_nid_t tcpnal_mynid; + +/* 
Function: procbridge_interface * * Arguments: pid: requested process id (port offset) * PTL_ID_ANY not supported. @@ -176,77 +158,17 @@ static nal_t api_nal = { * initializes the tcp nal. we define unix_failure as an * error wrapper to cut down clutter. */ -#define unix_failure(operand,fd,buffer,length,text)\ - if(syscall(SYS_##operand,fd,buffer,length)!=length){\ - perror(text);\ - return(NULL);\ - } -#if 0 -static nal_t *bridge_init(ptl_interface_t nal, - ptl_pid_t pid_request, - ptl_ni_limits_t *desired, - ptl_ni_limits_t *actual, - int *rc) -{ - procbridge p; - bridge b; - static int initialized=0; - ptl_ni_limits_t limits = {-1,-1,-1,-1,-1}; - - if(initialized) return (&api_nal); - - init_unix_timer(); - - b=(bridge)malloc(sizeof(struct bridge)); - p=(procbridge)malloc(sizeof(struct procbridge)); - api_nal.nal_data=b; - b->local=p; - - if(pipe(p->to_lib) || pipe(p->from_lib)) { - perror("nal_init: pipe"); - return(NULL); - } - - if (desired) limits = *desired; - unix_failure(write,p->to_lib[1], &pid_request, sizeof(pid_request), - "nal_init: write"); - unix_failure(write,p->to_lib[1], &limits, sizeof(ptl_ni_limits_t), - "nal_init: write"); - unix_failure(write,p->to_lib[1], &nal, sizeof(ptl_interface_t), - "nal_init: write"); - - if(pthread_create(&p->t, NULL, nal_thread, b)) { - perror("nal_init: pthread_create"); - return(NULL); - } - - unix_failure(read,p->from_lib[0], actual, sizeof(ptl_ni_limits_t), - "tcp_init: read"); - unix_failure(read,p->from_lib[0], rc, sizeof(rc), - "nal_init: read"); - - if(*rc) return(NULL); - - initialized = 1; - pthread_mutex_init(&p->mutex,0); - pthread_cond_init(&p->cond, 0); - - return (&api_nal); -} -#endif - -ptl_nid_t tcpnal_mynid; - nal_t *procbridge_interface(int num_interface, ptl_pt_index_t ptl_size, ptl_ac_index_t acl_size, ptl_pid_t requested_pid) { + nal_init_args_t args; procbridge p; bridge b; static int initialized=0; ptl_ni_limits_t limits = {-1,-1,-1,-1,-1}; - int rc, nal_type = PTL_IFACE_TCP;/* 
PTL_IFACE_DEFAULT FIXME hack */ + int nal_type = PTL_IFACE_TCP;/* PTL_IFACE_DEFAULT FIXME hack */ if(initialized) return (&api_nal); @@ -257,38 +179,40 @@ nal_t *procbridge_interface(int num_interface, api_nal.nal_data=b; b->local=p; - if(pipe(p->to_lib) || pipe(p->from_lib)) { - perror("nal_init: pipe"); - return(NULL); - } - if (ptl_size) limits.max_ptable_index = ptl_size; if (acl_size) limits.max_atable_index = acl_size; - unix_failure(write,p->to_lib[1], &requested_pid, sizeof(requested_pid), - "nal_init: write"); - unix_failure(write,p->to_lib[1], &limits, sizeof(ptl_ni_limits_t), - "nal_init: write"); - unix_failure(write,p->to_lib[1], &nal_type, sizeof(nal_type), - "nal_init: write"); + args.nia_requested_pid = requested_pid; + args.nia_limits = &limits; + args.nia_nal_type = nal_type; + args.nia_bridge = b; - if(pthread_create(&p->t, NULL, nal_thread, b)) { + pthread_mutex_init(&p->mutex,0); + pthread_cond_init(&p->cond, 0); + p->nal_flags = 0; + + if (pthread_create(&p->t, NULL, nal_thread, &args)) { perror("nal_init: pthread_create"); return(NULL); } - unix_failure(read,p->from_lib[0], &rc, sizeof(rc), - "nal_init: read"); - - if(rc) return(NULL); + do { + pthread_mutex_lock(&p->mutex); + if (p->nal_flags & (NAL_FLAG_RUNNING | NAL_FLAG_STOPPED)) { + pthread_mutex_unlock(&p->mutex); + break; + } + pthread_cond_wait(&p->cond, &p->mutex); + pthread_mutex_unlock(&p->mutex); + } while (1); + + if (p->nal_flags & NAL_FLAG_STOPPED) + return (NULL); b->nal_cb->ni.nid = tcpnal_mynid; initialized = 1; - pthread_mutex_init(&p->mutex,0); - pthread_cond_init(&p->cond, 0); return (&api_nal); } -#undef unix_failure diff --git a/lustre/portals/unals/procbridge.h b/lustre/portals/unals/procbridge.h index 060ae7b..f65b3bf 100644 --- a/lustre/portals/unals/procbridge.h +++ b/lustre/portals/unals/procbridge.h @@ -2,6 +2,7 @@ * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (c) 2002 Cray Inc. + * Copyright (c) 2003 Cluster File Systems, Inc. 
* * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ */ @@ -14,14 +15,25 @@ #include +#define NAL_FLAG_RUNNING 1 +#define NAL_FLAG_STOPPING 2 +#define NAL_FLAG_STOPPED 4 + typedef struct procbridge { pthread_t t; pthread_cond_t cond; pthread_mutex_t mutex; - int to_lib[2]; - int from_lib[2]; + + int nal_flags; } *procbridge; +typedef struct nal_init_args { + ptl_pid_t nia_requested_pid; + ptl_ni_limits_t *nia_limits; + int nia_nal_type; + bridge nia_bridge; +} nal_init_args_t; + extern void *nal_thread(void *); @@ -33,8 +45,8 @@ extern void *nal_thread(void *); extern void set_address(bridge t,ptl_pid_t pidrequest); extern nal_t *procbridge_interface(int num_interface, - ptl_pt_index_t ptl_size, - ptl_ac_index_t acl_size, - ptl_pid_t requested_pid); + ptl_pt_index_t ptl_size, + ptl_ac_index_t acl_size, + ptl_pid_t requested_pid); #endif diff --git a/lustre/portals/unals/proclib.c b/lustre/portals/unals/proclib.c index c3ee103..38bbf28 100644 --- a/lustre/portals/unals/proclib.c +++ b/lustre/portals/unals/proclib.c @@ -2,6 +2,7 @@ * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (c) 2002 Cray Inc. + * Copyright (c) 2003 Cluster File Systems, Inc. * * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ * @@ -38,7 +39,6 @@ #include #include #include -//#include #include /* the following functions are stubs to satisfy the nal definition @@ -108,69 +108,22 @@ static int nal_dist(nal_cb_t *nal, { return 0; } - - - -/* Function: data_from_api - * Arguments: t: the nal state for this interface - * Returns: whether to continue reading from the pipe - * - * data_from_api() reads data from the api side in response - * to a select. - * - * We define data_failure() for syntactic convenience - * of unix error reporting. 
- */ - -#define data_failure(operand,fd,buffer,length)\ - if(syscall(SYS_##operand,fd,buffer,length)!=length){\ - lib_fini(b->nal_cb);\ - return(0);\ - } -static int data_from_api(void *arg) -{ - bridge b = arg; - procbridge p=(procbridge)b->local; - /* where are these two sizes derived from ??*/ - char arg_block[ 256 ]; - char ret_block[ 128 ]; - ptl_size_t arg_len,ret_len; - int fd=p->to_lib[0]; - int index; - - data_failure(read,fd, &index, sizeof(index)); - - if (index==PTL_FINI) { - lib_fini(b->nal_cb); - if (b->shutdown) (*b->shutdown)(b); - syscall(SYS_write, p->from_lib[1],&b->alive,sizeof(b->alive)); - - /* a heavy-handed but convenient way of shutting down - the lower side thread */ - pthread_exit(0); - } - - data_failure(read,fd, &arg_len, sizeof(arg_len)); - data_failure(read,fd, &ret_len, sizeof(ret_len)); - data_failure(read,fd, arg_block, arg_len); - - lib_dispatch(b->nal_cb, NULL, index, arg_block, ret_block); - - data_failure(write,p->from_lib[1],ret_block, ret_len); - return(1); -} -#undef data_failure - - static void wakeup_topside(void *z) { - bridge b=z; - procbridge p=b->local; + bridge b = z; + procbridge p = b->local; + int stop; pthread_mutex_lock(&p->mutex); + stop = p->nal_flags & NAL_FLAG_STOPPING; + if (stop) + p->nal_flags |= NAL_FLAG_STOPPED; pthread_cond_broadcast(&p->cond); pthread_mutex_unlock(&p->mutex); + + if (stop) + pthread_exit(0); } @@ -195,7 +148,8 @@ nal_initialize nal_table[PTL_IFACE_MAX]={0,tcpnal_init,0}; void *nal_thread(void *z) { - bridge b=z; + nal_init_args_t *args = (nal_init_args_t *) z; + bridge b = args->nia_bridge; procbridge p=b->local; int rc; ptl_pid_t pid_request; @@ -216,15 +170,9 @@ void *nal_thread(void *z) b->nal_cb->cb_sti=nal_sti; b->nal_cb->cb_dist=nal_dist; - - register_io_handler(p->to_lib[0],READ_HANDLER,data_from_api,(void *)b); - - if(!(rc = syscall(SYS_read, p->to_lib[0], &pid_request, sizeof(pid_request)))) - perror("procbridge read from api"); - if(!(rc = syscall(SYS_read, p->to_lib[0], 
&desired, sizeof(ptl_ni_limits_t)))) - perror("procbridge read from api"); - if(!(rc = syscall(SYS_read, p->to_lib[0], &nal_type, sizeof(nal_type)))) - perror("procbridge read from api"); + pid_request = args->nia_requested_pid; + desired = *args->nia_limits; + nal_type = args->nia_nal_type; actual = desired; LIMIT(desired.max_match_entries,actual.max_match_entries,MAX_MES); @@ -251,12 +199,12 @@ void *nal_thread(void *z) * it is non-zero since something went wrong. */ /* this should perform error checking */ -#if 0 - write(p->from_lib[1], &actual, sizeof(ptl_ni_limits_t)); -#endif - syscall(SYS_write, p->from_lib[1], &rc, sizeof(rc)); - - if(!rc) { + pthread_mutex_lock(&p->mutex); + p->nal_flags |= rc ? NAL_FLAG_STOPPED : NAL_FLAG_RUNNING; + pthread_cond_broadcast(&p->cond); + pthread_mutex_unlock(&p->mutex); + + if (!rc) { /* the thunk function is called each time the timer loop performs an operation and returns to blocking mode. we overload this function to inform the api side that @@ -267,4 +215,3 @@ void *nal_thread(void *z) return(0); } #undef LIMIT - diff --git a/lustre/portals/unals/select.c b/lustre/portals/unals/select.c index c4f84f4..bcfba02 100644 --- a/lustre/portals/unals/select.c +++ b/lustre/portals/unals/select.c @@ -97,9 +97,9 @@ void remove_io_handler (io_handler i) static void set_flag(io_handler n,fd_set *fds) { - if (n->type & READ_HANDLER) FD_SET(n->fd,fds); - if (n->type & WRITE_HANDLER) FD_SET(n->fd,fds+1); - if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd,fds+2); + if (n->type & READ_HANDLER) FD_SET(n->fd, &fds[0]); + if (n->type & WRITE_HANDLER) FD_SET(n->fd,&fds[1]); + if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd, &fds[2]); } @@ -126,9 +126,9 @@ void select_timer_block(when until) timeout_pointer=&timeout; } else timeout_pointer=0; - FD_ZERO(fds); - FD_ZERO(fds+1); - FD_ZERO(fds+2); + FD_ZERO(&fds[0]); + FD_ZERO(&fds[1]); + FD_ZERO(&fds[2]); for (k=&io_handlers;*k;){ if ((*k)->disabled){ j=*k; @@ -140,14 +140,15 @@ void 
select_timer_block(when until) k=&(*k)->next; } } - result=select(FD_SETSIZE,fds,fds+1,fds+2,timeout_pointer); + + result=select(FD_SETSIZE, &fds[0], &fds[1], &fds[2], timeout_pointer); if (result > 0) for (j=io_handlers;j;j=j->next){ if (!(j->disabled) && - ((FD_ISSET(j->fd,fds) && (j->type & READ_HANDLER)) || - (FD_ISSET(j->fd,fds+1) && (j->type & WRITE_HANDLER)) || - (FD_ISSET(j->fd,fds+2) && (j->type & EXCEPTION_HANDLER)))){ + ((FD_ISSET(j->fd, &fds[0]) && (j->type & READ_HANDLER)) || + (FD_ISSET(j->fd, &fds[1]) && (j->type & WRITE_HANDLER)) || + (FD_ISSET(j->fd, &fds[2]) && (j->type & EXCEPTION_HANDLER)))){ if (!(*j->function)(j->argument)) j->disabled=1; } diff --git a/lustre/portals/unals/tcpnal.c b/lustre/portals/unals/tcpnal.c index 7c7c94d..ae97f92 100644 --- a/lustre/portals/unals/tcpnal.c +++ b/lustre/portals/unals/tcpnal.c @@ -2,6 +2,7 @@ * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (c) 2002 Cray Inc. + * Copyright (c) 2003 Cluster File Systems, Inc. * * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ * @@ -50,7 +51,6 @@ * * sends a packet to the peer, after insuring that a connection exists */ -#warning FIXME: "param 'type' is newly added, make use of it!!" int tcpnal_send(nal_cb_t *n, void *private, lib_msg_t *cookie, @@ -163,15 +163,15 @@ finalize: */ static int from_connection(void *a, void *d) { - connection c = d; - bridge b=a; - ptl_hdr_t hdr; - - if (read_connection(c, (unsigned char *)&hdr, sizeof(hdr))){ - lib_parse(b->nal_cb, &hdr, c); - return(1); - } - return(0); + connection c = d; + bridge b = a; + ptl_hdr_t hdr; + + if (read_connection(c, (unsigned char *)&hdr, sizeof(hdr))){ + lib_parse(b->nal_cb, &hdr, c); + return(1); + } + return(0); } -- 1.8.3.1