extern spinlock_t stack_backtrace_lock;
char *portals_debug_dumpstack(void);
-char *portals_nid2str(int nal, ptl_nid_t nid, char *str);
void portals_run_upcall(char **argv);
void portals_run_lbug_upcall(char * file, const char *fn, const int line);
void portals_debug_dumplog(void);
# include <stdlib.h>
#ifndef __CYGWIN__
# include <stdint.h>
+#else
+# include <cygwin-ioctl.h>
#endif
# include <unistd.h>
# include <time.h>
getpid() , stack, ## a);
#endif
+/* support decl needed both by kernel and liblustre */
+char *portals_nid2str(int nal, ptl_nid_t nid, char *str);
+
#ifndef CURRENT_TIME
# define CURRENT_TIME time(0)
#endif
wh->wh_object_cookie == PTL_WIRE_HANDLE_NONE.wh_object_cookie);
}
-#ifdef __KERNEL__
#define state_lock(nal,flagsp) \
do { \
CDEBUG(D_PORTALS, "taking state lock\n"); \
CDEBUG(D_PORTALS, "releasing state lock\n"); \
nal->cb_sti(nal, flagsp); \
}
-#else
-/* not needed in user space until we thread there */
-#define state_lock(nal,flagsp) \
-do { \
- CDEBUG(D_PORTALS, "taking state lock\n"); \
- CDEBUG(D_PORTALS, "%p:%p\n", nal, flagsp); \
-} while (0)
-
-#define state_unlock(nal,flagsp) \
-{ \
- CDEBUG(D_PORTALS, "releasing state lock\n"); \
- CDEBUG(D_PORTALS, "%p:%p\n", nal, flagsp); \
-}
-#endif /* __KERNEL__ */
#ifndef PTL_USE_SLAB_CACHE
wh->wh_object_cookie == PTL_WIRE_HANDLE_NONE.wh_object_cookie);
}
-#ifdef __KERNEL__
#define state_lock(nal,flagsp) \
do { \
CDEBUG(D_PORTALS, "taking state lock\n"); \
CDEBUG(D_PORTALS, "releasing state lock\n"); \
nal->cb_sti(nal, flagsp); \
}
-#else
-/* not needed in user space until we thread there */
-#define state_lock(nal,flagsp) \
-do { \
- CDEBUG(D_PORTALS, "taking state lock\n"); \
- CDEBUG(D_PORTALS, "%p:%p\n", nal, flagsp); \
-} while (0)
-
-#define state_unlock(nal,flagsp) \
-{ \
- CDEBUG(D_PORTALS, "releasing state lock\n"); \
- CDEBUG(D_PORTALS, "%p:%p\n", nal, flagsp); \
-}
-#endif /* __KERNEL__ */
#ifndef PTL_USE_SLAB_CACHE
int ptl_set_cfg_record_cb(cfg_record_cb_t cb);
/* l_ioctl.c */
+typedef int (ioc_handler_t)(int dev_id, int opc, void *buf);
+void set_ioc_handler(ioc_handler_t *handler);
int register_ioc_dev(int dev_id, const char * dev_name);
void unregister_ioc_dev(int dev_id);
int set_ioctl_dump(char * file);
int ptl_set_cfg_record_cb(cfg_record_cb_t cb);
/* l_ioctl.c */
+typedef int (ioc_handler_t)(int dev_id, int opc, void *buf);
+void set_ioc_handler(ioc_handler_t *handler);
int register_ioc_dev(int dev_id, const char * dev_name);
void unregister_ioc_dev(int dev_id);
int set_ioctl_dump(char * file);
typedef unsigned PTL_SEQ_BASETYPE ptl_seq_t;
#define PTL_SEQ_GT(a,b) (((signed PTL_SEQ_BASETYPE)((a) - (b))) > 0)
+/* XXX
+ * cygwin need the pragma line, not clear if it's needed in other places.
+ * checking!!!
+ */
+#ifdef __CYGWIN__
+#pragma pack(push, 4)
+#endif
typedef struct {
ptl_event_kind_t type;
ptl_process_id_t initiator;
struct timeval arrival_time;
volatile ptl_seq_t sequence;
} ptl_event_t;
+#ifdef __CYGWIN__
+#pragma pop
+#endif
typedef enum {
PTL_ACK_REQ,
static void eq_timeout(int signal)
{
+ sigset_t set;
+
+ /* signal will be automatically disabled in sig handler,
+ * must enable it before long jump
+ */
+ sigemptyset(&set);
+ sigaddset(&set, SIGALRM);
+ sigprocmask(SIG_UNBLOCK, &set, NULL);
+
longjmp(eq_jumpbuf, -1);
}
int PtlEQWait_timeout(ptl_handle_eq_t eventq_in, ptl_event_t * event_out,
int timeout)
{
- static void (*prev) (int);
+ static void (*prev) (int) = NULL;
static int left_over;
time_t time_at_start;
int rc;
left_over = alarm(timeout);
prev = signal(SIGALRM, eq_timeout);
time_at_start = time(NULL);
- if (left_over < timeout)
+ if (left_over && left_over < timeout)
alarm(left_over);
rc = PtlEQWait(eventq_in, event_out);
#include <stdarg.h>
#include <string.h>
#include <unistd.h>
-#include <syscall.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <portals/types.h>
+#include <portals/list.h>
+#include <portals/lib-types.h>
+#include <portals/socknal.h>
+#include <linux/kp30.h>
#include <connection.h>
+#include <pthread.h>
#include <errno.h>
-
+#ifndef __CYGWIN__
+#include <syscall.h>
+#endif
/* global variable: acceptor port */
unsigned short tcpnal_acceptor_port = 988;
*/
static int compare_connection(void *arg1, void *arg2)
{
- connection c = arg1;
- unsigned int * id = arg2;
- return((c->ip==id[0]) && (c->port==id[1]));
+ connection c = arg1;
+ unsigned int * id = arg2;
+#if 0
+ return((c->ip==id[0]) && (c->port==id[1]));
+#else
+ /* CFS specific hacking */
+ return (c->ip == id[0]);
+#endif
}
*/
static unsigned int connection_key(unsigned int *id)
{
+#if 0
return(id[0]^id[1]);
+#else
+ /* CFS specific hacking */
+ return (unsigned int) id[0];
+#endif
}
unsigned char *dest,
int len)
{
- int offset=0,rc;
+ int offset = 0,rc;
- if (len){
+ if (len) {
do {
- if((rc=syscall(SYS_read, c->fd, dest+offset, len-offset))<=0){
- if (errno==EINTR) {
- rc=0;
+#ifndef __CYGWIN__
+ rc = syscall(SYS_read, c->fd, dest+offset, len-offset);
+#else
+ rc = recv(c->fd, dest+offset, len-offset, 0);
+#endif
+ if (rc <= 0) {
+ if (errno == EINTR) {
+ rc = 0;
} else {
remove_connection(c);
- return(0);
+ return (0);
}
}
- offset+=rc;
- } while (offset<len);
+ offset += rc;
+ } while (offset < len);
}
- return(1);
+ return (1);
}
static int connection_input(void *d)
unsigned int nid=*((unsigned int *)&s.sin_addr);
/* cfs specific hack */
//unsigned short pid=s.sin_port;
+ pthread_mutex_lock(&m->conn_lock);
allocate_connection(m,htonl(nid),0/*pid*/,fd);
+ pthread_mutex_unlock(&m->conn_lock);
return(1);
}
+/* FIXME assuming little endian, cleanup!! */
+#define __cpu_to_le64(x) ((__u64)(x))
+#define __le64_to_cpu(x) ((__u64)(x))
+#define __cpu_to_le32(x) ((__u32)(x))
+#define __le32_to_cpu(x) ((__u32)(x))
+#define __cpu_to_le16(x) ((__u16)(x))
+#define __le16_to_cpu(x) ((__u16)(x))
+
+extern ptl_nid_t tcpnal_mynid;
+
+int
+tcpnal_hello (int sockfd, ptl_nid_t *nid, int type, __u64 incarnation)
+{
+ int rc;
+ ptl_hdr_t hdr;
+ ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
+
+ LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
+
+ memset (&hdr, 0, sizeof (hdr));
+ hmv->magic = __cpu_to_le32 (PORTALS_PROTO_MAGIC);
+ hmv->version_major = __cpu_to_le32 (PORTALS_PROTO_VERSION_MAJOR);
+ hmv->version_minor = __cpu_to_le32 (PORTALS_PROTO_VERSION_MINOR);
+
+ hdr.src_nid = __cpu_to_le64 (tcpnal_mynid);
+ hdr.type = __cpu_to_le32 (PTL_MSG_HELLO);
+
+ hdr.msg.hello.type = __cpu_to_le32 (type);
+ hdr.msg.hello.incarnation = 0;
+
+ /* Assume sufficient socket buffering for this message */
+ rc = syscall(SYS_write, sockfd, &hdr, sizeof(hdr));
+ if (rc <= 0) {
+ CERROR ("Error %d sending HELLO to %llx\n", rc, *nid);
+ return (rc);
+ }
+
+ rc = syscall(SYS_read, sockfd, hmv, sizeof(*hmv));
+ if (rc <= 0) {
+ CERROR ("Error %d reading HELLO from %llx\n", rc, *nid);
+ return (rc);
+ }
+
+ if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
+ CERROR ("Bad magic %#08x (%#08x expected) from %llx\n",
+ __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, *nid);
+ return (-EPROTO);
+ }
+
+ if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
+ hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
+ CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
+ " from %llx\n",
+ __le16_to_cpu (hmv->version_major),
+ __le16_to_cpu (hmv->version_minor),
+ PORTALS_PROTO_VERSION_MAJOR,
+ PORTALS_PROTO_VERSION_MINOR,
+ *nid);
+ return (-EPROTO);
+ }
+
+#if (PORTALS_PROTO_VERSION_MAJOR != 0)
+# error "This code only understands protocol version 0.x"
+#endif
+ /* version 0 sends magic/version as the dest_nid of a 'hello' header,
+ * so read the rest of it in now... */
+
+ rc = syscall(SYS_read, sockfd, hmv + 1, sizeof(hdr) - sizeof(*hmv));
+ if (rc <= 0) {
+ CERROR ("Error %d reading rest of HELLO hdr from %llx\n",
+ rc, *nid);
+ return (rc);
+ }
+
+ /* ...and check we got what we expected */
+ if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
+ hdr.payload_length != __cpu_to_le32 (0)) {
+ CERROR ("Expecting a HELLO hdr with 0 payload,"
+ " but got type %d with %d payload from %llx\n",
+ __le32_to_cpu (hdr.type),
+ __le32_to_cpu (hdr.payload_length), *nid);
+ return (-EPROTO);
+ }
+
+ if (__le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
+ CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n");
+ return (-EPROTO);
+ }
+
+ if (*nid == PTL_NID_ANY) { /* don't know peer's nid yet */
+ *nid = __le64_to_cpu(hdr.src_nid);
+ } else if (*nid != __le64_to_cpu (hdr.src_nid)) {
+ CERROR ("Connected to nid %llx, but expecting %llx\n",
+ __le64_to_cpu (hdr.src_nid), *nid);
+ return (-EPROTO);
+ }
+
+ return (0);
+}
/* Function: force_tcp_connection
* Arguments: t: tcpnal
unsigned int ip,
unsigned short port)
{
- connection c;
+ connection conn;
struct sockaddr_in addr;
unsigned int id[2];
port = tcpnal_acceptor_port;
- id[0]=ip;
- id[1]=port;
+ id[0] = ip;
+ id[1] = port;
- if (!(c=hash_table_find(m->connections,id))){
+ pthread_mutex_lock(&m->conn_lock);
+
+ conn = hash_table_find(m->connections, id);
+ if (!conn) {
int fd;
+ int option;
+ ptl_nid_t peernid = PTL_NID_ANY;
bzero((char *) &addr, sizeof(addr));
addr.sin_family = AF_INET;
perror("tcpnal socket failed");
exit(-1);
}
- if (connect(fd,
- (struct sockaddr *)&addr,
- sizeof(struct sockaddr_in)))
- {
- perror("tcpnal connect");
- return(0);
- }
- return(allocate_connection(m,ip,port,fd));
+ if (connect(fd, (struct sockaddr *)&addr,
+ sizeof(struct sockaddr_in))) {
+ perror("tcpnal connect");
+ return(0);
+ }
+
+#if 1
+ option = 1;
+ setsockopt(fd, SOL_TCP, TCP_NODELAY, &option, sizeof(option));
+ option = 1<<20;
+ setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &option, sizeof(option));
+ option = 1<<20;
+ setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &option, sizeof(option));
+#endif
+
+ /* say hello */
+ if (tcpnal_hello(fd, &peernid, SOCKNAL_CONN_ANY, 0))
+ exit(-1);
+
+ conn = allocate_connection(m, ip, port, fd);
}
- return(c);
+
+ pthread_mutex_unlock(&m->conn_lock);
+ return (conn);
}
bzero((char *) &addr, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = 0;
- addr.sin_port = port;
-
+ addr.sin_port = htons(port);
+
if (bind(m->bound,(struct sockaddr *)&addr,alen)<0){
perror ("tcpnal bind");
return(0);
int (*input)(void *, void *),
void *a)
{
- manager m=(manager)malloc(sizeof(struct manager));
- m->connections=hash_create_table(compare_connection,connection_key);
- m->handler=input;
- m->handler_arg=a;
- if (bind_socket(m,pid)) return(m);
+ manager m = (manager)malloc(sizeof(struct manager));
+ m->connections = hash_create_table(compare_connection,connection_key);
+ m->handler = input;
+ m->handler_arg = a;
+ pthread_mutex_init(&m->conn_lock, 0);
+
+ if (bind_socket(m,pid))
+ return(m);
+
free(m);
return(0);
}
typedef struct manager {
table connections;
+ pthread_mutex_t conn_lock; /* protect connections table */
int bound;
io_handler bound_handler;
int (*handler)(void *, void *);
* vim:expandtab:shiftwidth=8:tabstop=8:
*
* Copyright (c) 2002 Cray Inc.
+ * Copyright (c) 2003 Cluster File Systems, Inc.
*
* This file is part of Lustre, http://www.lustre.org.
*
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
-#include <syscall.h>
#include <procbridge.h>
#include <pqtimer.h>
#include <dispatch.h>
* forwards a packaged api call from the 'api' side to the 'library'
* side, and collects the result
*/
-#define forward_failure(operand,fd,buffer,length)\
- if(syscall(SYS_##operand,fd,buffer,length)!=length){\
- lib_fini(b->nal_cb);\
- return(PTL_SEGV);\
- }
static int procbridge_forward(nal_t *n, int id, void *args, size_t args_len,
- void *ret, size_t ret_len)
+ void *ret, ptl_size_t ret_len)
{
- bridge b=(bridge)n->nal_data;
- procbridge p=(procbridge)b->local;
- int lib=p->to_lib[1];
- int k;
+ bridge b = (bridge) n->nal_data;
- forward_failure(write,lib, &id, sizeof(id));
- forward_failure(write,lib,&args_len, sizeof(args_len));
- forward_failure(write,lib,&ret_len, sizeof(ret_len));
- forward_failure(write,lib,args, args_len);
-
- do {
- k=syscall(SYS_read, p->from_lib[0], ret, ret_len);
- } while ((k!=ret_len) && (errno += EINTR));
+ if (id == PTL_FINI) {
+ lib_fini(b->nal_cb);
- if(k!=ret_len){
- perror("nal: read return block");
- return PTL_SEGV;
+ if (b->shutdown)
+ (*b->shutdown)(b);
}
+
+ lib_dispatch(b->nal_cb, NULL, id, args, ret);
+
return (PTL_OK);
}
-#undef forward_failure
/* Function: shutdown
{
bridge b=(bridge)n->nal_data;
procbridge p=(procbridge)b->local;
- int code=PTL_FINI;
- syscall(SYS_write, p->to_lib[1],&code,sizeof(code));
- syscall(SYS_read, p->from_lib[0],&code,sizeof(code));
+ p->nal_flags |= NAL_FLAG_STOPPING;
- syscall(SYS_close, p->to_lib[0]);
- syscall(SYS_close, p->to_lib[1]);
- syscall(SYS_close, p->from_lib[0]);
- syscall(SYS_close, p->from_lib[1]);
+ do {
+ pthread_mutex_lock(&p->mutex);
+ if (p->nal_flags & NAL_FLAG_STOPPED) {
+ pthread_mutex_unlock(&p->mutex);
+ break;
+ }
+ pthread_cond_wait(&p->cond, &p->mutex);
+ pthread_mutex_unlock(&p->mutex);
+ } while (1);
free(p);
return(0);
unlock: procbridge_unlock
};
-/* Function: bridge_init
+ptl_nid_t tcpnal_mynid;
+
+/* Function: procbridge_interface
*
* Arguments: pid: requested process id (port offset)
* PTL_ID_ANY not supported.
* initializes the tcp nal. we define unix_failure as an
* error wrapper to cut down clutter.
*/
-#define unix_failure(operand,fd,buffer,length,text)\
- if(syscall(SYS_##operand,fd,buffer,length)!=length){\
- perror(text);\
- return(NULL);\
- }
-#if 0
-static nal_t *bridge_init(ptl_interface_t nal,
- ptl_pid_t pid_request,
- ptl_ni_limits_t *desired,
- ptl_ni_limits_t *actual,
- int *rc)
-{
- procbridge p;
- bridge b;
- static int initialized=0;
- ptl_ni_limits_t limits = {-1,-1,-1,-1,-1};
-
- if(initialized) return (&api_nal);
-
- init_unix_timer();
-
- b=(bridge)malloc(sizeof(struct bridge));
- p=(procbridge)malloc(sizeof(struct procbridge));
- api_nal.nal_data=b;
- b->local=p;
-
- if(pipe(p->to_lib) || pipe(p->from_lib)) {
- perror("nal_init: pipe");
- return(NULL);
- }
-
- if (desired) limits = *desired;
- unix_failure(write,p->to_lib[1], &pid_request, sizeof(pid_request),
- "nal_init: write");
- unix_failure(write,p->to_lib[1], &limits, sizeof(ptl_ni_limits_t),
- "nal_init: write");
- unix_failure(write,p->to_lib[1], &nal, sizeof(ptl_interface_t),
- "nal_init: write");
-
- if(pthread_create(&p->t, NULL, nal_thread, b)) {
- perror("nal_init: pthread_create");
- return(NULL);
- }
-
- unix_failure(read,p->from_lib[0], actual, sizeof(ptl_ni_limits_t),
- "tcp_init: read");
- unix_failure(read,p->from_lib[0], rc, sizeof(rc),
- "nal_init: read");
-
- if(*rc) return(NULL);
-
- initialized = 1;
- pthread_mutex_init(&p->mutex,0);
- pthread_cond_init(&p->cond, 0);
-
- return (&api_nal);
-}
-#endif
-
-ptl_nid_t tcpnal_mynid;
-
nal_t *procbridge_interface(int num_interface,
ptl_pt_index_t ptl_size,
ptl_ac_index_t acl_size,
ptl_pid_t requested_pid)
{
+ nal_init_args_t args;
procbridge p;
bridge b;
static int initialized=0;
ptl_ni_limits_t limits = {-1,-1,-1,-1,-1};
- int rc, nal_type = PTL_IFACE_TCP;/* PTL_IFACE_DEFAULT FIXME hack */
+ int nal_type = PTL_IFACE_TCP;/* PTL_IFACE_DEFAULT FIXME hack */
if(initialized) return (&api_nal);
api_nal.nal_data=b;
b->local=p;
- if(pipe(p->to_lib) || pipe(p->from_lib)) {
- perror("nal_init: pipe");
- return(NULL);
- }
-
if (ptl_size)
limits.max_ptable_index = ptl_size;
if (acl_size)
limits.max_atable_index = acl_size;
- unix_failure(write,p->to_lib[1], &requested_pid, sizeof(requested_pid),
- "nal_init: write");
- unix_failure(write,p->to_lib[1], &limits, sizeof(ptl_ni_limits_t),
- "nal_init: write");
- unix_failure(write,p->to_lib[1], &nal_type, sizeof(nal_type),
- "nal_init: write");
+ args.nia_requested_pid = requested_pid;
+ args.nia_limits = &limits;
+ args.nia_nal_type = nal_type;
+ args.nia_bridge = b;
- if(pthread_create(&p->t, NULL, nal_thread, b)) {
+ /* init procbridge */
+ pthread_mutex_init(&p->mutex,0);
+ pthread_cond_init(&p->cond, 0);
+ p->nal_flags = 0;
+ pthread_mutex_init(&p->nal_cb_lock, 0);
+
+ if (pthread_create(&p->t, NULL, nal_thread, &args)) {
perror("nal_init: pthread_create");
return(NULL);
}
- unix_failure(read,p->from_lib[0], &rc, sizeof(rc),
- "nal_init: read");
-
- if(rc) return(NULL);
+ do {
+ pthread_mutex_lock(&p->mutex);
+ if (p->nal_flags & (NAL_FLAG_RUNNING | NAL_FLAG_STOPPED)) {
+ pthread_mutex_unlock(&p->mutex);
+ break;
+ }
+ pthread_cond_wait(&p->cond, &p->mutex);
+ pthread_mutex_unlock(&p->mutex);
+ } while (1);
+
+ if (p->nal_flags & NAL_FLAG_STOPPED)
+ return (NULL);
b->nal_cb->ni.nid = tcpnal_mynid;
initialized = 1;
- pthread_mutex_init(&p->mutex,0);
- pthread_cond_init(&p->cond, 0);
return (&api_nal);
}
-#undef unix_failure
* vim:expandtab:shiftwidth=8:tabstop=8:
*
* Copyright (c) 2002 Cray Inc.
+ * Copyright (c) 2003 Cluster File Systems, Inc.
*
* This file is part of Portals, http://www.sf.net/projects/sandiaportals/
*/
#include <ipmap.h>
+#define NAL_FLAG_RUNNING 1
+#define NAL_FLAG_STOPPING 2
+#define NAL_FLAG_STOPPED 4
+
typedef struct procbridge {
+ /* sync between user threads and nal thread */
pthread_t t;
pthread_cond_t cond;
pthread_mutex_t mutex;
- int to_lib[2];
- int from_lib[2];
+
+ int nal_flags;
+
+ pthread_mutex_t nal_cb_lock;
} *procbridge;
+typedef struct nal_init_args {
+ ptl_pid_t nia_requested_pid;
+ ptl_ni_limits_t *nia_limits;
+ int nia_nal_type;
+ bridge nia_bridge;
+} nal_init_args_t;
+
extern void *nal_thread(void *);
extern void set_address(bridge t,ptl_pid_t pidrequest);
extern nal_t *procbridge_interface(int num_interface,
- ptl_pt_index_t ptl_size,
- ptl_ac_index_t acl_size,
- ptl_pid_t requested_pid);
+ ptl_pt_index_t ptl_size,
+ ptl_ac_index_t acl_size,
+ ptl_pid_t requested_pid);
#endif
* vim:expandtab:shiftwidth=8:tabstop=8:
*
* Copyright (c) 2002 Cray Inc.
+ * Copyright (c) 2003 Cluster File Systems, Inc.
*
* This file is part of Lustre, http://www.lustre.org.
*
#include <stdio.h>
#include <stdarg.h>
#include <unistd.h>
-#include <syscall.h>
#include <procbridge.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <errno.h>
#include <timer.h>
-//#include <util/pqtimer.h>
#include <dispatch.h>
/* the following functions are stubs to satisfy the nal definition
static void nal_cli(nal_cb_t *nal,
unsigned long *flags)
{
+ bridge b = (bridge) nal->nal_data;
+ procbridge p = (procbridge) b->local;
+
+ pthread_mutex_lock(&p->nal_cb_lock);
}
static void nal_sti(nal_cb_t *nal,
unsigned long *flags)
{
+ bridge b = (bridge)nal->nal_data;
+ procbridge p = (procbridge) b->local;
+
+ pthread_mutex_unlock(&p->nal_cb_lock);
}
{
return 0;
}
-
-
-
-/* Function: data_from_api
- * Arguments: t: the nal state for this interface
- * Returns: whether to continue reading from the pipe
- *
- * data_from_api() reads data from the api side in response
- * to a select.
- *
- * We define data_failure() for syntactic convenience
- * of unix error reporting.
- */
-
-#define data_failure(operand,fd,buffer,length)\
- if(syscall(SYS_##operand,fd,buffer,length)!=length){\
- lib_fini(b->nal_cb);\
- return(0);\
- }
-static int data_from_api(void *arg)
-{
- bridge b = arg;
- procbridge p=(procbridge)b->local;
- /* where are these two sizes derived from ??*/
- char arg_block[ 256 ];
- char ret_block[ 128 ];
- ptl_size_t arg_len,ret_len;
- int fd=p->to_lib[0];
- int index;
-
- data_failure(read,fd, &index, sizeof(index));
-
- if (index==PTL_FINI) {
- lib_fini(b->nal_cb);
- if (b->shutdown) (*b->shutdown)(b);
- syscall(SYS_write, p->from_lib[1],&b->alive,sizeof(b->alive));
-
- /* a heavy-handed but convenient way of shutting down
- the lower side thread */
- pthread_exit(0);
- }
-
- data_failure(read,fd, &arg_len, sizeof(arg_len));
- data_failure(read,fd, &ret_len, sizeof(ret_len));
- data_failure(read,fd, arg_block, arg_len);
-
- lib_dispatch(b->nal_cb, NULL, index, arg_block, ret_block);
-
- data_failure(write,p->from_lib[1],ret_block, ret_len);
- return(1);
-}
-#undef data_failure
-
-
static void wakeup_topside(void *z)
{
- bridge b=z;
- procbridge p=b->local;
+ bridge b = z;
+ procbridge p = b->local;
+ int stop;
pthread_mutex_lock(&p->mutex);
+ stop = p->nal_flags & NAL_FLAG_STOPPING;
+ if (stop)
+ p->nal_flags |= NAL_FLAG_STOPPED;
pthread_cond_broadcast(&p->cond);
pthread_mutex_unlock(&p->mutex);
+
+ if (stop)
+ pthread_exit(0);
}
void *nal_thread(void *z)
{
- bridge b=z;
+ nal_init_args_t *args = (nal_init_args_t *) z;
+ bridge b = args->nia_bridge;
procbridge p=b->local;
int rc;
ptl_pid_t pid_request;
b->nal_cb->cb_sti=nal_sti;
b->nal_cb->cb_dist=nal_dist;
-
- register_io_handler(p->to_lib[0],READ_HANDLER,data_from_api,(void *)b);
-
- if(!(rc = syscall(SYS_read, p->to_lib[0], &pid_request, sizeof(pid_request))))
- perror("procbridge read from api");
- if(!(rc = syscall(SYS_read, p->to_lib[0], &desired, sizeof(ptl_ni_limits_t))))
- perror("procbridge read from api");
- if(!(rc = syscall(SYS_read, p->to_lib[0], &nal_type, sizeof(nal_type))))
- perror("procbridge read from api");
+ pid_request = args->nia_requested_pid;
+ desired = *args->nia_limits;
+ nal_type = args->nia_nal_type;
actual = desired;
LIMIT(desired.max_match_entries,actual.max_match_entries,MAX_MES);
* it is non-zero since something went wrong.
*/
/* this should perform error checking */
-#if 0
- write(p->from_lib[1], &actual, sizeof(ptl_ni_limits_t));
-#endif
- syscall(SYS_write, p->from_lib[1], &rc, sizeof(rc));
-
- if(!rc) {
+ pthread_mutex_lock(&p->mutex);
+ p->nal_flags |= rc ? NAL_FLAG_STOPPED : NAL_FLAG_RUNNING;
+ pthread_cond_broadcast(&p->cond);
+ pthread_mutex_unlock(&p->mutex);
+
+ if (!rc) {
/* the thunk function is called each time the timer loop
performs an operation and returns to blocking mode. we
overload this function to inform the api side that
return(0);
}
#undef LIMIT
-
static void set_flag(io_handler n,fd_set *fds)
{
- if (n->type & READ_HANDLER) FD_SET(n->fd,fds);
- if (n->type & WRITE_HANDLER) FD_SET(n->fd,fds+1);
- if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd,fds+2);
+ if (n->type & READ_HANDLER) FD_SET(n->fd, &fds[0]);
+ if (n->type & WRITE_HANDLER) FD_SET(n->fd,&fds[1]);
+ if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd, &fds[2]);
}
timeout_pointer=&timeout;
} else timeout_pointer=0;
- FD_ZERO(fds);
- FD_ZERO(fds+1);
- FD_ZERO(fds+2);
+
+ /* FIXME
+ * temporarily add timer for endless waiting problem.
+ * FIXME
+ */
+ timeout.tv_sec = 1;
+ timeout.tv_usec = 0;
+ timeout_pointer=&timeout;
+
+ FD_ZERO(&fds[0]);
+ FD_ZERO(&fds[1]);
+ FD_ZERO(&fds[2]);
for (k=&io_handlers;*k;){
if ((*k)->disabled){
j=*k;
k=&(*k)->next;
}
}
- result=select(FD_SETSIZE,fds,fds+1,fds+2,timeout_pointer);
+
+ result=select(FD_SETSIZE, &fds[0], &fds[1], &fds[2], timeout_pointer);
if (result > 0)
for (j=io_handlers;j;j=j->next){
if (!(j->disabled) &&
- ((FD_ISSET(j->fd,fds) && (j->type & READ_HANDLER)) ||
- (FD_ISSET(j->fd,fds+1) && (j->type & WRITE_HANDLER)) ||
- (FD_ISSET(j->fd,fds+2) && (j->type & EXCEPTION_HANDLER)))){
+ ((FD_ISSET(j->fd, &fds[0]) && (j->type & READ_HANDLER)) ||
+ (FD_ISSET(j->fd, &fds[1]) && (j->type & WRITE_HANDLER)) ||
+ (FD_ISSET(j->fd, &fds[2]) && (j->type & EXCEPTION_HANDLER)))){
if (!(*j->function)(j->argument))
j->disabled=1;
}
#include <stdarg.h>
#include <string.h>
#include <unistd.h>
-#include <syscall.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <portals/types.h>
+#include <portals/list.h>
+#include <portals/lib-types.h>
+#include <portals/socknal.h>
+#include <linux/kp30.h>
#include <connection.h>
+#include <pthread.h>
#include <errno.h>
-
+#ifndef __CYGWIN__
+#include <syscall.h>
+#endif
/* global variable: acceptor port */
unsigned short tcpnal_acceptor_port = 988;
*/
static int compare_connection(void *arg1, void *arg2)
{
- connection c = arg1;
- unsigned int * id = arg2;
- return((c->ip==id[0]) && (c->port==id[1]));
+ connection c = arg1;
+ unsigned int * id = arg2;
+#if 0
+ return((c->ip==id[0]) && (c->port==id[1]));
+#else
+ /* CFS specific hacking */
+ return (c->ip == id[0]);
+#endif
}
*/
static unsigned int connection_key(unsigned int *id)
{
+#if 0
return(id[0]^id[1]);
+#else
+ /* CFS specific hacking */
+ return (unsigned int) id[0];
+#endif
}
unsigned char *dest,
int len)
{
- int offset=0,rc;
+ int offset = 0,rc;
- if (len){
+ if (len) {
do {
- if((rc=syscall(SYS_read, c->fd, dest+offset, len-offset))<=0){
- if (errno==EINTR) {
- rc=0;
+#ifndef __CYGWIN__
+ rc = syscall(SYS_read, c->fd, dest+offset, len-offset);
+#else
+ rc = recv(c->fd, dest+offset, len-offset, 0);
+#endif
+ if (rc <= 0) {
+ if (errno == EINTR) {
+ rc = 0;
} else {
remove_connection(c);
- return(0);
+ return (0);
}
}
- offset+=rc;
- } while (offset<len);
+ offset += rc;
+ } while (offset < len);
}
- return(1);
+ return (1);
}
static int connection_input(void *d)
unsigned int nid=*((unsigned int *)&s.sin_addr);
/* cfs specific hack */
//unsigned short pid=s.sin_port;
+ pthread_mutex_lock(&m->conn_lock);
allocate_connection(m,htonl(nid),0/*pid*/,fd);
+ pthread_mutex_unlock(&m->conn_lock);
return(1);
}
+/* FIXME assuming little endian, cleanup!! */
+#define __cpu_to_le64(x) ((__u64)(x))
+#define __le64_to_cpu(x) ((__u64)(x))
+#define __cpu_to_le32(x) ((__u32)(x))
+#define __le32_to_cpu(x) ((__u32)(x))
+#define __cpu_to_le16(x) ((__u16)(x))
+#define __le16_to_cpu(x) ((__u16)(x))
+
+extern ptl_nid_t tcpnal_mynid;
+
+int
+tcpnal_hello (int sockfd, ptl_nid_t *nid, int type, __u64 incarnation)
+{
+ int rc;
+ ptl_hdr_t hdr;
+ ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
+
+ LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
+
+ memset (&hdr, 0, sizeof (hdr));
+ hmv->magic = __cpu_to_le32 (PORTALS_PROTO_MAGIC);
+ hmv->version_major = __cpu_to_le32 (PORTALS_PROTO_VERSION_MAJOR);
+ hmv->version_minor = __cpu_to_le32 (PORTALS_PROTO_VERSION_MINOR);
+
+ hdr.src_nid = __cpu_to_le64 (tcpnal_mynid);
+ hdr.type = __cpu_to_le32 (PTL_MSG_HELLO);
+
+ hdr.msg.hello.type = __cpu_to_le32 (type);
+ hdr.msg.hello.incarnation = 0;
+
+ /* Assume sufficient socket buffering for this message */
+ rc = syscall(SYS_write, sockfd, &hdr, sizeof(hdr));
+ if (rc <= 0) {
+ CERROR ("Error %d sending HELLO to %llx\n", rc, *nid);
+ return (rc);
+ }
+
+ rc = syscall(SYS_read, sockfd, hmv, sizeof(*hmv));
+ if (rc <= 0) {
+ CERROR ("Error %d reading HELLO from %llx\n", rc, *nid);
+ return (rc);
+ }
+
+ if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
+ CERROR ("Bad magic %#08x (%#08x expected) from %llx\n",
+ __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, *nid);
+ return (-EPROTO);
+ }
+
+ if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
+ hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
+ CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
+ " from %llx\n",
+ __le16_to_cpu (hmv->version_major),
+ __le16_to_cpu (hmv->version_minor),
+ PORTALS_PROTO_VERSION_MAJOR,
+ PORTALS_PROTO_VERSION_MINOR,
+ *nid);
+ return (-EPROTO);
+ }
+
+#if (PORTALS_PROTO_VERSION_MAJOR != 0)
+# error "This code only understands protocol version 0.x"
+#endif
+ /* version 0 sends magic/version as the dest_nid of a 'hello' header,
+ * so read the rest of it in now... */
+
+ rc = syscall(SYS_read, sockfd, hmv + 1, sizeof(hdr) - sizeof(*hmv));
+ if (rc <= 0) {
+ CERROR ("Error %d reading rest of HELLO hdr from %llx\n",
+ rc, *nid);
+ return (rc);
+ }
+
+ /* ...and check we got what we expected */
+ if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
+ hdr.payload_length != __cpu_to_le32 (0)) {
+ CERROR ("Expecting a HELLO hdr with 0 payload,"
+ " but got type %d with %d payload from %llx\n",
+ __le32_to_cpu (hdr.type),
+ __le32_to_cpu (hdr.payload_length), *nid);
+ return (-EPROTO);
+ }
+
+ if (__le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
+ CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n");
+ return (-EPROTO);
+ }
+
+ if (*nid == PTL_NID_ANY) { /* don't know peer's nid yet */
+ *nid = __le64_to_cpu(hdr.src_nid);
+ } else if (*nid != __le64_to_cpu (hdr.src_nid)) {
+ CERROR ("Connected to nid %llx, but expecting %llx\n",
+ __le64_to_cpu (hdr.src_nid), *nid);
+ return (-EPROTO);
+ }
+
+ return (0);
+}
/* Function: force_tcp_connection
* Arguments: t: tcpnal
unsigned int ip,
unsigned short port)
{
- connection c;
+ connection conn;
struct sockaddr_in addr;
unsigned int id[2];
port = tcpnal_acceptor_port;
- id[0]=ip;
- id[1]=port;
+ id[0] = ip;
+ id[1] = port;
- if (!(c=hash_table_find(m->connections,id))){
+ pthread_mutex_lock(&m->conn_lock);
+
+ conn = hash_table_find(m->connections, id);
+ if (!conn) {
int fd;
+ int option;
+ ptl_nid_t peernid = PTL_NID_ANY;
bzero((char *) &addr, sizeof(addr));
addr.sin_family = AF_INET;
perror("tcpnal socket failed");
exit(-1);
}
- if (connect(fd,
- (struct sockaddr *)&addr,
- sizeof(struct sockaddr_in)))
- {
- perror("tcpnal connect");
- return(0);
- }
- return(allocate_connection(m,ip,port,fd));
+ if (connect(fd, (struct sockaddr *)&addr,
+ sizeof(struct sockaddr_in))) {
+ perror("tcpnal connect");
+ return(0);
+ }
+
+#if 1
+ option = 1;
+ setsockopt(fd, SOL_TCP, TCP_NODELAY, &option, sizeof(option));
+ option = 1<<20;
+ setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &option, sizeof(option));
+ option = 1<<20;
+ setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &option, sizeof(option));
+#endif
+
+ /* say hello */
+ if (tcpnal_hello(fd, &peernid, SOCKNAL_CONN_ANY, 0))
+ exit(-1);
+
+ conn = allocate_connection(m, ip, port, fd);
}
- return(c);
+
+ pthread_mutex_unlock(&m->conn_lock);
+ return (conn);
}
bzero((char *) &addr, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = 0;
- addr.sin_port = port;
-
+ addr.sin_port = htons(port);
+
if (bind(m->bound,(struct sockaddr *)&addr,alen)<0){
perror ("tcpnal bind");
return(0);
int (*input)(void *, void *),
void *a)
{
- manager m=(manager)malloc(sizeof(struct manager));
- m->connections=hash_create_table(compare_connection,connection_key);
- m->handler=input;
- m->handler_arg=a;
- if (bind_socket(m,pid)) return(m);
+ manager m = (manager)malloc(sizeof(struct manager));
+ m->connections = hash_create_table(compare_connection,connection_key);
+ m->handler = input;
+ m->handler_arg = a;
+ pthread_mutex_init(&m->conn_lock, 0);
+
+ if (bind_socket(m,pid))
+ return(m);
+
free(m);
return(0);
}
typedef struct manager {
table connections;
+ pthread_mutex_t conn_lock; /* protect connections table */
int bound;
io_handler bound_handler;
int (*handler)(void *, void *);
* vim:expandtab:shiftwidth=8:tabstop=8:
*
* Copyright (c) 2002 Cray Inc.
+ * Copyright (c) 2003 Cluster File Systems, Inc.
*
* This file is part of Lustre, http://www.lustre.org.
*
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
-#include <syscall.h>
#include <procbridge.h>
#include <pqtimer.h>
#include <dispatch.h>
* forwards a packaged api call from the 'api' side to the 'library'
* side, and collects the result
*/
-#define forward_failure(operand,fd,buffer,length)\
- if(syscall(SYS_##operand,fd,buffer,length)!=length){\
- lib_fini(b->nal_cb);\
- return(PTL_SEGV);\
- }
static int procbridge_forward(nal_t *n, int id, void *args, size_t args_len,
- void *ret, size_t ret_len)
+ void *ret, ptl_size_t ret_len)
{
- bridge b=(bridge)n->nal_data;
- procbridge p=(procbridge)b->local;
- int lib=p->to_lib[1];
- int k;
+ bridge b = (bridge) n->nal_data;
- forward_failure(write,lib, &id, sizeof(id));
- forward_failure(write,lib,&args_len, sizeof(args_len));
- forward_failure(write,lib,&ret_len, sizeof(ret_len));
- forward_failure(write,lib,args, args_len);
-
- do {
- k=syscall(SYS_read, p->from_lib[0], ret, ret_len);
- } while ((k!=ret_len) && (errno += EINTR));
+ if (id == PTL_FINI) {
+ lib_fini(b->nal_cb);
- if(k!=ret_len){
- perror("nal: read return block");
- return PTL_SEGV;
+ if (b->shutdown)
+ (*b->shutdown)(b);
}
+
+ lib_dispatch(b->nal_cb, NULL, id, args, ret);
+
return (PTL_OK);
}
-#undef forward_failure
/* Function: shutdown
{
bridge b=(bridge)n->nal_data;
procbridge p=(procbridge)b->local;
- int code=PTL_FINI;
- syscall(SYS_write, p->to_lib[1],&code,sizeof(code));
- syscall(SYS_read, p->from_lib[0],&code,sizeof(code));
+ p->nal_flags |= NAL_FLAG_STOPPING;
- syscall(SYS_close, p->to_lib[0]);
- syscall(SYS_close, p->to_lib[1]);
- syscall(SYS_close, p->from_lib[0]);
- syscall(SYS_close, p->from_lib[1]);
+ do {
+ pthread_mutex_lock(&p->mutex);
+ if (p->nal_flags & NAL_FLAG_STOPPED) {
+ pthread_mutex_unlock(&p->mutex);
+ break;
+ }
+ pthread_cond_wait(&p->cond, &p->mutex);
+ pthread_mutex_unlock(&p->mutex);
+ } while (1);
free(p);
return(0);
unlock: procbridge_unlock
};
-/* Function: bridge_init
+ptl_nid_t tcpnal_mynid;
+
+/* Function: procbridge_interface
*
* Arguments: pid: requested process id (port offset)
* PTL_ID_ANY not supported.
* initializes the tcp nal. we define unix_failure as an
* error wrapper to cut down clutter.
*/
-#define unix_failure(operand,fd,buffer,length,text)\
- if(syscall(SYS_##operand,fd,buffer,length)!=length){\
- perror(text);\
- return(NULL);\
- }
-#if 0
-static nal_t *bridge_init(ptl_interface_t nal,
- ptl_pid_t pid_request,
- ptl_ni_limits_t *desired,
- ptl_ni_limits_t *actual,
- int *rc)
-{
- procbridge p;
- bridge b;
- static int initialized=0;
- ptl_ni_limits_t limits = {-1,-1,-1,-1,-1};
-
- if(initialized) return (&api_nal);
-
- init_unix_timer();
-
- b=(bridge)malloc(sizeof(struct bridge));
- p=(procbridge)malloc(sizeof(struct procbridge));
- api_nal.nal_data=b;
- b->local=p;
-
- if(pipe(p->to_lib) || pipe(p->from_lib)) {
- perror("nal_init: pipe");
- return(NULL);
- }
-
- if (desired) limits = *desired;
- unix_failure(write,p->to_lib[1], &pid_request, sizeof(pid_request),
- "nal_init: write");
- unix_failure(write,p->to_lib[1], &limits, sizeof(ptl_ni_limits_t),
- "nal_init: write");
- unix_failure(write,p->to_lib[1], &nal, sizeof(ptl_interface_t),
- "nal_init: write");
-
- if(pthread_create(&p->t, NULL, nal_thread, b)) {
- perror("nal_init: pthread_create");
- return(NULL);
- }
-
- unix_failure(read,p->from_lib[0], actual, sizeof(ptl_ni_limits_t),
- "tcp_init: read");
- unix_failure(read,p->from_lib[0], rc, sizeof(rc),
- "nal_init: read");
-
- if(*rc) return(NULL);
-
- initialized = 1;
- pthread_mutex_init(&p->mutex,0);
- pthread_cond_init(&p->cond, 0);
-
- return (&api_nal);
-}
-#endif
-
-ptl_nid_t tcpnal_mynid;
-
nal_t *procbridge_interface(int num_interface,
ptl_pt_index_t ptl_size,
ptl_ac_index_t acl_size,
ptl_pid_t requested_pid)
{
+ nal_init_args_t args;
procbridge p;
bridge b;
static int initialized=0;
ptl_ni_limits_t limits = {-1,-1,-1,-1,-1};
- int rc, nal_type = PTL_IFACE_TCP;/* PTL_IFACE_DEFAULT FIXME hack */
+ int nal_type = PTL_IFACE_TCP;/* PTL_IFACE_DEFAULT FIXME hack */
if(initialized) return (&api_nal);
api_nal.nal_data=b;
b->local=p;
- if(pipe(p->to_lib) || pipe(p->from_lib)) {
- perror("nal_init: pipe");
- return(NULL);
- }
-
if (ptl_size)
limits.max_ptable_index = ptl_size;
if (acl_size)
limits.max_atable_index = acl_size;
- unix_failure(write,p->to_lib[1], &requested_pid, sizeof(requested_pid),
- "nal_init: write");
- unix_failure(write,p->to_lib[1], &limits, sizeof(ptl_ni_limits_t),
- "nal_init: write");
- unix_failure(write,p->to_lib[1], &nal_type, sizeof(nal_type),
- "nal_init: write");
+ args.nia_requested_pid = requested_pid;
+ args.nia_limits = &limits;
+ args.nia_nal_type = nal_type;
+ args.nia_bridge = b;
- if(pthread_create(&p->t, NULL, nal_thread, b)) {
+ /* init procbridge */
+ pthread_mutex_init(&p->mutex,0);
+ pthread_cond_init(&p->cond, 0);
+ p->nal_flags = 0;
+ pthread_mutex_init(&p->nal_cb_lock, 0);
+
+ if (pthread_create(&p->t, NULL, nal_thread, &args)) {
perror("nal_init: pthread_create");
return(NULL);
}
- unix_failure(read,p->from_lib[0], &rc, sizeof(rc),
- "nal_init: read");
-
- if(rc) return(NULL);
+ do {
+ pthread_mutex_lock(&p->mutex);
+ if (p->nal_flags & (NAL_FLAG_RUNNING | NAL_FLAG_STOPPED)) {
+ pthread_mutex_unlock(&p->mutex);
+ break;
+ }
+ pthread_cond_wait(&p->cond, &p->mutex);
+ pthread_mutex_unlock(&p->mutex);
+ } while (1);
+
+ if (p->nal_flags & NAL_FLAG_STOPPED)
+ return (NULL);
b->nal_cb->ni.nid = tcpnal_mynid;
initialized = 1;
- pthread_mutex_init(&p->mutex,0);
- pthread_cond_init(&p->cond, 0);
return (&api_nal);
}
-#undef unix_failure
* vim:expandtab:shiftwidth=8:tabstop=8:
*
* Copyright (c) 2002 Cray Inc.
+ * Copyright (c) 2003 Cluster File Systems, Inc.
*
* This file is part of Portals, http://www.sf.net/projects/sandiaportals/
*/
#include <ipmap.h>
+#define NAL_FLAG_RUNNING 1
+#define NAL_FLAG_STOPPING 2
+#define NAL_FLAG_STOPPED 4
+
typedef struct procbridge {
+ /* sync between user threads and nal thread */
pthread_t t;
pthread_cond_t cond;
pthread_mutex_t mutex;
- int to_lib[2];
- int from_lib[2];
+
+ int nal_flags;
+
+ pthread_mutex_t nal_cb_lock;
} *procbridge;
+typedef struct nal_init_args {
+ ptl_pid_t nia_requested_pid;
+ ptl_ni_limits_t *nia_limits;
+ int nia_nal_type;
+ bridge nia_bridge;
+} nal_init_args_t;
+
extern void *nal_thread(void *);
extern void set_address(bridge t,ptl_pid_t pidrequest);
extern nal_t *procbridge_interface(int num_interface,
- ptl_pt_index_t ptl_size,
- ptl_ac_index_t acl_size,
- ptl_pid_t requested_pid);
+ ptl_pt_index_t ptl_size,
+ ptl_ac_index_t acl_size,
+ ptl_pid_t requested_pid);
#endif
* vim:expandtab:shiftwidth=8:tabstop=8:
*
* Copyright (c) 2002 Cray Inc.
+ * Copyright (c) 2003 Cluster File Systems, Inc.
*
* This file is part of Lustre, http://www.lustre.org.
*
#include <stdio.h>
#include <stdarg.h>
#include <unistd.h>
-#include <syscall.h>
#include <procbridge.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <errno.h>
#include <timer.h>
-//#include <util/pqtimer.h>
#include <dispatch.h>
/* the following functions are stubs to satisfy the nal definition
static void nal_cli(nal_cb_t *nal,
unsigned long *flags)
{
+ bridge b = (bridge) nal->nal_data;
+ procbridge p = (procbridge) b->local;
+
+ pthread_mutex_lock(&p->nal_cb_lock);
}
static void nal_sti(nal_cb_t *nal,
unsigned long *flags)
{
+ bridge b = (bridge)nal->nal_data;
+ procbridge p = (procbridge) b->local;
+
+ pthread_mutex_unlock(&p->nal_cb_lock);
}
{
return 0;
}
-
-
-
-/* Function: data_from_api
- * Arguments: t: the nal state for this interface
- * Returns: whether to continue reading from the pipe
- *
- * data_from_api() reads data from the api side in response
- * to a select.
- *
- * We define data_failure() for syntactic convenience
- * of unix error reporting.
- */
-
-#define data_failure(operand,fd,buffer,length)\
- if(syscall(SYS_##operand,fd,buffer,length)!=length){\
- lib_fini(b->nal_cb);\
- return(0);\
- }
-static int data_from_api(void *arg)
-{
- bridge b = arg;
- procbridge p=(procbridge)b->local;
- /* where are these two sizes derived from ??*/
- char arg_block[ 256 ];
- char ret_block[ 128 ];
- ptl_size_t arg_len,ret_len;
- int fd=p->to_lib[0];
- int index;
-
- data_failure(read,fd, &index, sizeof(index));
-
- if (index==PTL_FINI) {
- lib_fini(b->nal_cb);
- if (b->shutdown) (*b->shutdown)(b);
- syscall(SYS_write, p->from_lib[1],&b->alive,sizeof(b->alive));
-
- /* a heavy-handed but convenient way of shutting down
- the lower side thread */
- pthread_exit(0);
- }
-
- data_failure(read,fd, &arg_len, sizeof(arg_len));
- data_failure(read,fd, &ret_len, sizeof(ret_len));
- data_failure(read,fd, arg_block, arg_len);
-
- lib_dispatch(b->nal_cb, NULL, index, arg_block, ret_block);
-
- data_failure(write,p->from_lib[1],ret_block, ret_len);
- return(1);
-}
-#undef data_failure
-
-
static void wakeup_topside(void *z)
{
- bridge b=z;
- procbridge p=b->local;
+ bridge b = z;
+ procbridge p = b->local;
+ int stop;
pthread_mutex_lock(&p->mutex);
+ stop = p->nal_flags & NAL_FLAG_STOPPING;
+ if (stop)
+ p->nal_flags |= NAL_FLAG_STOPPED;
pthread_cond_broadcast(&p->cond);
pthread_mutex_unlock(&p->mutex);
+
+ if (stop)
+ pthread_exit(0);
}
void *nal_thread(void *z)
{
- bridge b=z;
+ nal_init_args_t *args = (nal_init_args_t *) z;
+ bridge b = args->nia_bridge;
procbridge p=b->local;
int rc;
ptl_pid_t pid_request;
b->nal_cb->cb_sti=nal_sti;
b->nal_cb->cb_dist=nal_dist;
-
- register_io_handler(p->to_lib[0],READ_HANDLER,data_from_api,(void *)b);
-
- if(!(rc = syscall(SYS_read, p->to_lib[0], &pid_request, sizeof(pid_request))))
- perror("procbridge read from api");
- if(!(rc = syscall(SYS_read, p->to_lib[0], &desired, sizeof(ptl_ni_limits_t))))
- perror("procbridge read from api");
- if(!(rc = syscall(SYS_read, p->to_lib[0], &nal_type, sizeof(nal_type))))
- perror("procbridge read from api");
+ pid_request = args->nia_requested_pid;
+ desired = *args->nia_limits;
+ nal_type = args->nia_nal_type;
actual = desired;
LIMIT(desired.max_match_entries,actual.max_match_entries,MAX_MES);
* it is non-zero since something went wrong.
*/
/* this should perform error checking */
-#if 0
- write(p->from_lib[1], &actual, sizeof(ptl_ni_limits_t));
-#endif
- syscall(SYS_write, p->from_lib[1], &rc, sizeof(rc));
-
- if(!rc) {
+ pthread_mutex_lock(&p->mutex);
+ p->nal_flags |= rc ? NAL_FLAG_STOPPED : NAL_FLAG_RUNNING;
+ pthread_cond_broadcast(&p->cond);
+ pthread_mutex_unlock(&p->mutex);
+
+ if (!rc) {
/* the thunk function is called each time the timer loop
performs an operation and returns to blocking mode. we
overload this function to inform the api side that
return(0);
}
#undef LIMIT
-
static void set_flag(io_handler n,fd_set *fds)
{
- if (n->type & READ_HANDLER) FD_SET(n->fd,fds);
- if (n->type & WRITE_HANDLER) FD_SET(n->fd,fds+1);
- if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd,fds+2);
+ if (n->type & READ_HANDLER) FD_SET(n->fd, &fds[0]);
+ if (n->type & WRITE_HANDLER) FD_SET(n->fd,&fds[1]);
+ if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd, &fds[2]);
}
timeout_pointer=&timeout;
} else timeout_pointer=0;
- FD_ZERO(fds);
- FD_ZERO(fds+1);
- FD_ZERO(fds+2);
+
+ /* FIXME
+ * temporarily add timer for endless waiting problem.
+ * FIXME
+ */
+ timeout.tv_sec = 1;
+ timeout.tv_usec = 0;
+ timeout_pointer=&timeout;
+
+ FD_ZERO(&fds[0]);
+ FD_ZERO(&fds[1]);
+ FD_ZERO(&fds[2]);
for (k=&io_handlers;*k;){
if ((*k)->disabled){
j=*k;
k=&(*k)->next;
}
}
- result=select(FD_SETSIZE,fds,fds+1,fds+2,timeout_pointer);
+
+ result=select(FD_SETSIZE, &fds[0], &fds[1], &fds[2], timeout_pointer);
if (result > 0)
for (j=io_handlers;j;j=j->next){
if (!(j->disabled) &&
- ((FD_ISSET(j->fd,fds) && (j->type & READ_HANDLER)) ||
- (FD_ISSET(j->fd,fds+1) && (j->type & WRITE_HANDLER)) ||
- (FD_ISSET(j->fd,fds+2) && (j->type & EXCEPTION_HANDLER)))){
+ ((FD_ISSET(j->fd, &fds[0]) && (j->type & READ_HANDLER)) ||
+ (FD_ISSET(j->fd, &fds[1]) && (j->type & WRITE_HANDLER)) ||
+ (FD_ISSET(j->fd, &fds[2]) && (j->type & EXCEPTION_HANDLER)))){
if (!(*j->function)(j->argument))
j->disabled=1;
}
* vim:expandtab:shiftwidth=8:tabstop=8:
*
* Copyright (c) 2002 Cray Inc.
+ * Copyright (c) 2003 Cluster File Systems, Inc.
*
* This file is part of Lustre, http://www.lustre.org.
*
#include <stdio.h>
#include <stdarg.h>
#include <unistd.h>
-#include <syscall.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <bridge.h>
#include <ipmap.h>
#include <connection.h>
+#include <pthread.h>
+#ifndef __CYGWIN__
+#include <syscall.h>
+#endif
/* Function: tcpnal_send
* Arguments: nal: pointer to my nal control block
*
* sends a packet to the peer, after insuring that a connection exists
*/
-#warning FIXME: "param 'type' is newly added, make use of it!!"
int tcpnal_send(nal_cb_t *n,
void *private,
lib_msg_t *cookie,
{
connection c;
bridge b=(bridge)n->nal_data;
- struct iovec tiov[2];
- int count = 1;
+ struct iovec tiov[257];
+ static pthread_mutex_t send_lock = PTHREAD_MUTEX_INITIALIZER;
+#ifdef __CYGWIN__
+ int i;
+#endif
if (!(c=force_tcp_connection((manager)b->lower,
PNAL_IP(nid,b),
LASSERT (niov <= 1);
if (len) syscall(SYS_write, c->fd,iov[0].iov_base,len);
#else
- LASSERT (niov <= 1);
+ LASSERT (niov <= 256);
tiov[0].iov_base = hdr;
tiov[0].iov_len = sizeof(ptl_hdr_t);
- if (len) {
- tiov[1].iov_base = iov[0].iov_base;
- tiov[1].iov_len = len;
- count++;
- }
+ if (niov > 0)
+ memcpy(&tiov[1], iov, niov * sizeof(struct iovec));
- syscall(SYS_writev, c->fd, tiov, count);
+ pthread_mutex_lock(&send_lock);
+#ifndef __CYGWIN__
+ syscall(SYS_writev, c->fd, tiov, niov+1);
+#else
+ for (i = 0; i <= niov; i++)
+ send(c->fd, tiov[i].iov_base, tiov[i].iov_len, 0);
+#endif
+ pthread_mutex_unlock(&send_lock);
#endif
lib_finalize(n, private, cookie);
size_t rlen)
{
- if (mlen) {
- LASSERT (niov <= 1);
- read_connection(private,iov[0].iov_base,mlen);
- lib_finalize(n, private, cookie);
- }
+ int i;
+
+ if (!niov)
+ goto finalize;
+
+ LASSERT(mlen);
+ LASSERT(rlen);
+ LASSERT(rlen >= mlen);
+
+ /* FIXME
+ * 1. Is this effecient enough? change to use readv() directly?
+ * 2. need check return from read_connection()
+ * - MeiJia
+ */
+ for (i = 0; i < niov; i++)
+ read_connection(private, iov[i].iov_base, iov[i].iov_len);
+
+finalize:
+ lib_finalize(n, private, cookie);
if (mlen!=rlen){
char *trash=malloc(rlen-mlen);
*/
static int from_connection(void *a, void *d)
{
- connection c = d;
- bridge b=a;
- ptl_hdr_t hdr;
-
- if (read_connection(c, (unsigned char *)&hdr, sizeof(hdr))){
- lib_parse(b->nal_cb, &hdr, c);
- return(1);
- }
- return(0);
+ connection c = d;
+ bridge b = a;
+ ptl_hdr_t hdr;
+
+ if (read_connection(c, (unsigned char *)&hdr, sizeof(hdr))){
+ lib_parse(b->nal_cb, &hdr, c);
+ return(1);
+ }
+ return(0);
}
* vim:expandtab:shiftwidth=8:tabstop=8:
*
* Copyright (c) 2002 Cray Inc.
+ * Copyright (c) 2003 Cluster File Systems, Inc.
*
* This file is part of Lustre, http://www.lustre.org.
*
#include <stdio.h>
#include <stdarg.h>
#include <unistd.h>
-#include <syscall.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <bridge.h>
#include <ipmap.h>
#include <connection.h>
+#include <pthread.h>
+#ifndef __CYGWIN__
+#include <syscall.h>
+#endif
/* Function: tcpnal_send
* Arguments: nal: pointer to my nal control block
*
* sends a packet to the peer, after insuring that a connection exists
*/
-#warning FIXME: "param 'type' is newly added, make use of it!!"
int tcpnal_send(nal_cb_t *n,
void *private,
lib_msg_t *cookie,
{
connection c;
bridge b=(bridge)n->nal_data;
- struct iovec tiov[2];
- int count = 1;
+ struct iovec tiov[257];
+ static pthread_mutex_t send_lock = PTHREAD_MUTEX_INITIALIZER;
+#ifdef __CYGWIN__
+ int i;
+#endif
if (!(c=force_tcp_connection((manager)b->lower,
PNAL_IP(nid,b),
LASSERT (niov <= 1);
if (len) syscall(SYS_write, c->fd,iov[0].iov_base,len);
#else
- LASSERT (niov <= 1);
+ LASSERT (niov <= 256);
tiov[0].iov_base = hdr;
tiov[0].iov_len = sizeof(ptl_hdr_t);
- if (len) {
- tiov[1].iov_base = iov[0].iov_base;
- tiov[1].iov_len = len;
- count++;
- }
+ if (niov > 0)
+ memcpy(&tiov[1], iov, niov * sizeof(struct iovec));
- syscall(SYS_writev, c->fd, tiov, count);
+ pthread_mutex_lock(&send_lock);
+#ifndef __CYGWIN__
+ syscall(SYS_writev, c->fd, tiov, niov+1);
+#else
+ for (i = 0; i <= niov; i++)
+ send(c->fd, tiov[i].iov_base, tiov[i].iov_len, 0);
+#endif
+ pthread_mutex_unlock(&send_lock);
#endif
lib_finalize(n, private, cookie);
size_t rlen)
{
- if (mlen) {
- LASSERT (niov <= 1);
- read_connection(private,iov[0].iov_base,mlen);
- lib_finalize(n, private, cookie);
- }
+ int i;
+
+ if (!niov)
+ goto finalize;
+
+ LASSERT(mlen);
+ LASSERT(rlen);
+ LASSERT(rlen >= mlen);
+
+ /* FIXME
+ * 1. Is this effecient enough? change to use readv() directly?
+ * 2. need check return from read_connection()
+ * - MeiJia
+ */
+ for (i = 0; i < niov; i++)
+ read_connection(private, iov[i].iov_base, iov[i].iov_len);
+
+finalize:
+ lib_finalize(n, private, cookie);
if (mlen!=rlen){
char *trash=malloc(rlen-mlen);
*/
static int from_connection(void *a, void *d)
{
- connection c = d;
- bridge b=a;
- ptl_hdr_t hdr;
-
- if (read_connection(c, (unsigned char *)&hdr, sizeof(hdr))){
- lib_parse(b->nal_cb, &hdr, c);
- return(1);
- }
- return(0);
+ connection c = d;
+ bridge b = a;
+ ptl_hdr_t hdr;
+
+ if (read_connection(c, (unsigned char *)&hdr, sizeof(hdr))){
+ lib_parse(b->nal_cb, &hdr, c);
+ return(1);
+ }
+ return(0);
}
COMPILE = $(CC) -Wall -g -I$(srcdir)/../include
LINK = $(CC) -o $@
-sbin_PROGRAMS = acceptor ptlctl debugctl routerstat wirecheck gmnalnid
+
+if LIBLUSTRE
+tmp=
+else
+tmp=gmnalnid
+endif
+
+sbin_PROGRAMS = acceptor ptlctl debugctl routerstat wirecheck $(tmp)
lib_LIBRARIES = libptlctl.a
acceptor_SOURCES = acceptor.c # -lefence
#include <errno.h>
#include <unistd.h>
#include <time.h>
-#include <syscall.h>
+#ifndef __CYGWIN__
+# include <syscall.h>
+#endif
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/mman.h>
+
#define BUG() /* workaround for module.h includes */
#include <linux/version.h>
strerror(errno));
return -1;
}
-#ifndef SYS_fstat64
-#define __SYS_fstat__ SYS_fstat
+
+#ifndef __CYGWIN__
+# ifndef SYS_fstat64
+# define __SYS_fstat__ SYS_fstat
+# else
+# define __SYS_fstat__ SYS_fstat64
+# endif
+ rc = syscall(__SYS_fstat__, fd, &statbuf);
#else
-#define __SYS_fstat__ SYS_fstat64
+ rc = fstat(fd, &statbuf);
#endif
- rc = syscall(__SYS_fstat__, fd, &statbuf);
if (rc < 0) {
fprintf(stderr, "fstat failed: %s\n", strerror(errno));
goto out;
return 0;
}
-
int jt_dbg_modules(int argc, char **argv)
{
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
#include <linux/slab.h>
#include <linux/pagemap.h>
#include <asm/div64.h>
+#include <linux/seq_file.h>
#else
#include <liblustre.h>
#endif
#include <linux/obd_class.h>
#include <linux/obd_lov.h>
#include <linux/obd_ost.h>
-#include <linux/seq_file.h>
#include <linux/lprocfs_status.h>
#include "lov_internal.h"
lck->rpcl_it = NULL;
}
-#ifdef __KERNEL__
static inline void mdc_get_rpc_lock(struct mdc_rpc_lock *lck,
struct lookup_intent *it)
{
up(&lck->rpcl_sem);
}
}
-#endif
#define DEBUG_SUBSYSTEM S_MDS
#ifndef __KERNEL__
+# include <fcntl.h>
# include <liblustre.h>
#endif
#include <linux/lustre_idl.h>
#include <linux/lustre_mds.h>
#include "mdc_internal.h"
+#ifndef __KERNEL__
+/* some liblustre hackings here */
+#ifndef O_DIRECTORY
+#define O_DIRECTORY 0
+#endif
+#endif
+
void mdc_readdir_pack(struct ptlrpc_request *req, __u64 offset, __u32 size,
struct ll_fid *mdc_fid)
{
#define EXPORT_SYMTAB
#endif
+#ifdef __KERNEL__
#include <linux/fs.h>
+#else
+#include <liblustre.h>
+#endif
+
#include <linux/obd_class.h>
#include <linux/lustre_log.h>
#include <portals/list.h>
RETURN(rc);
}
EXPORT_SYMBOL(llog_cat_initialize);
-
+
int obd_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
int count, struct llog_logid *logid)
{
# define EXPORT_SYMTAB
#endif
+#ifndef __KERNEL__
+#include <liblustre.h>
+#else
#include <linux/module.h>
#include <linux/obd_class.h>
#include <linux/lustre_idl.h>
+#endif
#ifdef __KERNEL__
#include <linux/fs.h>
extern spinlock_t stack_backtrace_lock;
char *portals_debug_dumpstack(void);
-char *portals_nid2str(int nal, ptl_nid_t nid, char *str);
void portals_run_upcall(char **argv);
void portals_run_lbug_upcall(char * file, const char *fn, const int line);
void portals_debug_dumplog(void);
# include <stdlib.h>
#ifndef __CYGWIN__
# include <stdint.h>
+#else
+# include <cygwin-ioctl.h>
#endif
# include <unistd.h>
# include <time.h>
getpid() , stack, ## a);
#endif
+/* support decl needed both by kernel and liblustre */
+char *portals_nid2str(int nal, ptl_nid_t nid, char *str);
+
#ifndef CURRENT_TIME
# define CURRENT_TIME time(0)
#endif
wh->wh_object_cookie == PTL_WIRE_HANDLE_NONE.wh_object_cookie);
}
-#ifdef __KERNEL__
#define state_lock(nal,flagsp) \
do { \
CDEBUG(D_PORTALS, "taking state lock\n"); \
CDEBUG(D_PORTALS, "releasing state lock\n"); \
nal->cb_sti(nal, flagsp); \
}
-#else
-/* not needed in user space until we thread there */
-#define state_lock(nal,flagsp) \
-do { \
- CDEBUG(D_PORTALS, "taking state lock\n"); \
- CDEBUG(D_PORTALS, "%p:%p\n", nal, flagsp); \
-} while (0)
-
-#define state_unlock(nal,flagsp) \
-{ \
- CDEBUG(D_PORTALS, "releasing state lock\n"); \
- CDEBUG(D_PORTALS, "%p:%p\n", nal, flagsp); \
-}
-#endif /* __KERNEL__ */
#ifndef PTL_USE_SLAB_CACHE
int ptl_set_cfg_record_cb(cfg_record_cb_t cb);
/* l_ioctl.c */
+typedef int (ioc_handler_t)(int dev_id, int opc, void *buf);
+void set_ioc_handler(ioc_handler_t *handler);
int register_ioc_dev(int dev_id, const char * dev_name);
void unregister_ioc_dev(int dev_id);
int set_ioctl_dump(char * file);
typedef unsigned PTL_SEQ_BASETYPE ptl_seq_t;
#define PTL_SEQ_GT(a,b) (((signed PTL_SEQ_BASETYPE)((a) - (b))) > 0)
+/* XXX
+ * cygwin need the pragma line, not clear if it's needed in other places.
+ * checking!!!
+ */
+#ifdef __CYGWIN__
+#pragma pack(push, 4)
+#endif
typedef struct {
ptl_event_kind_t type;
ptl_process_id_t initiator;
struct timeval arrival_time;
volatile ptl_seq_t sequence;
} ptl_event_t;
+#ifdef __CYGWIN__
+#pragma pop
+#endif
typedef enum {
PTL_ACK_REQ,
static void eq_timeout(int signal)
{
+ sigset_t set;
+
+ /* signal will be automatically disabled in sig handler,
+ * must enable it before long jump
+ */
+ sigemptyset(&set);
+ sigaddset(&set, SIGALRM);
+ sigprocmask(SIG_UNBLOCK, &set, NULL);
+
longjmp(eq_jumpbuf, -1);
}
int PtlEQWait_timeout(ptl_handle_eq_t eventq_in, ptl_event_t * event_out,
int timeout)
{
- static void (*prev) (int);
+ static void (*prev) (int) = NULL;
static int left_over;
time_t time_at_start;
int rc;
left_over = alarm(timeout);
prev = signal(SIGALRM, eq_timeout);
time_at_start = time(NULL);
- if (left_over < timeout)
+ if (left_over && left_over < timeout)
alarm(left_over);
rc = PtlEQWait(eventq_in, event_out);
#include <stdarg.h>
#include <string.h>
#include <unistd.h>
-#include <syscall.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <portals/types.h>
+#include <portals/list.h>
+#include <portals/lib-types.h>
+#include <portals/socknal.h>
+#include <linux/kp30.h>
#include <connection.h>
+#include <pthread.h>
#include <errno.h>
-
+#ifndef __CYGWIN__
+#include <syscall.h>
+#endif
/* global variable: acceptor port */
unsigned short tcpnal_acceptor_port = 988;
*/
static int compare_connection(void *arg1, void *arg2)
{
- connection c = arg1;
- unsigned int * id = arg2;
- return((c->ip==id[0]) && (c->port==id[1]));
+ connection c = arg1;
+ unsigned int * id = arg2;
+#if 0
+ return((c->ip==id[0]) && (c->port==id[1]));
+#else
+ /* CFS specific hacking */
+ return (c->ip == id[0]);
+#endif
}
*/
static unsigned int connection_key(unsigned int *id)
{
+#if 0
return(id[0]^id[1]);
+#else
+ /* CFS specific hacking */
+ return (unsigned int) id[0];
+#endif
}
unsigned char *dest,
int len)
{
- int offset=0,rc;
+ int offset = 0,rc;
- if (len){
+ if (len) {
do {
- if((rc=syscall(SYS_read, c->fd, dest+offset, len-offset))<=0){
- if (errno==EINTR) {
- rc=0;
+#ifndef __CYGWIN__
+ rc = syscall(SYS_read, c->fd, dest+offset, len-offset);
+#else
+ rc = recv(c->fd, dest+offset, len-offset, 0);
+#endif
+ if (rc <= 0) {
+ if (errno == EINTR) {
+ rc = 0;
} else {
remove_connection(c);
- return(0);
+ return (0);
}
}
- offset+=rc;
- } while (offset<len);
+ offset += rc;
+ } while (offset < len);
}
- return(1);
+ return (1);
}
static int connection_input(void *d)
unsigned int nid=*((unsigned int *)&s.sin_addr);
/* cfs specific hack */
//unsigned short pid=s.sin_port;
+ pthread_mutex_lock(&m->conn_lock);
allocate_connection(m,htonl(nid),0/*pid*/,fd);
+ pthread_mutex_unlock(&m->conn_lock);
return(1);
}
+/* FIXME assuming little endian, cleanup!! */
+#define __cpu_to_le64(x) ((__u64)(x))
+#define __le64_to_cpu(x) ((__u64)(x))
+#define __cpu_to_le32(x) ((__u32)(x))
+#define __le32_to_cpu(x) ((__u32)(x))
+#define __cpu_to_le16(x) ((__u16)(x))
+#define __le16_to_cpu(x) ((__u16)(x))
+
+extern ptl_nid_t tcpnal_mynid;
+
+int
+tcpnal_hello (int sockfd, ptl_nid_t *nid, int type, __u64 incarnation)
+{
+ int rc;
+ ptl_hdr_t hdr;
+ ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
+
+ LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
+
+ memset (&hdr, 0, sizeof (hdr));
+ hmv->magic = __cpu_to_le32 (PORTALS_PROTO_MAGIC);
+ hmv->version_major = __cpu_to_le32 (PORTALS_PROTO_VERSION_MAJOR);
+ hmv->version_minor = __cpu_to_le32 (PORTALS_PROTO_VERSION_MINOR);
+
+ hdr.src_nid = __cpu_to_le64 (tcpnal_mynid);
+ hdr.type = __cpu_to_le32 (PTL_MSG_HELLO);
+
+ hdr.msg.hello.type = __cpu_to_le32 (type);
+ hdr.msg.hello.incarnation = 0;
+
+ /* Assume sufficient socket buffering for this message */
+ rc = syscall(SYS_write, sockfd, &hdr, sizeof(hdr));
+ if (rc <= 0) {
+ CERROR ("Error %d sending HELLO to %llx\n", rc, *nid);
+ return (rc);
+ }
+
+ rc = syscall(SYS_read, sockfd, hmv, sizeof(*hmv));
+ if (rc <= 0) {
+ CERROR ("Error %d reading HELLO from %llx\n", rc, *nid);
+ return (rc);
+ }
+
+ if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
+ CERROR ("Bad magic %#08x (%#08x expected) from %llx\n",
+ __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, *nid);
+ return (-EPROTO);
+ }
+
+ if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
+ hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
+ CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
+ " from %llx\n",
+ __le16_to_cpu (hmv->version_major),
+ __le16_to_cpu (hmv->version_minor),
+ PORTALS_PROTO_VERSION_MAJOR,
+ PORTALS_PROTO_VERSION_MINOR,
+ *nid);
+ return (-EPROTO);
+ }
+
+#if (PORTALS_PROTO_VERSION_MAJOR != 0)
+# error "This code only understands protocol version 0.x"
+#endif
+ /* version 0 sends magic/version as the dest_nid of a 'hello' header,
+ * so read the rest of it in now... */
+
+ rc = syscall(SYS_read, sockfd, hmv + 1, sizeof(hdr) - sizeof(*hmv));
+ if (rc <= 0) {
+ CERROR ("Error %d reading rest of HELLO hdr from %llx\n",
+ rc, *nid);
+ return (rc);
+ }
+
+ /* ...and check we got what we expected */
+ if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
+ hdr.payload_length != __cpu_to_le32 (0)) {
+ CERROR ("Expecting a HELLO hdr with 0 payload,"
+ " but got type %d with %d payload from %llx\n",
+ __le32_to_cpu (hdr.type),
+ __le32_to_cpu (hdr.payload_length), *nid);
+ return (-EPROTO);
+ }
+
+ if (__le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
+ CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n");
+ return (-EPROTO);
+ }
+
+ if (*nid == PTL_NID_ANY) { /* don't know peer's nid yet */
+ *nid = __le64_to_cpu(hdr.src_nid);
+ } else if (*nid != __le64_to_cpu (hdr.src_nid)) {
+ CERROR ("Connected to nid %llx, but expecting %llx\n",
+ __le64_to_cpu (hdr.src_nid), *nid);
+ return (-EPROTO);
+ }
+
+ return (0);
+}
/* Function: force_tcp_connection
* Arguments: t: tcpnal
unsigned int ip,
unsigned short port)
{
- connection c;
+ connection conn;
struct sockaddr_in addr;
unsigned int id[2];
port = tcpnal_acceptor_port;
- id[0]=ip;
- id[1]=port;
+ id[0] = ip;
+ id[1] = port;
- if (!(c=hash_table_find(m->connections,id))){
+ pthread_mutex_lock(&m->conn_lock);
+
+ conn = hash_table_find(m->connections, id);
+ if (!conn) {
int fd;
+ int option;
+ ptl_nid_t peernid = PTL_NID_ANY;
bzero((char *) &addr, sizeof(addr));
addr.sin_family = AF_INET;
perror("tcpnal socket failed");
exit(-1);
}
- if (connect(fd,
- (struct sockaddr *)&addr,
- sizeof(struct sockaddr_in)))
- {
- perror("tcpnal connect");
- return(0);
- }
- return(allocate_connection(m,ip,port,fd));
+ if (connect(fd, (struct sockaddr *)&addr,
+ sizeof(struct sockaddr_in))) {
+ perror("tcpnal connect");
+ return(0);
+ }
+
+#if 1
+ option = 1;
+ setsockopt(fd, SOL_TCP, TCP_NODELAY, &option, sizeof(option));
+ option = 1<<20;
+ setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &option, sizeof(option));
+ option = 1<<20;
+ setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &option, sizeof(option));
+#endif
+
+ /* say hello */
+ if (tcpnal_hello(fd, &peernid, SOCKNAL_CONN_ANY, 0))
+ exit(-1);
+
+ conn = allocate_connection(m, ip, port, fd);
}
- return(c);
+
+ pthread_mutex_unlock(&m->conn_lock);
+ return (conn);
}
bzero((char *) &addr, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = 0;
- addr.sin_port = port;
-
+ addr.sin_port = htons(port);
+
if (bind(m->bound,(struct sockaddr *)&addr,alen)<0){
perror ("tcpnal bind");
return(0);
int (*input)(void *, void *),
void *a)
{
- manager m=(manager)malloc(sizeof(struct manager));
- m->connections=hash_create_table(compare_connection,connection_key);
- m->handler=input;
- m->handler_arg=a;
- if (bind_socket(m,pid)) return(m);
+ manager m = (manager)malloc(sizeof(struct manager));
+ m->connections = hash_create_table(compare_connection,connection_key);
+ m->handler = input;
+ m->handler_arg = a;
+ pthread_mutex_init(&m->conn_lock, 0);
+
+ if (bind_socket(m,pid))
+ return(m);
+
free(m);
return(0);
}
typedef struct manager {
table connections;
+ pthread_mutex_t conn_lock; /* protect connections table */
int bound;
io_handler bound_handler;
int (*handler)(void *, void *);
* vim:expandtab:shiftwidth=8:tabstop=8:
*
* Copyright (c) 2002 Cray Inc.
+ * Copyright (c) 2003 Cluster File Systems, Inc.
*
* This file is part of Lustre, http://www.lustre.org.
*
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
-#include <syscall.h>
#include <procbridge.h>
#include <pqtimer.h>
#include <dispatch.h>
* forwards a packaged api call from the 'api' side to the 'library'
* side, and collects the result
*/
-#define forward_failure(operand,fd,buffer,length)\
- if(syscall(SYS_##operand,fd,buffer,length)!=length){\
- lib_fini(b->nal_cb);\
- return(PTL_SEGV);\
- }
static int procbridge_forward(nal_t *n, int id, void *args, size_t args_len,
- void *ret, size_t ret_len)
+ void *ret, ptl_size_t ret_len)
{
- bridge b=(bridge)n->nal_data;
- procbridge p=(procbridge)b->local;
- int lib=p->to_lib[1];
- int k;
+ bridge b = (bridge) n->nal_data;
- forward_failure(write,lib, &id, sizeof(id));
- forward_failure(write,lib,&args_len, sizeof(args_len));
- forward_failure(write,lib,&ret_len, sizeof(ret_len));
- forward_failure(write,lib,args, args_len);
-
- do {
- k=syscall(SYS_read, p->from_lib[0], ret, ret_len);
- } while ((k!=ret_len) && (errno += EINTR));
+ if (id == PTL_FINI) {
+ lib_fini(b->nal_cb);
- if(k!=ret_len){
- perror("nal: read return block");
- return PTL_SEGV;
+ if (b->shutdown)
+ (*b->shutdown)(b);
}
+
+ lib_dispatch(b->nal_cb, NULL, id, args, ret);
+
return (PTL_OK);
}
-#undef forward_failure
/* Function: shutdown
{
bridge b=(bridge)n->nal_data;
procbridge p=(procbridge)b->local;
- int code=PTL_FINI;
- syscall(SYS_write, p->to_lib[1],&code,sizeof(code));
- syscall(SYS_read, p->from_lib[0],&code,sizeof(code));
+ p->nal_flags |= NAL_FLAG_STOPPING;
- syscall(SYS_close, p->to_lib[0]);
- syscall(SYS_close, p->to_lib[1]);
- syscall(SYS_close, p->from_lib[0]);
- syscall(SYS_close, p->from_lib[1]);
+ do {
+ pthread_mutex_lock(&p->mutex);
+ if (p->nal_flags & NAL_FLAG_STOPPED) {
+ pthread_mutex_unlock(&p->mutex);
+ break;
+ }
+ pthread_cond_wait(&p->cond, &p->mutex);
+ pthread_mutex_unlock(&p->mutex);
+ } while (1);
free(p);
return(0);
unlock: procbridge_unlock
};
-/* Function: bridge_init
+ptl_nid_t tcpnal_mynid;
+
+/* Function: procbridge_interface
*
* Arguments: pid: requested process id (port offset)
* PTL_ID_ANY not supported.
* initializes the tcp nal. we define unix_failure as an
* error wrapper to cut down clutter.
*/
-#define unix_failure(operand,fd,buffer,length,text)\
- if(syscall(SYS_##operand,fd,buffer,length)!=length){\
- perror(text);\
- return(NULL);\
- }
-#if 0
-static nal_t *bridge_init(ptl_interface_t nal,
- ptl_pid_t pid_request,
- ptl_ni_limits_t *desired,
- ptl_ni_limits_t *actual,
- int *rc)
-{
- procbridge p;
- bridge b;
- static int initialized=0;
- ptl_ni_limits_t limits = {-1,-1,-1,-1,-1};
-
- if(initialized) return (&api_nal);
-
- init_unix_timer();
-
- b=(bridge)malloc(sizeof(struct bridge));
- p=(procbridge)malloc(sizeof(struct procbridge));
- api_nal.nal_data=b;
- b->local=p;
-
- if(pipe(p->to_lib) || pipe(p->from_lib)) {
- perror("nal_init: pipe");
- return(NULL);
- }
-
- if (desired) limits = *desired;
- unix_failure(write,p->to_lib[1], &pid_request, sizeof(pid_request),
- "nal_init: write");
- unix_failure(write,p->to_lib[1], &limits, sizeof(ptl_ni_limits_t),
- "nal_init: write");
- unix_failure(write,p->to_lib[1], &nal, sizeof(ptl_interface_t),
- "nal_init: write");
-
- if(pthread_create(&p->t, NULL, nal_thread, b)) {
- perror("nal_init: pthread_create");
- return(NULL);
- }
-
- unix_failure(read,p->from_lib[0], actual, sizeof(ptl_ni_limits_t),
- "tcp_init: read");
- unix_failure(read,p->from_lib[0], rc, sizeof(rc),
- "nal_init: read");
-
- if(*rc) return(NULL);
-
- initialized = 1;
- pthread_mutex_init(&p->mutex,0);
- pthread_cond_init(&p->cond, 0);
-
- return (&api_nal);
-}
-#endif
-
-ptl_nid_t tcpnal_mynid;
-
nal_t *procbridge_interface(int num_interface,
ptl_pt_index_t ptl_size,
ptl_ac_index_t acl_size,
ptl_pid_t requested_pid)
{
+ nal_init_args_t args;
procbridge p;
bridge b;
static int initialized=0;
ptl_ni_limits_t limits = {-1,-1,-1,-1,-1};
- int rc, nal_type = PTL_IFACE_TCP;/* PTL_IFACE_DEFAULT FIXME hack */
+ int nal_type = PTL_IFACE_TCP;/* PTL_IFACE_DEFAULT FIXME hack */
if(initialized) return (&api_nal);
api_nal.nal_data=b;
b->local=p;
- if(pipe(p->to_lib) || pipe(p->from_lib)) {
- perror("nal_init: pipe");
- return(NULL);
- }
-
if (ptl_size)
limits.max_ptable_index = ptl_size;
if (acl_size)
limits.max_atable_index = acl_size;
- unix_failure(write,p->to_lib[1], &requested_pid, sizeof(requested_pid),
- "nal_init: write");
- unix_failure(write,p->to_lib[1], &limits, sizeof(ptl_ni_limits_t),
- "nal_init: write");
- unix_failure(write,p->to_lib[1], &nal_type, sizeof(nal_type),
- "nal_init: write");
+ args.nia_requested_pid = requested_pid;
+ args.nia_limits = &limits;
+ args.nia_nal_type = nal_type;
+ args.nia_bridge = b;
- if(pthread_create(&p->t, NULL, nal_thread, b)) {
+ /* init procbridge */
+ pthread_mutex_init(&p->mutex,0);
+ pthread_cond_init(&p->cond, 0);
+ p->nal_flags = 0;
+ pthread_mutex_init(&p->nal_cb_lock, 0);
+
+ if (pthread_create(&p->t, NULL, nal_thread, &args)) {
perror("nal_init: pthread_create");
return(NULL);
}
- unix_failure(read,p->from_lib[0], &rc, sizeof(rc),
- "nal_init: read");
-
- if(rc) return(NULL);
+ do {
+ pthread_mutex_lock(&p->mutex);
+ if (p->nal_flags & (NAL_FLAG_RUNNING | NAL_FLAG_STOPPED)) {
+ pthread_mutex_unlock(&p->mutex);
+ break;
+ }
+ pthread_cond_wait(&p->cond, &p->mutex);
+ pthread_mutex_unlock(&p->mutex);
+ } while (1);
+
+ if (p->nal_flags & NAL_FLAG_STOPPED)
+ return (NULL);
b->nal_cb->ni.nid = tcpnal_mynid;
initialized = 1;
- pthread_mutex_init(&p->mutex,0);
- pthread_cond_init(&p->cond, 0);
return (&api_nal);
}
-#undef unix_failure
* vim:expandtab:shiftwidth=8:tabstop=8:
*
* Copyright (c) 2002 Cray Inc.
+ * Copyright (c) 2003 Cluster File Systems, Inc.
*
* This file is part of Portals, http://www.sf.net/projects/sandiaportals/
*/
#include <ipmap.h>
+#define NAL_FLAG_RUNNING 1
+#define NAL_FLAG_STOPPING 2
+#define NAL_FLAG_STOPPED 4
+
typedef struct procbridge {
+ /* sync between user threads and nal thread */
pthread_t t;
pthread_cond_t cond;
pthread_mutex_t mutex;
- int to_lib[2];
- int from_lib[2];
+
+ int nal_flags;
+
+ pthread_mutex_t nal_cb_lock;
} *procbridge;
+typedef struct nal_init_args {
+ ptl_pid_t nia_requested_pid;
+ ptl_ni_limits_t *nia_limits;
+ int nia_nal_type;
+ bridge nia_bridge;
+} nal_init_args_t;
+
extern void *nal_thread(void *);
extern void set_address(bridge t,ptl_pid_t pidrequest);
extern nal_t *procbridge_interface(int num_interface,
- ptl_pt_index_t ptl_size,
- ptl_ac_index_t acl_size,
- ptl_pid_t requested_pid);
+ ptl_pt_index_t ptl_size,
+ ptl_ac_index_t acl_size,
+ ptl_pid_t requested_pid);
#endif
* vim:expandtab:shiftwidth=8:tabstop=8:
*
* Copyright (c) 2002 Cray Inc.
+ * Copyright (c) 2003 Cluster File Systems, Inc.
*
* This file is part of Lustre, http://www.lustre.org.
*
#include <stdio.h>
#include <stdarg.h>
#include <unistd.h>
-#include <syscall.h>
#include <procbridge.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <errno.h>
#include <timer.h>
-//#include <util/pqtimer.h>
#include <dispatch.h>
/* the following functions are stubs to satisfy the nal definition
static void nal_cli(nal_cb_t *nal,
unsigned long *flags)
{
+ bridge b = (bridge) nal->nal_data;
+ procbridge p = (procbridge) b->local;
+
+ pthread_mutex_lock(&p->nal_cb_lock);
}
static void nal_sti(nal_cb_t *nal,
unsigned long *flags)
{
+ bridge b = (bridge)nal->nal_data;
+ procbridge p = (procbridge) b->local;
+
+ pthread_mutex_unlock(&p->nal_cb_lock);
}
{
return 0;
}
-
-
-
-/* Function: data_from_api
- * Arguments: t: the nal state for this interface
- * Returns: whether to continue reading from the pipe
- *
- * data_from_api() reads data from the api side in response
- * to a select.
- *
- * We define data_failure() for syntactic convenience
- * of unix error reporting.
- */
-
-#define data_failure(operand,fd,buffer,length)\
- if(syscall(SYS_##operand,fd,buffer,length)!=length){\
- lib_fini(b->nal_cb);\
- return(0);\
- }
-static int data_from_api(void *arg)
-{
- bridge b = arg;
- procbridge p=(procbridge)b->local;
- /* where are these two sizes derived from ??*/
- char arg_block[ 256 ];
- char ret_block[ 128 ];
- ptl_size_t arg_len,ret_len;
- int fd=p->to_lib[0];
- int index;
-
- data_failure(read,fd, &index, sizeof(index));
-
- if (index==PTL_FINI) {
- lib_fini(b->nal_cb);
- if (b->shutdown) (*b->shutdown)(b);
- syscall(SYS_write, p->from_lib[1],&b->alive,sizeof(b->alive));
-
- /* a heavy-handed but convenient way of shutting down
- the lower side thread */
- pthread_exit(0);
- }
-
- data_failure(read,fd, &arg_len, sizeof(arg_len));
- data_failure(read,fd, &ret_len, sizeof(ret_len));
- data_failure(read,fd, arg_block, arg_len);
-
- lib_dispatch(b->nal_cb, NULL, index, arg_block, ret_block);
-
- data_failure(write,p->from_lib[1],ret_block, ret_len);
- return(1);
-}
-#undef data_failure
-
-
static void wakeup_topside(void *z)
{
- bridge b=z;
- procbridge p=b->local;
+ bridge b = z;
+ procbridge p = b->local;
+ int stop;
pthread_mutex_lock(&p->mutex);
+ stop = p->nal_flags & NAL_FLAG_STOPPING;
+ if (stop)
+ p->nal_flags |= NAL_FLAG_STOPPED;
pthread_cond_broadcast(&p->cond);
pthread_mutex_unlock(&p->mutex);
+
+ if (stop)
+ pthread_exit(0);
}
void *nal_thread(void *z)
{
- bridge b=z;
+ nal_init_args_t *args = (nal_init_args_t *) z;
+ bridge b = args->nia_bridge;
procbridge p=b->local;
int rc;
ptl_pid_t pid_request;
b->nal_cb->cb_sti=nal_sti;
b->nal_cb->cb_dist=nal_dist;
-
- register_io_handler(p->to_lib[0],READ_HANDLER,data_from_api,(void *)b);
-
- if(!(rc = syscall(SYS_read, p->to_lib[0], &pid_request, sizeof(pid_request))))
- perror("procbridge read from api");
- if(!(rc = syscall(SYS_read, p->to_lib[0], &desired, sizeof(ptl_ni_limits_t))))
- perror("procbridge read from api");
- if(!(rc = syscall(SYS_read, p->to_lib[0], &nal_type, sizeof(nal_type))))
- perror("procbridge read from api");
+ pid_request = args->nia_requested_pid;
+ desired = *args->nia_limits;
+ nal_type = args->nia_nal_type;
actual = desired;
LIMIT(desired.max_match_entries,actual.max_match_entries,MAX_MES);
* it is non-zero since something went wrong.
*/
/* this should perform error checking */
-#if 0
- write(p->from_lib[1], &actual, sizeof(ptl_ni_limits_t));
-#endif
- syscall(SYS_write, p->from_lib[1], &rc, sizeof(rc));
-
- if(!rc) {
+ pthread_mutex_lock(&p->mutex);
+ p->nal_flags |= rc ? NAL_FLAG_STOPPED : NAL_FLAG_RUNNING;
+ pthread_cond_broadcast(&p->cond);
+ pthread_mutex_unlock(&p->mutex);
+
+ if (!rc) {
/* the thunk function is called each time the timer loop
performs an operation and returns to blocking mode. we
overload this function to inform the api side that
return(0);
}
#undef LIMIT
-
static void set_flag(io_handler n,fd_set *fds)
{
- if (n->type & READ_HANDLER) FD_SET(n->fd,fds);
- if (n->type & WRITE_HANDLER) FD_SET(n->fd,fds+1);
- if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd,fds+2);
+ if (n->type & READ_HANDLER) FD_SET(n->fd, &fds[0]);
+ if (n->type & WRITE_HANDLER) FD_SET(n->fd,&fds[1]);
+ if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd, &fds[2]);
}
timeout_pointer=&timeout;
} else timeout_pointer=0;
- FD_ZERO(fds);
- FD_ZERO(fds+1);
- FD_ZERO(fds+2);
+
+ /* FIXME
+ * temporarily add timer for endless waiting problem.
+ * FIXME
+ */
+ timeout.tv_sec = 1;
+ timeout.tv_usec = 0;
+ timeout_pointer=&timeout;
+
+ FD_ZERO(&fds[0]);
+ FD_ZERO(&fds[1]);
+ FD_ZERO(&fds[2]);
for (k=&io_handlers;*k;){
if ((*k)->disabled){
j=*k;
k=&(*k)->next;
}
}
- result=select(FD_SETSIZE,fds,fds+1,fds+2,timeout_pointer);
+
+ result=select(FD_SETSIZE, &fds[0], &fds[1], &fds[2], timeout_pointer);
if (result > 0)
for (j=io_handlers;j;j=j->next){
if (!(j->disabled) &&
- ((FD_ISSET(j->fd,fds) && (j->type & READ_HANDLER)) ||
- (FD_ISSET(j->fd,fds+1) && (j->type & WRITE_HANDLER)) ||
- (FD_ISSET(j->fd,fds+2) && (j->type & EXCEPTION_HANDLER)))){
+ ((FD_ISSET(j->fd, &fds[0]) && (j->type & READ_HANDLER)) ||
+ (FD_ISSET(j->fd, &fds[1]) && (j->type & WRITE_HANDLER)) ||
+ (FD_ISSET(j->fd, &fds[2]) && (j->type & EXCEPTION_HANDLER)))){
if (!(*j->function)(j->argument))
j->disabled=1;
}
* vim:expandtab:shiftwidth=8:tabstop=8:
*
* Copyright (c) 2002 Cray Inc.
+ * Copyright (c) 2003 Cluster File Systems, Inc.
*
* This file is part of Lustre, http://www.lustre.org.
*
#include <stdio.h>
#include <stdarg.h>
#include <unistd.h>
-#include <syscall.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <bridge.h>
#include <ipmap.h>
#include <connection.h>
+#include <pthread.h>
+#ifndef __CYGWIN__
+#include <syscall.h>
+#endif
/* Function: tcpnal_send
* Arguments: nal: pointer to my nal control block
*
* sends a packet to the peer, after insuring that a connection exists
*/
-#warning FIXME: "param 'type' is newly added, make use of it!!"
int tcpnal_send(nal_cb_t *n,
void *private,
lib_msg_t *cookie,
{
connection c;
bridge b=(bridge)n->nal_data;
- struct iovec tiov[2];
- int count = 1;
+ struct iovec tiov[257];
+ static pthread_mutex_t send_lock = PTHREAD_MUTEX_INITIALIZER;
+#ifdef __CYGWIN__
+ int i;
+#endif
if (!(c=force_tcp_connection((manager)b->lower,
PNAL_IP(nid,b),
LASSERT (niov <= 1);
if (len) syscall(SYS_write, c->fd,iov[0].iov_base,len);
#else
- LASSERT (niov <= 1);
+ LASSERT (niov <= 256);
tiov[0].iov_base = hdr;
tiov[0].iov_len = sizeof(ptl_hdr_t);
- if (len) {
- tiov[1].iov_base = iov[0].iov_base;
- tiov[1].iov_len = len;
- count++;
- }
+ if (niov > 0)
+ memcpy(&tiov[1], iov, niov * sizeof(struct iovec));
- syscall(SYS_writev, c->fd, tiov, count);
+ pthread_mutex_lock(&send_lock);
+#ifndef __CYGWIN__
+ syscall(SYS_writev, c->fd, tiov, niov+1);
+#else
+ for (i = 0; i <= niov; i++)
+ send(c->fd, tiov[i].iov_base, tiov[i].iov_len, 0);
+#endif
+ pthread_mutex_unlock(&send_lock);
#endif
lib_finalize(n, private, cookie);
size_t rlen)
{
- if (mlen) {
- LASSERT (niov <= 1);
- read_connection(private,iov[0].iov_base,mlen);
- lib_finalize(n, private, cookie);
- }
+ int i;
+
+ if (!niov)
+ goto finalize;
+
+ LASSERT(mlen);
+ LASSERT(rlen);
+ LASSERT(rlen >= mlen);
+
+ /* FIXME
+ * 1. Is this effecient enough? change to use readv() directly?
+ * 2. need check return from read_connection()
+ * - MeiJia
+ */
+ for (i = 0; i < niov; i++)
+ read_connection(private, iov[i].iov_base, iov[i].iov_len);
+
+finalize:
+ lib_finalize(n, private, cookie);
if (mlen!=rlen){
char *trash=malloc(rlen-mlen);
*/
static int from_connection(void *a, void *d)
{
- connection c = d;
- bridge b=a;
- ptl_hdr_t hdr;
-
- if (read_connection(c, (unsigned char *)&hdr, sizeof(hdr))){
- lib_parse(b->nal_cb, &hdr, c);
- return(1);
- }
- return(0);
+ connection c = d;
+ bridge b = a;
+ ptl_hdr_t hdr;
+
+ if (read_connection(c, (unsigned char *)&hdr, sizeof(hdr))){
+ lib_parse(b->nal_cb, &hdr, c);
+ return(1);
+ }
+ return(0);
}
COMPILE = $(CC) -Wall -g -I$(srcdir)/../include
LINK = $(CC) -o $@
-sbin_PROGRAMS = acceptor ptlctl debugctl routerstat wirecheck gmnalnid
+
+if LIBLUSTRE
+tmp=
+else
+tmp=gmnalnid
+endif
+
+sbin_PROGRAMS = acceptor ptlctl debugctl routerstat wirecheck $(tmp)
lib_LIBRARIES = libptlctl.a
acceptor_SOURCES = acceptor.c # -lefence
#include <errno.h>
#include <unistd.h>
#include <time.h>
-#include <syscall.h>
+#ifndef __CYGWIN__
+# include <syscall.h>
+#endif
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/mman.h>
+
#define BUG() /* workaround for module.h includes */
#include <linux/version.h>
strerror(errno));
return -1;
}
-#ifndef SYS_fstat64
-#define __SYS_fstat__ SYS_fstat
+
+#ifndef __CYGWIN__
+# ifndef SYS_fstat64
+# define __SYS_fstat__ SYS_fstat
+# else
+# define __SYS_fstat__ SYS_fstat64
+# endif
+ rc = syscall(__SYS_fstat__, fd, &statbuf);
#else
-#define __SYS_fstat__ SYS_fstat64
+ rc = fstat(fd, &statbuf);
#endif
- rc = syscall(__SYS_fstat__, fd, &statbuf);
if (rc < 0) {
fprintf(stderr, "fstat failed: %s\n", strerror(errno));
goto out;
return 0;
}
-
int jt_dbg_modules(int argc, char **argv)
{
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
#define EXPORT_SYMTAB
#endif
+#ifdef __KERNEL__
#include <linux/fs.h>
+#else
+#include <liblustre.h>
+#endif
+
#include <linux/obd_class.h>
#include <linux/lustre_log.h>
#include <linux/lustre_net.h>
#define EXPORT_SYMTAB
#endif
+#ifdef __KERNEL__
#include <linux/fs.h>
+#else
+#include <liblustre.h>
+#endif
+
#include <linux/obd_class.h>
#include <linux/lustre_log.h>
#include <portals/list.h>
#include <linux/lvfs.h>
+#ifdef __KERNEL__
int llog_origin_connect(struct llog_ctxt *ctxt, int count,
struct llog_logid *logid,
struct llog_ctxt_gen *gen)
RETURN(0);
}
EXPORT_SYMBOL(llog_initiator_connect);
+
+#else /* !__KERNEL__ */
+
+int llog_origin_connect(struct llog_ctxt *ctxt, int count,
+ struct llog_logid *logid,
+ struct llog_ctxt_gen *gen)
+{
+ return 0;
+}
+
+int llog_initiator_connect(struct llog_ctxt *ctxt)
+{
+ return 0;
+}
+#endif
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
+#ifndef __KERNEL__
+#include <liblustre.h>
+#else
#include <linux/version.h>
#include <asm/semaphore.h>
-
#define DEBUG_SUBSYSTEM S_RPC
+#endif
+
#include <linux/obd_support.h>
#include <linux/obd_class.h>
#include "ptlrpc_internal.h"
+#ifdef __KERNEL__
+
static struct ptlrpc_thread *pinger_thread = NULL;
static DECLARE_MUTEX(pinger_sem);
static struct list_head pinger_imports = LIST_HEAD_INIT(pinger_imports);
up(&pinger_sem);
RETURN(0);
}
+
+#else /* !__KERNEL__ */
+
+int ptlrpc_start_pinger(void)
+{
+ return 0;
+}
+
+int ptlrpc_stop_pinger(void)
+{
+ return 0;
+}
+
+int ptlrpc_pinger_add_import(struct obd_import *imp)
+{
+ return 0;
+}
+
+int ptlrpc_pinger_del_import(struct obd_import *imp)
+{
+ return 0;
+}
+
+void ptlrpc_pinger_sending_on_import(struct obd_import *imp)
+{
+}
+
+#endif
rc = Parser_commands();
}
- obd_cleanup(argc, argv);
+ obd_finalize(argc, argv);
return rc;
}