From 19205dfea419bac6f3bc58ed1b579b8caf79b895 Mon Sep 17 00:00:00 2001 From: maxim Date: Fri, 28 Sep 2007 21:46:35 +0000 Subject: [PATCH] b=12302 i=liang i=isaac - Landing b_usocklnd on HEAD Bugzilla : 12302 Description: new userspace socklnd Details : Old userspace tcpnal that resided in lnet/ulnds/socklnd replaced with new one - usocklnd. --- lnet/ChangeLog | 6 + lnet/include/libcfs/Makefile.am | 3 +- lnet/include/libcfs/libcfs.h | 7 +- lnet/include/libcfs/linux/libcfs.h | 12 +- lnet/include/libcfs/linux/linux-tcpip.h | 6 +- lnet/include/libcfs/user-lock.h | 34 + lnet/include/libcfs/user-prim.h | 18 + lnet/include/libcfs/user-tcpip.h | 90 +++ lnet/include/lnet/api.h | 8 + lnet/include/lnet/lib-lnet.h | 5 + lnet/include/lnet/lib-types.h | 13 + lnet/include/lnet/socklnd.h | 4 + lnet/klnds/socklnd/socklnd.h | 1 - lnet/libcfs/autoMakefile.am | 5 +- lnet/libcfs/user-lock.c | 106 +++ lnet/libcfs/user-prim.c | 63 ++ lnet/libcfs/user-tcpip.c | 603 +++++++++++++++++ lnet/lnet/acceptor.c | 321 ++++++++- lnet/lnet/api-ni.c | 27 +- lnet/selftest/rpc.c | 2 +- lnet/ulnds/socklnd/Makefile.am | 8 +- lnet/ulnds/socklnd/README | 53 -- lnet/ulnds/socklnd/bridge.h | 23 - lnet/ulnds/socklnd/conn.c | 1081 +++++++++++++++++++++++++++++++ lnet/ulnds/socklnd/connection.c | 615 ------------------ lnet/ulnds/socklnd/connection.h | 35 - lnet/ulnds/socklnd/dispatch.h | 44 -- lnet/ulnds/socklnd/handlers.c | 1005 ++++++++++++++++++++++++++++ lnet/ulnds/socklnd/poll.c | 483 ++++++++++++++ lnet/ulnds/socklnd/pqtimer.c | 226 ------- lnet/ulnds/socklnd/pqtimer.h | 25 - lnet/ulnds/socklnd/procapi.c | 198 ------ lnet/ulnds/socklnd/procbridge.h | 58 -- lnet/ulnds/socklnd/proclib.c | 108 --- lnet/ulnds/socklnd/select.c | 420 ------------ lnet/ulnds/socklnd/table.c | 267 -------- lnet/ulnds/socklnd/table.h | 40 -- lnet/ulnds/socklnd/tcplnd.c | 253 -------- lnet/ulnds/socklnd/timer.h | 30 - lnet/ulnds/socklnd/usocklnd.c | 546 ++++++++++++++++ lnet/ulnds/socklnd/usocklnd.h | 332 ++++++++++ 
lnet/ulnds/socklnd/usocklnd_cb.c | 176 +++++ 42 files changed, 4940 insertions(+), 2420 deletions(-) create mode 100644 lnet/include/libcfs/user-tcpip.h create mode 100644 lnet/libcfs/user-tcpip.c delete mode 100644 lnet/ulnds/socklnd/README delete mode 100644 lnet/ulnds/socklnd/bridge.h create mode 100644 lnet/ulnds/socklnd/conn.c delete mode 100644 lnet/ulnds/socklnd/connection.c delete mode 100644 lnet/ulnds/socklnd/connection.h delete mode 100644 lnet/ulnds/socklnd/dispatch.h create mode 100644 lnet/ulnds/socklnd/handlers.c create mode 100644 lnet/ulnds/socklnd/poll.c delete mode 100644 lnet/ulnds/socklnd/pqtimer.c delete mode 100644 lnet/ulnds/socklnd/pqtimer.h delete mode 100644 lnet/ulnds/socklnd/procapi.c delete mode 100644 lnet/ulnds/socklnd/procbridge.h delete mode 100644 lnet/ulnds/socklnd/proclib.c delete mode 100644 lnet/ulnds/socklnd/select.c delete mode 100644 lnet/ulnds/socklnd/table.c delete mode 100644 lnet/ulnds/socklnd/table.h delete mode 100644 lnet/ulnds/socklnd/tcplnd.c delete mode 100644 lnet/ulnds/socklnd/timer.h create mode 100644 lnet/ulnds/socklnd/usocklnd.c create mode 100644 lnet/ulnds/socklnd/usocklnd.h create mode 100644 lnet/ulnds/socklnd/usocklnd_cb.c diff --git a/lnet/ChangeLog b/lnet/ChangeLog index de11cf9..ad81625 100644 --- a/lnet/ChangeLog +++ b/lnet/ChangeLog @@ -12,6 +12,12 @@ tbd Cluster File Systems, Inc. mxlnd - MX 1.2.1 or later, ptllnd - Portals 3.3 / UNICOS/lc 1.5.x, 2.0.x +Severity : normal +Bugzilla : 12302 +Description: new userspace socklnd +Details : Old userspace tcpnal that resided in lnet/ulnds/socklnd replaced + with new one - usocklnd. + -------------------------------------------------------------------------------- 2007-09-27 Cluster File Systems, Inc. 
diff --git a/lnet/include/libcfs/Makefile.am b/lnet/include/libcfs/Makefile.am index 2874a52..5bdb653 100644 --- a/lnet/include/libcfs/Makefile.am +++ b/lnet/include/libcfs/Makefile.am @@ -5,4 +5,5 @@ endif DIST_SUBDIRS := $(SUBDIRS) EXTRA_DIST := curproc.h kp30.h libcfs.h list.h lltrace.h \ - portals_utils.h types.h user-lock.h user-prim.h user-time.h + portals_utils.h types.h user-lock.h user-prim.h user-time.h \ + user-tcpip.h diff --git a/lnet/include/libcfs/libcfs.h b/lnet/include/libcfs/libcfs.h index a617f31b..b607c1a 100644 --- a/lnet/include/libcfs/libcfs.h +++ b/lnet/include/libcfs/libcfs.h @@ -321,9 +321,6 @@ int libcfs_register_ioctl(struct libcfs_ioctl_handler *hand); int libcfs_deregister_ioctl(struct libcfs_ioctl_handler *hand); /* libcfs tcpip */ -#define LNET_ACCEPTOR_MIN_RESERVED_PORT 512 -#define LNET_ACCEPTOR_MAX_RESERVED_PORT 1023 - int libcfs_ipif_query(char *name, int *up, __u32 *ip, __u32 *mask); int libcfs_ipif_enumerate(char ***names); void libcfs_ipif_free_enumeration(char **names, int n); @@ -365,6 +362,10 @@ void lc_watchdog_dumplog(pid_t pid, void *data); /* __KERNEL__ */ #endif +/* need both kernel and user-land acceptor */ +#define LNET_ACCEPTOR_MIN_RESERVED_PORT 512 +#define LNET_ACCEPTOR_MAX_RESERVED_PORT 1023 + /* * libcfs pseudo device operations * diff --git a/lnet/include/libcfs/linux/libcfs.h b/lnet/include/libcfs/linux/libcfs.h index 0aac919..5e025d1 100644 --- a/lnet/include/libcfs/linux/libcfs.h +++ b/lnet/include/libcfs/linux/libcfs.h @@ -8,6 +8,12 @@ #error Do not #include this file directly. 
#include instead #endif +#ifdef HAVE_ASM_TYPES_H +#include +#else +#include +#endif + #include #include #include @@ -16,12 +22,6 @@ #include #include -#ifdef HAVE_ASM_TYPES_H -#include -#else -#include -#endif - #ifdef __KERNEL__ # include diff --git a/lnet/include/libcfs/linux/linux-tcpip.h b/lnet/include/libcfs/linux/linux-tcpip.h index 2d14904..fb2ac93 100644 --- a/lnet/include/libcfs/linux/linux-tcpip.h +++ b/lnet/include/libcfs/linux/linux-tcpip.h @@ -57,6 +57,10 @@ typedef struct socket cfs_socket_t; #define SOCK_ERROR(so) ((so)->sk->sk_err) #define SOCK_TEST_NOSPACE(so) test_bit(SOCK_NOSPACE, &(so)->flags) -#endif +#else /* !__KERNEL__ */ + +#include "../user-tcpip.h" + +#endif /* __KERNEL__ */ #endif diff --git a/lnet/include/libcfs/user-lock.h b/lnet/include/libcfs/user-lock.h index cea7a6d..97a5a16 100644 --- a/lnet/include/libcfs/user-lock.h +++ b/lnet/include/libcfs/user-lock.h @@ -191,6 +191,40 @@ typedef struct { volatile int counter; } atomic_t; #endif +#ifdef HAVE_LIBPTHREAD +#include + +/* + * Completion + */ + +struct cfs_completion { + int c_done; + pthread_cond_t c_cond; + pthread_mutex_t c_mut; +}; + +void cfs_init_completion(struct cfs_completion *c); +void cfs_fini_completion(struct cfs_completion *c); +void cfs_complete(struct cfs_completion *c); +void cfs_wait_for_completion(struct cfs_completion *c); + +/* + * atomic.h + */ + +typedef struct { volatile int counter; } cfs_atomic_t; + +int cfs_atomic_read(cfs_atomic_t *a); +void cfs_atomic_set(cfs_atomic_t *a, int b); +int cfs_atomic_dec_and_test(cfs_atomic_t *a); +void cfs_atomic_inc(cfs_atomic_t *a); +void cfs_atomic_dec(cfs_atomic_t *a); +void cfs_atomic_add(int b, cfs_atomic_t *a); +void cfs_atomic_sub(int b, cfs_atomic_t *a); + +#endif /* HAVE_LIBPTHREAD */ + /* !__KERNEL__ */ #endif diff --git a/lnet/include/libcfs/user-prim.h b/lnet/include/libcfs/user-prim.h index e7c8cd1..2447bbc 100644 --- a/lnet/include/libcfs/user-prim.h +++ b/lnet/include/libcfs/user-prim.h @@ -48,6 +48,10 @@ 
#include #include +#ifdef HAVE_LIBPTHREAD +#include +#endif + #ifndef PAGE_SIZE #define PAGE_SIZE (getpagesize()) @@ -282,6 +286,20 @@ static inline int cfs_psdev_deregister(cfs_psdev_t *foo) #define cfs_recalc_sigpending(l) do {} while (0) #define cfs_kernel_thread(l,m,n) LBUG() +#ifdef HAVE_LIBPTHREAD +typedef int (*cfs_thread_t)(void *); +int cfs_create_thread(cfs_thread_t func, void *arg); +#else +#define cfs_create_thread(l,m) LBUG() +#endif + +int cfs_parse_int_tunable(int *value, char *name); +uid_t cfs_curproc_uid(void); + +#define LIBCFS_REALLOC(ptr, size) realloc(ptr, size) + +#define cfs_online_cpus() sysconf(_SC_NPROCESSORS_ONLN) + // static inline void local_irq_save(unsigned long flag) {return;} // static inline void local_irq_restore(unsigned long flag) {return;} diff --git a/lnet/include/libcfs/user-tcpip.h b/lnet/include/libcfs/user-tcpip.h new file mode 100644 index 0000000..342c039 --- /dev/null +++ b/lnet/include/libcfs/user-tcpip.h @@ -0,0 +1,90 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2005 Cluster File Systems, Inc. + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef __LIBCFS_USER_TCPIP_H__ +#define __LIBCFS_USER_TCPIP_H__ + +#ifndef __LIBCFS_LIBCFS_H__ +#error Do not #include this file directly. 
#include instead +#endif + +#ifndef __KERNEL__ + +#include + +/* + * Functions to get network interfaces info + */ + +int libcfs_sock_ioctl(int cmd, unsigned long arg); +int libcfs_ipif_query (char *name, int *up, __u32 *ip); +void libcfs_ipif_free_enumeration (char **names, int n); +int libcfs_ipif_enumerate (char ***namesp); + +/* + * Network function used by user-land lnet acceptor + */ + +int libcfs_sock_listen (int *sockp, __u32 local_ip, int local_port, int backlog); +int libcfs_sock_accept (int *newsockp, int sock, __u32 *peer_ip, int *peer_port); +int libcfs_sock_read (int sock, void *buffer, int nob, int timeout); +void libcfs_sock_abort_accept(__u16 port); + +/* + * Network functions of common use + */ + +int libcfs_getpeername(int sock_fd, __u32 *ipaddr_p, __u16 *port_p); +int libcfs_socketpair(int *fdp); +int libcfs_fcntl_nonblock(int fd); +int libcfs_sock_set_nagle(int fd, int nagle); +int libcfs_sock_set_bufsiz(int fd, int bufsiz); +int libcfs_sock_create(int *fdp); +int libcfs_sock_bind_to_port(int fd, __u16 port); +int libcfs_sock_connect(int fd, __u32 ip, __u16 port); +int libcfs_sock_writev(int fd, const struct iovec *vector, int count); +int libcfs_sock_readv(int fd, const struct iovec *vector, int count); + +/* + * Macros for easy printing IP-adresses + */ + +#define NIPQUAD(addr) \ + ((unsigned char *)&addr)[0], \ + ((unsigned char *)&addr)[1], \ + ((unsigned char *)&addr)[2], \ + ((unsigned char *)&addr)[3] + +#if defined(__LITTLE_ENDIAN) || defined(_LITTLE_ENDIAN) +#define HIPQUAD(addr) \ + ((unsigned char *)&addr)[3], \ + ((unsigned char *)&addr)[2], \ + ((unsigned char *)&addr)[1], \ + ((unsigned char *)&addr)[0] +#elif defined(__BIG_ENDIAN) || defined(_BIG_ENDIAN) +#define HIPQUAD NIPQUAD +#else +#error "Undefined byteorder??" 
+#endif /* __LITTLE_ENDIAN */ + +#endif /* !__KERNEL__ */ + +#endif diff --git a/lnet/include/lnet/api.h b/lnet/include/lnet/api.h index c240aa2..20e9177 100644 --- a/lnet/include/lnet/api.h +++ b/lnet/include/lnet/api.h @@ -99,4 +99,12 @@ int LNetGet(lnet_nid_t self, int LNetSetAsync(lnet_process_id_t id, int nasync); +#ifndef __KERNEL__ +/* Temporary workaround to allow uOSS and test programs force server + * mode in userspace. See comments near ln_server_mode_flag in + * lnet/lib-types.h */ + +void lnet_server_mode(); +#endif + #endif diff --git a/lnet/include/lnet/lib-lnet.h b/lnet/include/lnet/lib-lnet.h index 700059c..37dc5d4 100644 --- a/lnet/include/lnet/lib-lnet.h +++ b/lnet/include/lnet/lib-lnet.h @@ -653,6 +653,11 @@ int lnet_acceptor_timeout(void); int lnet_acceptor_port(void); #endif +#ifdef HAVE_LIBPTHREAD +int lnet_count_acceptor_nis(lnet_ni_t **first_ni); +int lnet_acceptor_port(void); +#endif + int lnet_acceptor_start(void); void lnet_acceptor_stop(void); diff --git a/lnet/include/lnet/lib-types.h b/lnet/include/lnet/lib-types.h index 6c6dfd3..c03810e 100644 --- a/lnet/include/lnet/lib-types.h +++ b/lnet/include/lnet/lib-types.h @@ -335,6 +335,10 @@ typedef struct lnet_lnd /* ensure non-RDMA messages can be received outside liblustre */ int (*lnd_setasync)(struct lnet_ni *ni, lnet_process_id_t id, int nasync); + +#ifdef HAVE_LIBPTHREAD + int (*lnd_accept)(struct lnet_ni *ni, int sock); +#endif #endif } lnd_t; @@ -547,6 +551,15 @@ typedef struct struct list_head ln_active_eqs; lnet_counters_t ln_counters; + +#ifndef __KERNEL__ + /* Temporary workaround to allow uOSS and test programs force + * server mode in userspace. The only place where we use it is + * lnet_prepare(). 
The only way to turn this flag on is to + * call lnet_server_mode() */ + + int ln_server_mode_flag; +#endif } lnet_t; #endif diff --git a/lnet/include/lnet/socklnd.h b/lnet/include/lnet/socklnd.h index 301f8a8..fbeea15 100644 --- a/lnet/include/lnet/socklnd.h +++ b/lnet/include/lnet/socklnd.h @@ -50,4 +50,8 @@ typedef struct { #define KSOCK_MSG_NOOP 0xc0 /* ksm_u empty */ #define KSOCK_MSG_LNET 0xc1 /* lnet msg */ +/* We need to know this number to parse hello msg from ksocklnd in + * other LND (usocklnd, for example) */ +#define KSOCK_PROTO_V2 2 + #endif diff --git a/lnet/klnds/socklnd/socklnd.h b/lnet/klnds/socklnd/socklnd.h index 825a2af..b83426e 100644 --- a/lnet/klnds/socklnd/socklnd.h +++ b/lnet/klnds/socklnd/socklnd.h @@ -366,7 +366,6 @@ extern ksock_proto_t ksocknal_protocol_v2x; #define KSOCK_PROTO_V1_MAJOR LNET_PROTO_TCP_VERSION_MAJOR #define KSOCK_PROTO_V1_MINOR LNET_PROTO_TCP_VERSION_MINOR #define KSOCK_PROTO_V1 KSOCK_PROTO_V1_MAJOR -#define KSOCK_PROTO_V2 2 static inline int ksocknal_route_mask(void) diff --git a/lnet/libcfs/autoMakefile.am b/lnet/libcfs/autoMakefile.am index 18381c1..628bd61 100644 --- a/lnet/libcfs/autoMakefile.am +++ b/lnet/libcfs/autoMakefile.am @@ -11,7 +11,7 @@ DIST_SUBDIRS := $(SUBDIRS) if LIBLUSTRE noinst_LIBRARIES= libcfs.a -libcfs_a_SOURCES= debug.c user-prim.c user-lock.c +libcfs_a_SOURCES= debug.c user-prim.c user-lock.c user-tcpip.c libcfs_a_CPPFLAGS = $(LLCPPFLAGS) libcfs_a_CFLAGS = $(LLCFLAGS) endif @@ -49,4 +49,5 @@ install-data-hook: $(install_data_hook) EXTRA_DIST := Info.plist MOSTLYCLEANFILES := @MOSTLYCLEANFILES@ linux-*.c linux/*.o darwin/*.o libcfs -DIST_SOURCES := $(libcfs-all-objs:%.o=%.c) tracefile.h user-prim.c user-lock.c +DIST_SOURCES := $(libcfs-all-objs:%.o=%.c) tracefile.h user-prim.c \ + user-lock.c user-tcpip.c diff --git a/lnet/libcfs/user-lock.c b/lnet/libcfs/user-lock.c index a1a6779..c521dc7 100644 --- a/lnet/libcfs/user-lock.c +++ b/lnet/libcfs/user-lock.c @@ -37,6 +37,8 @@ #include #include 
+#include + /* * Optional debugging (magic stamping and checking ownership) can be added. */ @@ -223,6 +225,110 @@ void up_write(struct rw_semaphore *s) } #endif +#ifdef HAVE_LIBPTHREAD + +/* + * Completion + */ + +void cfs_init_completion(struct cfs_completion *c) +{ + LASSERT(c != NULL); + c->c_done = 0; + pthread_mutex_init(&c->c_mut, NULL); + pthread_cond_init(&c->c_cond, NULL); +} + +void cfs_fini_completion(struct cfs_completion *c) +{ + LASSERT(c != NULL); + pthread_mutex_destroy(&c->c_mut); + pthread_cond_destroy(&c->c_cond); +} + +void cfs_complete(struct cfs_completion *c) +{ + LASSERT(c != NULL); + pthread_mutex_lock(&c->c_mut); + c->c_done++; + pthread_cond_signal(&c->c_cond); + pthread_mutex_unlock(&c->c_mut); +} + +void cfs_wait_for_completion(struct cfs_completion *c) +{ + LASSERT(c != NULL); + pthread_mutex_lock(&c->c_mut); + while (c->c_done == 0) + pthread_cond_wait(&c->c_cond, &c->c_mut); + c->c_done--; + pthread_mutex_unlock(&c->c_mut); +} + +/* + * atomic primitives + */ + +static pthread_mutex_t atomic_guard_lock = PTHREAD_MUTEX_INITIALIZER; + +int cfs_atomic_read(cfs_atomic_t *a) +{ + int r; + + pthread_mutex_lock(&atomic_guard_lock); + r = a->counter; + pthread_mutex_unlock(&atomic_guard_lock); + return r; +} + +void cfs_atomic_set(cfs_atomic_t *a, int b) +{ + pthread_mutex_lock(&atomic_guard_lock); + a->counter = b; + pthread_mutex_unlock(&atomic_guard_lock); +} + +int cfs_atomic_dec_and_test(cfs_atomic_t *a) +{ + int r; + + pthread_mutex_lock(&atomic_guard_lock); + r = --a->counter; + pthread_mutex_unlock(&atomic_guard_lock); + return (r == 0); +} + +void cfs_atomic_inc(cfs_atomic_t *a) +{ + pthread_mutex_lock(&atomic_guard_lock); + ++a->counter; + pthread_mutex_unlock(&atomic_guard_lock); +} + +void cfs_atomic_dec(cfs_atomic_t *a) +{ + pthread_mutex_lock(&atomic_guard_lock); + --a->counter; + pthread_mutex_unlock(&atomic_guard_lock); +} +void cfs_atomic_add(int b, cfs_atomic_t *a) + +{ + pthread_mutex_lock(&atomic_guard_lock); + 
a->counter += b; + pthread_mutex_unlock(&atomic_guard_lock); +} + +void cfs_atomic_sub(int b, cfs_atomic_t *a) +{ + pthread_mutex_lock(&atomic_guard_lock); + a->counter -= b; + pthread_mutex_unlock(&atomic_guard_lock); +} + +#endif /* HAVE_LIBPTHREAD */ + + /* !__KERNEL__ */ #endif diff --git a/lnet/libcfs/user-prim.c b/lnet/libcfs/user-prim.c index 7bfe074..b1c1966 100644 --- a/lnet/libcfs/user-prim.c +++ b/lnet/libcfs/user-prim.c @@ -139,6 +139,72 @@ int64_t cfs_waitq_timedwait(struct cfs_waitlink *link, int state, int64_t timeou return 0; } +#ifdef HAVE_LIBPTHREAD + +/* + * Threads + */ + +struct lustre_thread_arg { + cfs_thread_t f; + void *arg; +}; +static void *cfs_thread_helper(void *data) +{ + struct lustre_thread_arg *targ = data; + cfs_thread_t f = targ->f; + void *arg = targ->arg; + + free(targ); + + (void)f(arg); + return NULL; +} +int cfs_create_thread(cfs_thread_t func, void *arg) +{ + pthread_t tid; + pthread_attr_t tattr; + int rc; + struct lustre_thread_arg *targ_p = malloc(sizeof(struct lustre_thread_arg)); + + if ( targ_p == NULL ) + return -ENOMEM; + + targ_p->f = func; + targ_p->arg = arg; + + pthread_attr_init(&tattr); + pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED); + rc = pthread_create(&tid, &tattr, cfs_thread_helper, targ_p); + pthread_attr_destroy(&tattr); + /* cfs_thread_helper() frees targ_p; do it here if no thread started */ + if (rc != 0) + free(targ_p); + return -rc; +} +#endif + +uid_t cfs_curproc_uid(void) +{ + return getuid(); +} + +int cfs_parse_int_tunable(int *value, char *name) +{ + char *env = getenv(name); + char *end; + + if (env == NULL) + return 0; + + *value = strtoull(env, &end, 0); + if (*end == 0) + return 0; + + CERROR("Can't parse tunable %s=%s\n", name, env); + return -EINVAL; +} + /* * Allocator */ diff --git a/lnet/libcfs/user-tcpip.c b/lnet/libcfs/user-tcpip.c new file mode 100644 index 0000000..5637396 --- /dev/null +++ b/lnet/libcfs/user-tcpip.c @@ -0,0 +1,603 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2005 
Cluster File Systems, Inc. + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef __KERNEL__ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#if defined(__sun__) || defined(__sun) +#include +#endif +#ifndef __CYGWIN__ +#include +#endif + +#include +#include + +/* + * Functions to get network interfaces info + */ + +int +libcfs_sock_ioctl(int cmd, unsigned long arg) +{ + int fd, rc; + + fd = socket(AF_INET, SOCK_STREAM, 0); + + if (fd < 0) { + rc = -errno; + CERROR("socket() failed: errno==%d\n", errno); + return rc; + } + + rc = ioctl(fd, cmd, arg); + if (rc < 0) rc = -errno; /* report -errno, not -1; read before close() can clobber it */ + close(fd); + return rc; +} + +int +libcfs_ipif_query (char *name, int *up, __u32 *ip) +{ + struct ifreq ifr; + int nob; + int rc; + __u32 val; + + nob = strlen(name); + if (nob >= IFNAMSIZ) { + CERROR("Interface name %s too long\n", name); + return -EINVAL; + } + + CLASSERT (sizeof(ifr.ifr_name) >= IFNAMSIZ); + + strcpy(ifr.ifr_name, name); + rc = libcfs_sock_ioctl(SIOCGIFFLAGS, (unsigned long)&ifr); + + if (rc != 0) { + CERROR("Can't get flags for interface %s\n", name); + return rc; + } + + if ((ifr.ifr_flags & IFF_UP) == 0) { + CDEBUG(D_NET, "Interface %s down\n", name); + *up = 0; + *ip = 0; + return 0; + } + + *up = 1; + + strcpy(ifr.ifr_name, name); + ifr.ifr_addr.sa_family = AF_INET; + rc = 
libcfs_sock_ioctl(SIOCGIFADDR, (unsigned long)&ifr); + + if (rc != 0) { + CERROR("Can't get IP address for interface %s\n", name); + return rc; + } + + val = ((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr.s_addr; + *ip = ntohl(val); + + return 0; +} + +void +libcfs_ipif_free_enumeration (char **names, int n) +{ + int i; + + LASSERT (n > 0); + + for (i = 0; i < n && names[i] != NULL; i++) + LIBCFS_FREE(names[i], IFNAMSIZ); + + LIBCFS_FREE(names, n * sizeof(*names)); +} + +int +libcfs_ipif_enumerate (char ***namesp) +{ + /* Allocate and fill in 'names', returning # interfaces/error */ + char **names; + int nalloc; + int nfound; + struct ifreq *ifr; + struct ifconf ifc; + int rc; + int nob; + int i; + + + nalloc = 16; /* first guess at max interfaces */ + for (;;) { + LIBCFS_ALLOC(ifr, nalloc * sizeof(*ifr)); + if (ifr == NULL) { + CERROR ("ENOMEM enumerating up to %d interfaces\n", + nalloc); + rc = -ENOMEM; + goto out0; + } + + ifc.ifc_buf = (char *)ifr; + ifc.ifc_len = nalloc * sizeof(*ifr); + + rc = libcfs_sock_ioctl(SIOCGIFCONF, (unsigned long)&ifc); + + if (rc < 0) { + CERROR ("Error %d enumerating interfaces\n", rc); + goto out1; + } + + LASSERT (rc == 0); + + nfound = ifc.ifc_len/sizeof(*ifr); + LASSERT (nfound <= nalloc); + + if (nfound < nalloc) + break; + + LIBCFS_FREE(ifr, nalloc * sizeof(*ifr)); + nalloc *= 2; + } + + if (nfound == 0) + goto out1; + + LIBCFS_ALLOC(names, nfound * sizeof(*names)); + if (names == NULL) { + rc = -ENOMEM; + goto out1; + } + /* NULL out all names[i] */ + memset (names, 0, nfound * sizeof(*names)); + + for (i = 0; i < nfound; i++) { + + nob = strlen (ifr[i].ifr_name); + if (nob >= IFNAMSIZ) { + /* no space for terminating NULL */ + CERROR("interface name %.*s too long (%d max)\n", + nob, ifr[i].ifr_name, IFNAMSIZ); + rc = -ENAMETOOLONG; + goto out2; + } + + LIBCFS_ALLOC(names[i], IFNAMSIZ); + if (names[i] == NULL) { + rc = -ENOMEM; + goto out2; + } + + memcpy(names[i], ifr[i].ifr_name, nob); + names[i][nob] = 0; + } + + 
*namesp = names; + rc = nfound; + + out2: + if (rc < 0) + libcfs_ipif_free_enumeration(names, nfound); + out1: + LIBCFS_FREE(ifr, nalloc * sizeof(*ifr)); + out0: + return rc; +} + +/* + * Network functions used by user-land lnet acceptor + */ + +int +libcfs_sock_listen (int *sockp, __u32 local_ip, int local_port, int backlog) +{ + int rc; + int option; + struct sockaddr_in locaddr; + + *sockp = socket(AF_INET, SOCK_STREAM, 0); + if (*sockp < 0) { + rc = -errno; + CERROR("socket() failed: errno==%d\n", errno); + return rc; + } + + option = 1; + if ( setsockopt(*sockp, SOL_SOCKET, SO_REUSEADDR, + (char *)&option, sizeof (option)) ) { + rc = -errno; + CERROR("setsockopt(SO_REUSEADDR) failed: errno==%d\n", errno); + goto failed; + } + + if (local_ip != 0 || local_port != 0) { + memset(&locaddr, 0, sizeof(locaddr)); + locaddr.sin_family = AF_INET; + locaddr.sin_port = htons(local_port); + locaddr.sin_addr.s_addr = (local_ip == 0) ? + INADDR_ANY : htonl(local_ip); + + if ( bind(*sockp, (struct sockaddr *)&locaddr, sizeof(locaddr)) ) { + rc = -errno; + if ( rc == -EADDRINUSE ) /* rc holds the negated errno saved above */ + CDEBUG(D_NET, "Port %d already in use\n", + local_port); + else + CERROR("bind() to port %d failed: errno==%d\n", + local_port, errno); + goto failed; + } + } + + if ( listen(*sockp, backlog) ) { + rc = -errno; + CERROR("listen() with backlog==%d failed: errno==%d\n", + backlog, errno); + goto failed; + } + + return 0; + + failed: + close(*sockp); + return rc; +} + +int +libcfs_sock_accept (int *newsockp, int sock, __u32 *peer_ip, int *peer_port) +{ + struct sockaddr_in accaddr; + socklen_t accaddr_len = sizeof(struct sockaddr_in); + + *newsockp = accept(sock, (struct sockaddr *)&accaddr, &accaddr_len); + + if ( *newsockp < 0 ) { + CERROR("accept() failed: errno==%d\n", errno); + return -errno; + } + + *peer_ip = ntohl(accaddr.sin_addr.s_addr); + *peer_port = ntohs(accaddr.sin_port); + + return 0; +} + +int +libcfs_sock_read (int sock, void *buffer, int nob, int timeout) +{ + int rc; + struct 
pollfd pfd; + cfs_time_t start_time = cfs_time_current(); + + pfd.fd = sock; + pfd.events = POLLIN; + pfd.revents = 0; + + /* poll(2) measures timeout in msec */ + timeout *= 1000; + + while (nob != 0 && timeout > 0) { + cfs_time_t current_time; + + rc = poll(&pfd, 1, timeout); + if (rc < 0) + return -errno; + if (rc == 0) + return -ETIMEDOUT; + if ((pfd.revents & POLLIN) == 0) + return -EIO; + + rc = read(sock, buffer, nob); + if (rc < 0) + return -errno; + if (rc == 0) + return -EIO; + + buffer = ((char *)buffer) + rc; + nob -= rc; + + current_time = cfs_time_current(); /* elapsed is in sec, timeout in msec */ + timeout -= 1000 * cfs_duration_sec(cfs_time_sub(current_time, start_time)); + start_time = current_time; /* charge each interval only once */ + } + + if (nob == 0) + return 0; + else + return -ETIMEDOUT; +} + +/* Just try to connect to localhost to wake up any entity that is + * sleeping in accept() */ +void +libcfs_sock_abort_accept(__u16 port) +{ + int fd, rc; + struct sockaddr_in locaddr; + + memset(&locaddr, 0, sizeof(locaddr)); + locaddr.sin_family = AF_INET; + locaddr.sin_port = htons(port); + locaddr.sin_addr.s_addr = inet_addr("127.0.0.1"); + + fd = socket(AF_INET, SOCK_STREAM, 0); + if ( fd < 0 ) { + CERROR("socket() failed: errno==%d\n", errno); + return; + } + + rc = connect(fd, (struct sockaddr *)&locaddr, sizeof(locaddr)); + if ( rc != 0 ) { + if ( errno != ECONNREFUSED ) + CERROR("connect() failed: errno==%d\n", errno); + else + CDEBUG(D_NET, "Nobody to wake up at %d\n", port); + } + + close(fd); +} + +/* + * Network functions of common use + */ + +int +libcfs_getpeername(int sock_fd, __u32 *ipaddr_p, __u16 *port_p) +{ + int rc; + struct sockaddr_in peer_addr; + socklen_t peer_addr_len = sizeof(peer_addr); + + rc = getpeername(sock_fd, (struct sockaddr *)&peer_addr, &peer_addr_len); + if (rc != 0) + return -errno; + + if (ipaddr_p != NULL) + *ipaddr_p = ntohl(peer_addr.sin_addr.s_addr); + if (port_p != NULL) + *port_p = ntohs(peer_addr.sin_port); + + return 0; +} + +int +libcfs_socketpair(int *fdp) +{ + int rc, i; + + rc = socketpair(AF_UNIX, 
SOCK_STREAM, 0, fdp); + if (rc != 0) { + rc = -errno; + CERROR ("Cannot create socket pair\n"); + return rc; + } + + for (i = 0; i < 2; i++) { + rc = libcfs_fcntl_nonblock(fdp[i]); + if (rc) { + close(fdp[0]); + close(fdp[1]); + return rc; + } + } + + return 0; +} + +int +libcfs_fcntl_nonblock(int fd) +{ + int rc, flags; + + flags = fcntl(fd, F_GETFL, 0); + if (flags == -1) { + rc = -errno; + CERROR ("Cannot get socket flags\n"); + return rc; + } + + rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK); + if (rc != 0) { + rc = -errno; + CERROR ("Cannot set socket flags\n"); + return rc; + } + + return 0; +} + +int +libcfs_sock_set_nagle(int fd, int nagle) +{ + int rc; + int option = nagle ? 0 : 1; + +#if defined(__sun__) || defined(__sun) + rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &option, sizeof(option)); +#else + rc = setsockopt(fd, SOL_TCP, TCP_NODELAY, &option, sizeof(option)); +#endif + + if (rc != 0) { + rc = -errno; + CERROR ("Cannot set NODELAY socket option\n"); + return rc; + } + + return 0; +} + +int +libcfs_sock_set_bufsiz(int fd, int bufsiz) +{ + int rc, option; + + LASSERT (bufsiz != 0); + + option = bufsiz; + rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &option, sizeof(option)); + if (rc != 0) { + rc = -errno; + CERROR ("Cannot set SNDBUF socket option\n"); + return rc; + } + + option = bufsiz; + rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &option, sizeof(option)); + if (rc != 0) { + rc = -errno; + CERROR ("Cannot set RCVBUF socket option\n"); + return rc; + } + + return 0; +} + +int +libcfs_sock_create(int *fdp) +{ + int rc, fd, option; + + fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) { + rc = -errno; + CERROR ("Cannot create socket\n"); + return rc; + } + + option = 1; + rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, + &option, sizeof(option)); + if (rc != 0) { + rc = -errno; + CERROR ("Cannot set SO_REUSEADDR for socket\n"); + close(fd); + return rc; + } + + *fdp = fd; + return 0; +} + +int +libcfs_sock_bind_to_port(int fd, __u16 port) +{ + int rc; 
+ struct sockaddr_in locaddr; + + memset(&locaddr, 0, sizeof(locaddr)); + locaddr.sin_family = AF_INET; + locaddr.sin_addr.s_addr = INADDR_ANY; + locaddr.sin_port = htons(port); + + rc = bind(fd, (struct sockaddr *)&locaddr, sizeof(locaddr)); + if (rc != 0) { + rc = -errno; + CERROR ("Cannot bind to port %d\n", port); + return rc; + } + + return 0; +} + +int +libcfs_sock_connect(int fd, __u32 ip, __u16 port) +{ + int rc; + struct sockaddr_in addr; + + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(ip); + addr.sin_port = htons(port); + + rc = connect(fd, (struct sockaddr *)&addr, + sizeof(struct sockaddr_in)); + + if(rc != 0 && errno != EINPROGRESS) { + rc = -errno; + if (rc != -EADDRINUSE && rc != -EADDRNOTAVAIL) + CERROR ("Cannot connect to %u.%u.%u.%u:%d (err=%d)\n", + HIPQUAD(ip), port, errno); + return rc; + } + + return 0; +} + +/* NB: EPIPE and ECONNRESET are considered as non-fatal + * because: + * 1) it still makes sense to continue reading && + * 2) anyway, poll() will set up POLLHUP|POLLERR flags */ +int libcfs_sock_writev(int fd, const struct iovec *vector, int count) +{ + int rc; + + rc = syscall(SYS_writev, fd, vector, count); + + if (rc == 0) /* write nothing */ + return 0; + + if (rc < 0) { + if (errno == EAGAIN || /* write nothing */ + errno == EPIPE || /* non-fatal error */ + errno == ECONNRESET) /* non-fatal error */ + return 0; + else + return -errno; + } + + return rc; +} + +int libcfs_sock_readv(int fd, const struct iovec *vector, int count) +{ + int rc; + + rc = syscall(SYS_readv, fd, vector, count); + + if (rc == 0) /* EOF */ + return -EIO; + + if (rc < 0) { + if (errno == EAGAIN) /* read nothing */ + return 0; + else + return -errno; + } + + return rc; +} + +#endif /* !__KERNEL__ */ diff --git a/lnet/lnet/acceptor.c b/lnet/lnet/acceptor.c index 18e825c..717058c 100644 --- a/lnet/lnet/acceptor.c +++ b/lnet/lnet/acceptor.c @@ -522,16 +522,335 @@ lnet_acceptor_stop(void) } #else /* __KERNEL__ */ 
+#ifdef HAVE_LIBPTHREAD + +static char *accept_type; +static int accept_port = 988; +static int accept_backlog; +static int accept_timeout; + +struct { + int pta_shutdown; + int pta_sock; + struct cfs_completion pta_completion; +} lnet_acceptor_state; + +int +lnet_acceptor_port(void) +{ + return accept_port; +} + +int +lnet_parse_int_tunable(int *value, char *name, int dflt) +{ + char *env = getenv(name); + char *end; + + if (env == NULL) { + *value = dflt; + return 0; + } + + *value = strtoull(env, &end, 0); + if (*end == 0) + return 0; + + CERROR("Can't parse tunable %s=%s\n", name, env); + return -EINVAL; +} + +int +lnet_parse_string_tunable(char **value, char *name, char *dflt) +{ + char *env = getenv(name); + + if (env == NULL) + *value = dflt; + else + *value = env; + + return 0; +} + +int +lnet_get_tunables() +{ + int rc; + rc = lnet_parse_string_tunable(&accept_type, "LNET_ACCEPT", "secure"); + + if (rc != 0) + return rc; + + rc = lnet_parse_int_tunable(&accept_port, "LNET_ACCEPT_PORT", 988); + + if (rc != 0) + return rc; + + rc = lnet_parse_int_tunable(&accept_backlog, "LNET_ACCEPT_BACKLOG", 127); + + if (rc != 0) + return rc; + + rc = lnet_parse_int_tunable(&accept_timeout, "LNET_ACCEPT_TIMEOUT", 5); + + if (rc != 0) + return rc; + + CDEBUG(D_NET, "accept_type = %s\n", accept_type); + CDEBUG(D_NET, "accept_port = %d\n", accept_port); + CDEBUG(D_NET, "accept_backlog = %d\n", accept_backlog); + CDEBUG(D_NET, "accept_timeout = %d\n", accept_timeout); + return 0; +} + +static inline int +lnet_accept_magic(__u32 magic, __u32 constant) +{ + return (magic == constant || + magic == __swab32(constant)); +} + +/* user-land lnet_accept() isn't used by any LND's directly. 
So, we don't
+ * do it visible outside acceptor.c and we can change its prototype
+ * freely.
+ *
+ * 'magic' has already been read from 'sock' by the caller; the rest of the
+ * connection request is read here, byte-swapped if the magic arrived
+ * swapped, and the socket is passed to the lnd_accept method of the NI
+ * whose NID the request names.  Returns 0 on success, <0 on refusal. */
+static int
+lnet_accept(int sock, __u32 magic, __u32 peer_ip, int peer_port)
+{
+        int rc, flip;
+        lnet_acceptor_connreq_t cr;
+        lnet_ni_t *ni;
+
+        if (!lnet_accept_magic(magic, LNET_PROTO_ACCEPTOR_MAGIC)) {
+                LCONSOLE_ERROR("Refusing connection from %u.%u.%u.%u magic %08x: "
+                               "unsupported acceptor protocol\n",
+                               HIPQUAD(peer_ip), magic);
+                return -EPROTO;
+        }
+
+        /* peer has the opposite endianness if its magic arrived swapped */
+        flip = (magic != LNET_PROTO_ACCEPTOR_MAGIC);
+
+        rc = libcfs_sock_read(sock, &cr.acr_version,
+                              sizeof(cr.acr_version),
+                              accept_timeout);
+        if (rc != 0) {
+                CERROR("Error %d reading connection request version from "
+                       "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip));
+                return -EIO;
+        }
+
+        if (flip)
+                __swab32s(&cr.acr_version);
+
+        if (cr.acr_version != LNET_PROTO_ACCEPTOR_VERSION) {
+                /* every other refusal is logged; don't drop this one
+                 * silently */
+                CERROR("Refusing connection from %u.%u.%u.%u: "
+                       "unrecognised protocol version %u\n",
+                       HIPQUAD(peer_ip), cr.acr_version);
+                return -EPROTO;
+        }
+
+        /* read everything that follows the version field */
+        rc = libcfs_sock_read(sock, &cr.acr_nid,
+                              sizeof(cr) -
+                              offsetof(lnet_acceptor_connreq_t, acr_nid),
+                              accept_timeout);
+        if (rc != 0) {
+                CERROR("Error %d reading connection request from "
+                       "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip));
+                return -EIO;
+        }
+
+        if (flip)
+                __swab64s(&cr.acr_nid);
+
+        ni = lnet_net2ni(LNET_NIDNET(cr.acr_nid));
+
+        if (ni == NULL ||                    /* no matching net */
+            ni->ni_nid != cr.acr_nid) {      /* right NET, wrong NID!
*/
+                if (ni != NULL)
+                        lnet_ni_decref(ni);
+                LCONSOLE_ERROR("Refusing connection from %u.%u.%u.%u for %s: "
+                               " No matching NI\n",
+                               HIPQUAD(peer_ip), libcfs_nid2str(cr.acr_nid));
+                return -EPERM;
+        }
+
+        if (ni->ni_lnd->lnd_accept == NULL) {
+                lnet_ni_decref(ni);
+                LCONSOLE_ERROR("Refusing connection from %u.%u.%u.%u for %s: "
+                               " NI does not accept IP connections\n",
+                               HIPQUAD(peer_ip), libcfs_nid2str(cr.acr_nid));
+                return -EPERM;
+        }
+
+        CDEBUG(D_NET, "Accept %s from %u.%u.%u.%u\n",
+               libcfs_nid2str(cr.acr_nid), HIPQUAD(peer_ip));
+
+        rc = ni->ni_lnd->lnd_accept(ni, sock);
+
+        lnet_ni_decref(ni);
+        return rc;
+}
+
+/* Acceptor thread body.
+ *
+ * 'arg' carries the "secure" flag: when non-zero, connections from ports
+ * above LNET_ACCEPTOR_MAX_RESERVED_PORT are refused.  The listen result is
+ * published in pta_shutdown and pta_completion is signalled once so the
+ * starter can proceed; it is signalled a second time just before the
+ * thread exits. */
+int
+lnet_acceptor(void *arg)
+{
+        char      name[16];
+        int       secure = (int)((unsigned long)arg);
+        int       rc;
+        int       newsock;
+        __u32     peer_ip;
+        int       peer_port;
+        __u32     magic;
+
+        snprintf(name, sizeof(name), "acceptor_%03d", accept_port);
+        cfs_daemonize(name);
+        cfs_block_allsigs();
+
+        rc = libcfs_sock_listen(&lnet_acceptor_state.pta_sock,
+                                0, accept_port, accept_backlog);
+        if (rc != 0) {
+                if (rc == -EADDRINUSE)
+                        LCONSOLE_ERROR("Can't start acceptor on port %d: "
+                                       "port already in use\n",
+                                       accept_port);
+                else
+                        LCONSOLE_ERROR("Can't start acceptor on port %d: "
+                                       "unexpected error %d\n",
+                                       accept_port, rc);
+
+        } else {
+                LCONSOLE(0, "Accept %s, port %d\n", accept_type, accept_port);
+        }
+
+        /* set init status and unblock parent */
+        lnet_acceptor_state.pta_shutdown = rc;
+        cfs_complete(&lnet_acceptor_state.pta_completion);
+
+        if (rc != 0)
+                return rc;
+
+        while (lnet_acceptor_state.pta_shutdown == 0) {
+
+                rc = libcfs_sock_accept(&newsock, lnet_acceptor_state.pta_sock,
+                                        &peer_ip, &peer_port);
+                /* NOTE(review): a persistent accept failure makes this
+                 * loop spin until shutdown is requested */
+                if (rc != 0)
+                        continue;
+
+                /* we may have been woken by libcfs_sock_abort_accept() */
+                if ( lnet_acceptor_state.pta_shutdown ) {
+                        close(newsock);
+                        break;
+                }
+
+                if (secure && peer_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) {
+                        CERROR("Refusing connection from %u.%u.%u.%u: "
+                               "insecure port %d\n",
+                               HIPQUAD(peer_ip), peer_port);
+                        goto failed;
+                }
+
+
rc = libcfs_sock_read(newsock, &magic, sizeof(magic), + accept_timeout); + if (rc != 0) { + CERROR("Error %d reading connection request from " + "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip)); + goto failed; + } + + rc = lnet_accept(newsock, magic, peer_ip, peer_port); + if (rc != 0) + goto failed; + + continue; + + failed: + close(newsock); + } + + close(lnet_acceptor_state.pta_sock); + LCONSOLE(0,"Acceptor stopping\n"); + + /* unblock lnet_acceptor_stop() */ + cfs_complete(&lnet_acceptor_state.pta_completion); + + return 0; +} + +static int skip_waiting_for_completion; int lnet_acceptor_start(void) { - return 0; + long secure; + int rc; + + /* Do nothing if we're liblustre clients */ + if (the_lnet.ln_pid & LNET_PID_USERFLAG) + return 0; + + rc = lnet_get_tunables(); + if ( rc !=0 ) + return rc; + + cfs_init_completion(&lnet_acceptor_state.pta_completion); + + if (!strcmp(accept_type, "secure")) { + secure = 1; + } else if (!strcmp(accept_type, "all")) { + secure = 0; + } else if (!strcmp(accept_type, "none")) { + return 0; + } else { + LCONSOLE_ERROR ("Can't parse 'accept_type=\"%s\"'\n", accept_type); + cfs_fini_completion(&lnet_acceptor_state.pta_completion); + return -EINVAL; + } + + if (lnet_count_acceptor_nis(NULL) == 0) { /* not required */ + skip_waiting_for_completion = 1; + return 0; + } + + rc = cfs_create_thread(lnet_acceptor, (void *)secure); + if (rc) { + CERROR("Can't start acceptor thread: %d\n", rc); + cfs_fini_completion(&lnet_acceptor_state.pta_completion); + return rc; + } + + /* wait for acceptor to startup */ + cfs_wait_for_completion(&lnet_acceptor_state.pta_completion); + + if (lnet_acceptor_state.pta_shutdown == 0) + return 0; + + cfs_fini_completion(&lnet_acceptor_state.pta_completion); + return -ENETDOWN; } void lnet_acceptor_stop(void) { + /* Do nothing if we're liblustre clients */ + if (the_lnet.ln_pid & LNET_PID_USERFLAG) + return; + + if ( !skip_waiting_for_completion ) { + lnet_acceptor_state.pta_shutdown = 1; + 
                libcfs_sock_abort_accept(accept_port); /* kick the acceptor out of accept */
+
+                /* block until acceptor signals exit */
+                cfs_wait_for_completion(&lnet_acceptor_state.pta_completion);
+        }
+
+        cfs_fini_completion(&lnet_acceptor_state.pta_completion);
+}
+#else /* !HAVE_LIBPTHREAD: starting and stopping the acceptor are no-ops */
+int
+lnet_acceptor_start(void)
+{
+        return 0;
 }

+void
+lnet_acceptor_stop(void)
+{
+}
+#endif /* !HAVE_LIBPTHREAD */
 #endif /* !__KERNEL__ */
diff --git a/lnet/lnet/api-ni.c b/lnet/lnet/api-ni.c
index 5f5589c..8581ea4 100644
--- a/lnet/lnet/api-ni.c
+++ b/lnet/lnet/api-ni.c
@@ -615,6 +615,17 @@ lnet_fini_finalizers(void)
         LASSERT (list_empty(&the_lnet.ln_finalizeq));
 }

+#ifndef __KERNEL__
+/* Temporary workaround to allow uOSS and test programs force server
+ * mode in userspace. See comments near ln_server_mode_flag in
+ * lnet/lib-types.h
+ *
+ * Sets the_lnet.ln_server_mode_flag, which lnet_prepare() consults when
+ * choosing the PID. */
+
+void
+lnet_server_mode() {
+        the_lnet.ln_server_mode_flag = 1;
+}
+#endif
+
 int
 lnet_prepare(lnet_pid_t requested_pid)
 {
@@ -630,8 +641,18 @@ lnet_prepare(lnet_pid_t requested_pid)
         LASSERT ((requested_pid & LNET_PID_USERFLAG) == 0);
         the_lnet.ln_pid = requested_pid;
 #else
-        /* My PID must be unique on this node and flag I'm userspace */
-        the_lnet.ln_pid = getpid() | LNET_PID_USERFLAG;
+        if (the_lnet.ln_server_mode_flag) {/* server case (uOSS) */
+                LASSERT ((requested_pid & LNET_PID_USERFLAG) == 0);
+
+                if (cfs_curproc_uid())/* Only root can run user-space server */
+                        return -EPERM;
+                the_lnet.ln_pid = requested_pid; /* caller's PID is used as given */
+
+        } else {/* client case (liblustre) */
+
+                /* My PID must be unique on this node and flag I'm userspace */
+                the_lnet.ln_pid = getpid() | LNET_PID_USERFLAG;
+        }
 #endif

         rc = lnet_descriptor_setup();
@@ -837,7 +858,6 @@ lnet_count_acceptor_nis (lnet_ni_t **first_ni)
          * *first_ni so the acceptor can pass it connections "blind" to retain
          * binary compatibility.
*/ int count = 0; -#ifdef __KERNEL__ struct list_head *tmp; lnet_ni_t *ni; @@ -856,7 +876,6 @@ lnet_count_acceptor_nis (lnet_ni_t **first_ni) } LNET_UNLOCK(); -#endif return count; } diff --git a/lnet/selftest/rpc.c b/lnet/selftest/rpc.c index baf8935..dcd3b49 100644 --- a/lnet/selftest/rpc.c +++ b/lnet/selftest/rpc.c @@ -1587,7 +1587,7 @@ srpc_startup (void) #ifdef __KERNEL__ rc = LNetNIInit(LUSTRE_SRV_LNET_PID); #else - rc = LNetNIInit(getpid()); + rc = LNetNIInit(getpid() | LNET_PID_USERFLAG); #endif if (rc < 0) { CERROR ("LNetNIInit() has failed: %d\n", rc); diff --git a/lnet/ulnds/socklnd/Makefile.am b/lnet/ulnds/socklnd/Makefile.am index f970be9..d94443c 100644 --- a/lnet/ulnds/socklnd/Makefile.am +++ b/lnet/ulnds/socklnd/Makefile.am @@ -4,10 +4,8 @@ noinst_LIBRARIES = libsocklnd.a endif endif -noinst_HEADERS = pqtimer.h dispatch.h table.h timer.h \ - connection.h bridge.h procbridge.h -libsocklnd_a_SOURCES = pqtimer.c select.c table.c pqtimer.h \ - dispatch.h table.h timer.h procapi.c proclib.c \ - connection.c tcplnd.c connection.h +noinst_HEADERS = usocklnd.h +libsocklnd_a_SOURCES = usocklnd.h usocklnd.c usocklnd_cb.c poll.c \ + handlers.c conn.c libsocklnd_a_CPPFLAGS = $(LLCPPFLAGS) libsocklnd_a_CFLAGS = $(LLCFLAGS) diff --git a/lnet/ulnds/socklnd/README b/lnet/ulnds/socklnd/README deleted file mode 100644 index 6cb93d9..0000000 --- a/lnet/ulnds/socklnd/README +++ /dev/null @@ -1,53 +0,0 @@ -This library implements two NAL interfaces, both running over IP. -The first, tcpnal, creates TCP connections between participating -processes in order to transport the portals requests. The second, -ernal, provides a simple transport protocol which runs over -UDP datagrams. - -The interface functions return both of these values in host order for -convenience and readability. However this means that addresses -exchanged in messages between hosts of different orderings will not -function properly. 
- -Both NALs use the same support functions in order to schedule events -and communicate with the generic portals implementation. - - ------------------------- - | api | - |_______________________| - | lib | - |_______________________| - | ernal | |tcpnal | - |--------| |----------| - | udpsock| |connection| - |-----------------------| - | timer/select | - ------------------------- - - - These NALs uses the framework from fdnal of a pipe between the api -and library sides. This is wrapped up in the select on the library -side, and blocks on the api side. Performance could be severely -enhanced by collapsing this aritificial barrier, by using shared -memory queues, or by wiring the api layer directly to the library. - - -nid is defined as the low order 24-bits of the IP address of the -physical node left shifted by 8 plus a virtual node number of 0 -through 255 (really only 239). The virtual node number of a tcpnal -application should be specified using the environment variable -PTL_VIRTNODE. pid is now a completely arbitrary number in the -range of 0 to 255. The IP interface used can be overridden by -specifying the appropriate hostid by setting the PTL_HOSTID -environment variable. The value can be either dotted decimal -(n.n.n.n) or hex starting with "0x". -TCPNAL: - As the NAL needs to try to send to a particular nid/pid pair, it - will open up connections on demand. Because the port associated with - the connecting socket is different from the bound port, two - connections will normally be established between a pair of peers, with - data flowing from the anonymous connect (active) port to the advertised - or well-known bound (passive) port of each peer. - - Should the connection fail to open, an error is reported to the - library component, which causes the api request to fail. 
diff --git a/lnet/ulnds/socklnd/bridge.h b/lnet/ulnds/socklnd/bridge.h deleted file mode 100644 index a46cb13..0000000 --- a/lnet/ulnds/socklnd/bridge.h +++ /dev/null @@ -1,23 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * - * Copyright (c) 2002 Cray Inc. - * - * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ - */ - -#ifndef TCPNAL_PROCBRIDGE_H -#define TCPNAL_PROCBRIDGE_H - -#include - -typedef struct bridge { - int alive; - lnet_ni_t *b_ni; - void *lower; - void *local; - /* this doesn't really belong here */ - unsigned char iptop8; -} *bridge; - -#endif diff --git a/lnet/ulnds/socklnd/conn.c b/lnet/ulnds/socklnd/conn.c new file mode 100644 index 0000000..80a0779 --- /dev/null +++ b/lnet/ulnds/socklnd/conn.c @@ -0,0 +1,1081 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2001, 2002 Cluster File Systems, Inc. + * Author: Maxim Patlasov + * + * This file is part of the Lustre file system, http://www.lustre.org + * Lustre is a trademark of Cluster File Systems, Inc. 
+ *
+ */
+
+#include "usocklnd.h"
+
+/* Return 1 if the conn is timed out, 0 else.
+ * A deadline only counts while the matching uc_tx_flag/uc_rx_flag is set. */
+int
+usocklnd_conn_timed_out(usock_conn_t *conn, cfs_time_t current_time)
+{
+        if (conn->uc_tx_flag && /* sending is in progress */
+            cfs_time_aftereq(current_time, conn->uc_tx_deadline))
+                return 1;
+
+        if (conn->uc_rx_flag && /* receiving is in progress */
+            cfs_time_aftereq(current_time, conn->uc_rx_deadline))
+                return 1;
+
+        return 0;
+}
+/* Take uc_lock and kill the conn unless it is already UC_DEAD */
+void
+usocklnd_conn_kill(usock_conn_t *conn)
+{
+        pthread_mutex_lock(&conn->uc_lock);
+        if (conn->uc_state != UC_DEAD)
+                usocklnd_conn_kill_locked(conn);
+        pthread_mutex_unlock(&conn->uc_lock);
+}
+
+/* Mark the conn as DEAD and schedule its deletion.
+ * Also clears both in-progress flags, which disarms the rx/tx deadlines. */
+void
+usocklnd_conn_kill_locked(usock_conn_t *conn)
+{
+        conn->uc_rx_flag = conn->uc_tx_flag = 0;
+        conn->uc_state = UC_DEAD;
+        usocklnd_add_killrequest(conn);
+}
+/* Allocate a zeroed conn together with its poll request and a hello receive
+ * buffer sized for LNET_MAX_INTERFACES IPs.
+ * Returns NULL (with nothing left allocated) if any allocation fails. */
+usock_conn_t *
+usocklnd_conn_allocate()
+{
+        usock_conn_t        *conn;
+        usock_pollrequest_t *pr;
+
+        LIBCFS_ALLOC (pr, sizeof(*pr));
+        if (pr == NULL)
+                return NULL;
+
+        LIBCFS_ALLOC (conn, sizeof(*conn));
+        if (conn == NULL) {
+                LIBCFS_FREE (pr, sizeof(*pr));
+                return NULL;
+        }
+        memset(conn, 0, sizeof(*conn));
+        conn->uc_preq = pr;
+
+        LIBCFS_ALLOC (conn->uc_rx_hello,
+                      offsetof(ksock_hello_msg_t,
+                               kshm_ips[LNET_MAX_INTERFACES]));
+        if (conn->uc_rx_hello == NULL) {
+                LIBCFS_FREE (pr, sizeof(*pr));
+                LIBCFS_FREE (conn, sizeof(*conn));
+                return NULL;
+        }
+
+        return conn;
+}
+/* Release the poll request and hello buffer (when present) and the conn */
+void
+usocklnd_conn_free(usock_conn_t *conn)
+{
+        usock_pollrequest_t *pr = conn->uc_preq;
+
+        if (pr != NULL)
+                LIBCFS_FREE (pr, sizeof(*pr));
+
+        if (conn->uc_rx_hello != NULL)
+                LIBCFS_FREE (conn->uc_rx_hello,
+                             offsetof(ksock_hello_msg_t,
+                                      kshm_ips[LNET_MAX_INTERFACES]));
+
+        LIBCFS_FREE (conn, sizeof(*conn));
+}
+/* Unlink conn from its peer, if it is still the peer's conn of that type:
+ * a partially received LNet message is finalized with -EIO, queued txs are
+ * destroyed and the refs held by the link are dropped. */
+void
+usocklnd_tear_peer_conn(usock_conn_t *conn)
+{
+        usock_peer_t     *peer = conn->uc_peer;
+        int               idx = usocklnd_type2idx(conn->uc_type);
+        lnet_ni_t        *ni;
+        lnet_process_id_t id;
+        int               decref_flag  = 0;
+        int               killall_flag = 0;
+
+        if (peer ==
NULL) /* nothing to tear */
+                return;
+
+        pthread_mutex_lock(&peer->up_lock); /* peer lock first, then conn lock */
+        pthread_mutex_lock(&conn->uc_lock);
+
+        ni = peer->up_ni; /* saved for the stale check after refs are dropped */
+        id = peer->up_peerid;
+
+        if (peer->up_conns[idx] == conn) {
+                if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
+                        /* change state not to finalize twice */
+                        conn->uc_rx_state = UC_RX_KSM_HEADER;
+                        lnet_finalize(peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
+                }
+
+                usocklnd_destroy_txlist(peer->up_ni,
+                                        &conn->uc_tx_list);
+
+                peer->up_conns[idx] = NULL;
+                conn->uc_peer = NULL;
+                decref_flag = 1;
+
+                /* first errored conn of this peer: kill the others too */
+                if(conn->uc_errored && !peer->up_errored)
+                        peer->up_errored = killall_flag = 1;
+        }
+
+        pthread_mutex_unlock(&conn->uc_lock);
+
+        if (killall_flag)
+                usocklnd_del_conns_locked(peer);
+
+        pthread_mutex_unlock(&peer->up_lock);
+
+        if (!decref_flag)
+                return;
+
+        usocklnd_conn_decref(conn); /* ref held by peer->up_conns[idx] */
+        usocklnd_peer_decref(peer); /* ref held by conn->uc_peer */
+
+        usocklnd_check_peer_stale(ni, id);
+}
+
+/* Remove peer from hash list if all up_conns[i] is NULL &&
+ * hash table is the only consumer of the peer.
+ * When an errored, non-userflag peer is removed, LNet is notified that
+ * the peer is down. */
+void
+usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id)
+{
+        usock_peer_t *peer;
+
+        pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
+        peer = usocklnd_find_peer_locked(ni, id); /* takes a ref when found */
+
+        if (peer == NULL) {
+                pthread_rwlock_unlock(&usock_data.ud_peers_lock);
+                return;
+        }
+
+        /* 2 == the ref taken by the lookup above plus one other ref */
+        if (cfs_atomic_read(&peer->up_refcount) == 2) {
+                int i;
+                for (i = 0; i < N_CONN_TYPES; i++)
+                        LASSERT (peer->up_conns[i] == NULL);
+
+                list_del(&peer->up_list);
+
+                if (peer->up_errored &&
+                    (peer->up_peerid.pid & LNET_PID_USERFLAG) == 0)
+                        lnet_notify (peer->up_ni, peer->up_peerid.nid, 0,
+                                     cfs_time_seconds(peer->up_last_alive));
+
+                usocklnd_peer_decref(peer);
+        }
+
+        usocklnd_peer_decref(peer); /* drop the lookup ref */
+        pthread_rwlock_unlock(&usock_data.ud_peers_lock);
+}
+
+/* Wrap the already-connected socket 'fd' in a new conn that waits for the
+ * peer's hello; the conn is stored in *connp with one reference.
+ * 'fd' is not closed here on failure.
+ * Returns 0 on success, <0 else */
+int
+usocklnd_create_passive_conn(lnet_ni_t *ni, int fd, usock_conn_t **connp)
+{
+        int           rc;
+        __u32         peer_ip;
+        __u16         peer_port;
+        usock_conn_t *conn;
+
+        rc = libcfs_getpeername(fd, &peer_ip,
&peer_port);
+        if (rc)
+                return rc;
+
+        rc = usocklnd_set_sock_options(fd);
+        if (rc)
+                return rc;
+
+        conn = usocklnd_conn_allocate();
+        if (conn == NULL)
+                return -ENOMEM;
+
+        usocklnd_rx_hellomagic_state_transition(conn); /* expect hello magic first */
+
+        conn->uc_fd = fd;
+        conn->uc_peer_ip = peer_ip;
+        conn->uc_peer_port = peer_port;
+        conn->uc_state = UC_RECEIVING_HELLO;
+        conn->uc_pt_idx = usocklnd_ip2pt_idx(peer_ip);
+        conn->uc_ni = ni;
+        CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
+        CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
+        pthread_mutex_init(&conn->uc_lock, NULL);
+        cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
+
+        *connp = conn;
+        return 0;
+}
+
+/* Create a conn of the given type for an outgoing connection to 'peer':
+ * build the connreq+hello tx, open the socket (cli or srv mode depending on
+ * LNET_PID_USERFLAG in the_lnet.ln_pid) and start connecting to the
+ * acceptor port.  The conn takes a ref on peer and is stored in *connp
+ * with one reference.
+ * Returns 0 on success, <0 else */
+int
+usocklnd_create_active_conn(usock_peer_t *peer, int type,
+                            usock_conn_t **connp)
+{
+        int           rc;
+        int           fd;
+        usock_conn_t *conn;
+        __u32         dst_ip   = LNET_NIDADDR(peer->up_peerid.nid);
+        __u16         dst_port = lnet_acceptor_port();
+
+        conn = usocklnd_conn_allocate();
+        if (conn == NULL)
+                return -ENOMEM;
+
+        conn->uc_tx_hello = usocklnd_create_cr_hello_tx(peer->up_ni, type,
+                                                        peer->up_peerid.nid);
+        if (conn->uc_tx_hello == NULL) {
+                usocklnd_conn_free(conn);
+                return -ENOMEM;
+        }
+
+        if (the_lnet.ln_pid & LNET_PID_USERFLAG)
+                rc = usocklnd_connect_cli_mode(&fd, dst_ip, dst_port);
+        else
+                rc = usocklnd_connect_srv_mode(&fd, dst_ip, dst_port);
+
+        if (rc) {
+                usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
+                usocklnd_conn_free(conn);
+                return rc;
+        }
+
+        /* the connect attempt is subject to the tx deadline */
+        conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
+        conn->uc_tx_flag = 1;
+
+        conn->uc_fd = fd;
+        conn->uc_peer_ip = dst_ip;
+        conn->uc_peer_port = dst_port;
+        conn->uc_type = type;
+        conn->uc_activeflag = 1;
+        conn->uc_state = UC_CONNECTING;
+        conn->uc_pt_idx = usocklnd_ip2pt_idx(dst_ip);
+        conn->uc_ni = NULL;
+        conn->uc_peerid = peer->up_peerid;
+        conn->uc_peer = peer;
+        usocklnd_peer_addref(peer);
+        CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
+        CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
+        pthread_mutex_init(&conn->uc_lock, NULL);
+
        cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
+
+        *connp = conn;
+        return 0;
+}
+
+/* Connect to dst_ip:dst_port from a reserved local port.
+ * Ports are tried from LNET_ACCEPTOR_MAX_RESERVED_PORT downwards; a failed
+ * bind, or -EADDRINUSE/-EADDRNOTAVAIL from connect, moves on to the next
+ * port with a fresh socket.  On success *fdp is the socket.
+ * Returns 0 on success, <0 else */
+int
+usocklnd_connect_srv_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
+{
+        __u16 port;
+        int   fd;
+        int   rc;
+
+        for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT;
+             port >= LNET_ACCEPTOR_MIN_RESERVED_PORT;
+             port--) {
+                /* Iterate through reserved ports. */
+
+                rc = libcfs_sock_create(&fd);
+                if (rc)
+                        return rc;
+
+                rc = libcfs_sock_bind_to_port(fd, port);
+                if (rc) {
+                        close(fd);
+                        continue;
+                }
+
+                rc = usocklnd_set_sock_options(fd);
+                if (rc) {
+                        close(fd);
+                        return rc;
+                }
+
+                rc = libcfs_sock_connect(fd, dst_ip, dst_port);
+                if (rc == 0) {
+                        *fdp = fd;
+                        return 0;
+                }
+
+                if (rc != -EADDRINUSE && rc != -EADDRNOTAVAIL) {
+                        close(fd);
+                        return rc;
+                }
+
+                close(fd);
+        }
+
+        CERROR("Can't bind to any reserved port\n");
+        return rc; /* error from the last port tried */
+}
+
+/* Connect to dst_ip:dst_port without binding to a specific local port.
+ * On success *fdp is the socket.
+ * Returns 0 on success, <0 else */
+int
+usocklnd_connect_cli_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
+{
+        int fd;
+        int rc;
+
+        rc = libcfs_sock_create(&fd);
+        if (rc)
+                return rc;
+
+        rc = usocklnd_set_sock_options(fd);
+        if (rc) {
+                close(fd);
+                return rc;
+        }
+
+        rc = libcfs_sock_connect(fd, dst_ip, dst_port);
+        if (rc) {
+                close(fd);
+                return rc;
+        }
+
+        *fdp = fd;
+        return 0;
+}
+/* Apply the nagle tunable and, when ut_sockbufsiz is non-zero, the buffer
+ * size tunable, then switch fd to non-blocking mode.  Returns 0 or <0. */
+int
+usocklnd_set_sock_options(int fd)
+{
+        int rc;
+
+        rc = libcfs_sock_set_nagle(fd, usock_tuns.ut_socknagle);
+        if (rc)
+                return rc;
+
+        if (usock_tuns.ut_sockbufsiz) {
+                rc = libcfs_sock_set_bufsiz(fd, usock_tuns.ut_sockbufsiz);
+                if (rc)
+                        return rc;
+        }
+
+        return libcfs_fcntl_nonblock(fd);
+}
+/* Set the message type and zero the checksum and both zero-copy cookies */
+void
+usocklnd_init_msg(ksock_msg_t *msg, int type)
+{
+        msg->ksm_type = type;
+        msg->ksm_csum = 0;
+        msg->ksm_zc_req_cookie = 0;
+        msg->ksm_zc_ack_cookie = 0;
+}
+/* Build a NOOP tx whose ksm_zc_ack_cookie is 'cookie'; NULL if out of memory */
+usock_tx_t *
+usocklnd_create_noop_tx(__u64 cookie)
+{
+        usock_tx_t *tx;
+
+        LIBCFS_ALLOC (tx, sizeof(usock_tx_t));
+        if (tx == NULL)
+                return NULL;
+
+        tx->tx_size = sizeof(usock_tx_t);
+        tx->tx_lnetmsg = NULL;
+
+        usocklnd_init_msg(&tx->tx_msg,
KSOCK_MSG_NOOP);
+        tx->tx_msg.ksm_zc_ack_cookie = cookie;
+
+        /* only the part of the message before the LNet header is sent */
+        tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
+        tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
+                offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr);
+        tx->tx_iov = tx->tx_iova;
+        tx->tx_niov = 1;
+
+        return tx;
+}
+/* Build a tx for 'lntmsg': iov[0] covers the ksock message up to the
+ * payload and the following iovs are extracted from the message's payload
+ * iovs.  Returns NULL if out of memory. */
+usock_tx_t *
+usocklnd_create_tx(lnet_msg_t *lntmsg)
+{
+        usock_tx_t   *tx;
+        unsigned int  payload_niov = lntmsg->msg_niov;
+        struct iovec *payload_iov = lntmsg->msg_iov;
+        unsigned int  payload_offset = lntmsg->msg_offset;
+        unsigned int  payload_nob = lntmsg->msg_len;
+        int           size = offsetof(usock_tx_t,
+                                      tx_iova[1 + payload_niov]);
+
+        LIBCFS_ALLOC (tx, size);
+        if (tx == NULL)
+                return NULL;
+
+        tx->tx_size = size; /* remembered for LIBCFS_FREE */
+        tx->tx_lnetmsg = lntmsg;
+
+        tx->tx_resid = tx->tx_nob =
+                offsetof(ksock_msg_t,  ksm_u.lnetmsg.ksnm_payload) +
+                payload_nob;
+
+        usocklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);
+        tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = lntmsg->msg_hdr;
+        tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
+        tx->tx_iova[0].iov_len = offsetof(ksock_msg_t,
+                                          ksm_u.lnetmsg.ksnm_payload);
+        tx->tx_iov = tx->tx_iova;
+
+        tx->tx_niov = 1 +
+                lnet_extract_iov(payload_niov, &tx->tx_iov[1],
+                                 payload_niov, payload_iov,
+                                 payload_offset, payload_nob);
+
+        return tx;
+}
+/* Fill in a KSOCK_PROTO_V2 hello of conn type 'type' from 'ni' to
+ * 'peer_nid'; no IPs are advertised (kshm_nips is 0). */
+void
+usocklnd_init_hello_msg(ksock_hello_msg_t *hello,
+                        lnet_ni_t *ni, int type, lnet_nid_t peer_nid)
+{
+        usock_net_t *net = (usock_net_t *)ni->ni_data;
+
+        hello->kshm_magic       = LNET_PROTO_MAGIC;
+        hello->kshm_version     = KSOCK_PROTO_V2;
+        hello->kshm_nips        = 0;
+        hello->kshm_ctype       = type;
+
+        hello->kshm_dst_incarnation = 0; /* not used */
+        hello->kshm_src_incarnation = net->un_incarnation;
+
+        hello->kshm_src_pid = the_lnet.ln_pid;
+        hello->kshm_src_nid = ni->ni_nid;
+        hello->kshm_dst_nid = peer_nid;
+        hello->kshm_dst_pid = 0; /* not used */
+}
+/* Build a tx that carries just a hello (without the IP array); the hello
+ * is placed in the same allocation, starting at &tx->tx_iova[1].
+ * Returns NULL if out of memory. */
+usock_tx_t *
+usocklnd_create_hello_tx(lnet_ni_t *ni,
+                         int type, lnet_nid_t peer_nid)
+{
+        usock_tx_t        *tx;
+        int                size;
+        ksock_hello_msg_t *hello;
+
+        size = sizeof(usock_tx_t) +
offsetof(ksock_hello_msg_t, kshm_ips);
+        LIBCFS_ALLOC (tx, size);
+        if (tx == NULL)
+                return NULL;
+
+        tx->tx_size = size;
+        tx->tx_lnetmsg = NULL;
+
+        hello = (ksock_hello_msg_t *)&tx->tx_iova[1];
+        usocklnd_init_hello_msg(hello, ni, type, peer_nid);
+
+        /* send the hello up to, but not including, kshm_ips */
+        tx->tx_iova[0].iov_base = (void *)hello;
+        tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
+                offsetof(ksock_hello_msg_t, kshm_ips);
+        tx->tx_iov = tx->tx_iova;
+        tx->tx_niov = 1;
+
+        return tx;
+}
+/* Build a tx that carries an acceptor connection request for 'peer_nid'
+ * immediately followed by a hello; both are placed in the same allocation,
+ * starting at &tx->tx_iova[1].  Returns NULL if out of memory. */
+usock_tx_t *
+usocklnd_create_cr_hello_tx(lnet_ni_t *ni,
+                            int type, lnet_nid_t peer_nid)
+{
+        usock_tx_t              *tx;
+        int                      size;
+        lnet_acceptor_connreq_t *cr;
+        ksock_hello_msg_t       *hello;
+
+        size = sizeof(usock_tx_t) +
+                sizeof(lnet_acceptor_connreq_t) +
+                offsetof(ksock_hello_msg_t, kshm_ips);
+        LIBCFS_ALLOC (tx, size);
+        if (tx == NULL)
+                return NULL;
+
+        tx->tx_size = size;
+        tx->tx_lnetmsg = NULL;
+
+        cr = (lnet_acceptor_connreq_t *)&tx->tx_iova[1];
+        memset(cr, 0, sizeof(*cr));
+        cr->acr_magic   = LNET_PROTO_ACCEPTOR_MAGIC;
+        cr->acr_version = LNET_PROTO_ACCEPTOR_VERSION;
+        cr->acr_nid     = peer_nid;
+
+        hello = (ksock_hello_msg_t *)((char *)cr + sizeof(*cr)); /* right after cr */
+        usocklnd_init_hello_msg(hello, ni, type, peer_nid);
+
+        /* one iov covers the connreq and the hello up to kshm_ips */
+        tx->tx_iova[0].iov_base = (void *)cr;
+        tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
+                sizeof(lnet_acceptor_connreq_t) +
+                offsetof(ksock_hello_msg_t, kshm_ips);
+        tx->tx_iov = tx->tx_iova;
+        tx->tx_niov = 1;
+
+        return tx;
+}
+/* Free 'tx' and, if it carries an LNet message, finalize that message with
+ * 0 when the tx was sent completely (tx_resid == 0) or -EIO otherwise.
+ * 'ni' may be NULL only for txs without an LNet message. */
+void
+usocklnd_destroy_tx(lnet_ni_t *ni, usock_tx_t *tx)
+{
+        lnet_msg_t  *lnetmsg = tx->tx_lnetmsg;
+        int          rc = (tx->tx_resid == 0) ?
0 : -EIO;
+
+        LASSERT (ni != NULL || lnetmsg == NULL);
+
+        LIBCFS_FREE (tx, tx->tx_size); /* lnetmsg and rc were read before the free */
+
+        if (lnetmsg != NULL) /* NOOP and hello go without lnetmsg */
+                lnet_finalize(ni, lnetmsg, rc);
+}
+/* Unlink and destroy every tx on 'txlist' */
+void
+usocklnd_destroy_txlist(lnet_ni_t *ni, struct list_head *txlist)
+{
+        usock_tx_t *tx;
+
+        while (!list_empty(txlist)) {
+                tx = list_entry(txlist->next, usock_tx_t, tx_list);
+                list_del(&tx->tx_list);
+
+                usocklnd_destroy_tx(ni, tx);
+        }
+}
+/* Unlink and free every zero-copy ACK on 'zcack_list' */
+void
+usocklnd_destroy_zcack_list(struct list_head *zcack_list)
+{
+        usock_zc_ack_t *zcack;
+
+        while (!list_empty(zcack_list)) {
+                zcack = list_entry(zcack_list->next, usock_zc_ack_t, zc_list);
+                list_del(&zcack->zc_list);
+
+                LIBCFS_FREE (zcack, sizeof(*zcack));
+        }
+}
+/* Free 'peer' (all of its conn slots must already be empty) and decrement
+ * the net's peer count, signalling un_cond when the count reaches zero. */
+void
+usocklnd_destroy_peer(usock_peer_t *peer)
+{
+        usock_net_t *net = peer->up_ni->ni_data; /* fetched before peer is freed */
+        int          i;
+
+        for (i = 0; i < N_CONN_TYPES; i++)
+                LASSERT (peer->up_conns[i] == NULL);
+
+        LIBCFS_FREE (peer, sizeof (*peer));
+
+        pthread_mutex_lock(&net->un_lock);
+        if(--net->un_peercount == 0)
+                pthread_cond_signal(&net->un_cond);
+        pthread_mutex_unlock(&net->un_lock);
+}
+/* Tear down a conn: fail a partially received LNet message and any queued
+ * txs with -EIO, free pending zero-copy ACKs, drop the peer or NI ref the
+ * conn holds, destroy an unsent hello tx and free the conn. */
+void
+usocklnd_destroy_conn(usock_conn_t *conn)
+{
+        LASSERT (conn->uc_peer == NULL || conn->uc_ni == NULL);
+
+        if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
+                LASSERT (conn->uc_peer != NULL);
+                lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
+        }
+
+        if (!list_empty(&conn->uc_tx_list)) {
+                LASSERT (conn->uc_peer != NULL);
+                usocklnd_destroy_txlist(conn->uc_peer->up_ni, &conn->uc_tx_list);
+        }
+
+        usocklnd_destroy_zcack_list(&conn->uc_zcack_list);
+
+        if (conn->uc_peer != NULL)
+                usocklnd_peer_decref(conn->uc_peer);
+
+        if (conn->uc_ni != NULL)
+                lnet_ni_decref(conn->uc_ni);
+
+        if (conn->uc_tx_hello)
+                usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
+
+        usocklnd_conn_free(conn);
+}
+/* Choose the conn type for sending 'lntmsg': SOCKLND_CONN_ANY when this
+ * process has LNET_PID_USERFLAG set, otherwise BULK_OUT for messages of at
+ * least ut_min_bulk bytes (header included) and CONTROL for smaller ones. */
+int
+usocklnd_get_conn_type(lnet_msg_t *lntmsg)
+{
+        int nob;
+
+        if (the_lnet.ln_pid & LNET_PID_USERFLAG)
+                return SOCKLND_CONN_ANY;
+
+        nob = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload) +
+                lntmsg->msg_len;
+
+        if (nob >= usock_tuns.ut_min_bulk)
+                return SOCKLND_CONN_BULK_OUT;
+        else
+                return SOCKLND_CONN_CONTROL;
+}
+/* Map a conn type to its slot in a peer's up_conns[] array; ANY and
+ * CONTROL share slot 0.  Any other type is a bug (LBUG). */
+int usocklnd_type2idx(int type)
+{
+        switch (type) {
+        case SOCKLND_CONN_ANY:
+        case SOCKLND_CONN_CONTROL:
+                return 0;
+        case SOCKLND_CONN_BULK_IN:
+                return 1;
+        case SOCKLND_CONN_BULK_OUT:
+                return 2;
+        default:
+                LBUG();
+        }
+}
+/* Search the hash chain selected by id.nid for the peer matching both
+ * 'ni' and 'id'.  Returns the peer with an extra ref, or NULL.
+ * Callers in this file hold usock_data.ud_peers_lock around the call. */
+usock_peer_t *
+usocklnd_find_peer_locked(lnet_ni_t *ni, lnet_process_id_t id)
+{
+        struct list_head *peer_list = usocklnd_nid2peerlist(id.nid);
+        struct list_head *tmp;
+        usock_peer_t     *peer;
+
+        list_for_each (tmp, peer_list) {
+
+                peer = list_entry (tmp, usock_peer_t, up_list);
+
+                if (peer->up_ni != ni)
+                        continue;
+
+                if (peer->up_peerid.nid != id.nid ||
+                    peer->up_peerid.pid != id.pid)
+                        continue;
+
+                usocklnd_peer_addref(peer);
+                return peer;
+        }
+        return (NULL);
+}
+/* Allocate and initialise a peer for (ni, id) with no conns and one ref
+ * for the caller, and count it in the net's un_peercount.  The peer is not
+ * added to the hash table here.  Returns 0 or -ENOMEM. */
+int
+usocklnd_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
+                     usock_peer_t **peerp)
+{
+        usock_net_t  *net = ni->ni_data;
+        usock_peer_t *peer;
+        int           i;
+
+        LIBCFS_ALLOC (peer, sizeof (*peer));
+        if (peer == NULL)
+                return -ENOMEM;
+
+        for (i = 0; i < N_CONN_TYPES; i++)
+                peer->up_conns[i] = NULL;
+
+        peer->up_peerid       = id;
+        peer->up_ni           = ni;
+        peer->up_incrn_is_set = 0;
+        peer->up_errored      = 0;
+        peer->up_last_alive   = 0;
+        cfs_atomic_set (&peer->up_refcount, 1); /* 1 ref for caller */
+        pthread_mutex_init(&peer->up_lock, NULL);
+
+        pthread_mutex_lock(&net->un_lock);
+        net->un_peercount++;
+        pthread_mutex_unlock(&net->un_lock);
+
+        *peerp = peer;
+        return 0;
+}
+
+/* Safely create new peer if needed. Save result in *peerp.
+ * The peer returned in *peerp carries a ref for the caller.
+ * Returns 0 on success, <0 else */
+int
+usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
+                             usock_peer_t **peerp)
+{
+        int           rc;
+        usock_peer_t *peer;
+        usock_peer_t *peer2;
+        usock_net_t  *net = ni->ni_data;
+
+        pthread_rwlock_rdlock(&usock_data.ud_peers_lock);
+        peer = usocklnd_find_peer_locked(ni, id);
+        pthread_rwlock_unlock(&usock_data.ud_peers_lock);
+
+        if (peer != NULL)
+                goto find_or_create_peer_done;
+
+        rc = usocklnd_create_peer(ni, id, &peer);
+        if (rc)
+                return rc;
+
+        /* look again under the write lock: the peer may have been added
+         * while no lock was held */
+        pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
+        peer2 = usocklnd_find_peer_locked(ni, id);
+        if (peer2 == NULL) {
+                if (net->un_shutdown) {
+                        pthread_rwlock_unlock(&usock_data.ud_peers_lock);
+                        usocklnd_peer_decref(peer); /* should destroy peer */
+                        CERROR("Can't create peer: network shutdown\n");
+                        return -ESHUTDOWN;
+                }
+
+                /* peer table will take 1 of my refs on peer */
+                usocklnd_peer_addref(peer);
+                list_add_tail (&peer->up_list,
+                               usocklnd_nid2peerlist(id.nid));
+        } else {
+                usocklnd_peer_decref(peer); /* should destroy peer */
+                peer = peer2;
+        }
+        pthread_rwlock_unlock(&usock_data.ud_peers_lock);
+
+  find_or_create_peer_done:
+        *peerp = peer;
+        return 0;
+}
+
+/* Queue a zero-copy ACK on the conn.  If the conn is ready and idle, a
+ * POLLOUT request is posted first; if that fails its error is returned and
+ * the ACK is not queued.
+ * NB: both peer and conn locks are held */
+static int
+usocklnd_enqueue_zcack(usock_conn_t *conn, usock_zc_ack_t *zc_ack)
+{
+        if (conn->uc_state == UC_READY &&
+            list_empty(&conn->uc_tx_list) &&
+            list_empty(&conn->uc_zcack_list) &&
+            !conn->uc_sending) {
+                int rc = usocklnd_add_pollrequest(conn, POLL_TX_SET_REQUEST,
+                                                  POLLOUT);
+                if (rc != 0)
+                        return rc;
+        }
+
+        list_add_tail(&zc_ack->zc_list, &conn->uc_zcack_list);
+        return 0;
+}
+
+/* NB: both peer and conn locks are held
+ * NB: if sending isn't in progress.
 the caller *MUST* send tx
+ * immediately after we'll return
+ *
+ * *send_immediately is set to 1 (and uc_sending is claimed, tx is not
+ * queued) when the conn is ready and idle; otherwise it is set to 0 and tx
+ * is appended to uc_tx_list. */
+static void
+usocklnd_enqueue_tx(usock_conn_t *conn, usock_tx_t *tx,
+                    int *send_immediately)
+{
+        if (conn->uc_state == UC_READY &&
+            list_empty(&conn->uc_tx_list) &&
+            list_empty(&conn->uc_zcack_list) &&
+            !conn->uc_sending) {
+                conn->uc_sending = 1;
+                *send_immediately = 1;
+                return;
+        }
+
+        *send_immediately = 0;
+        list_add_tail(&tx->tx_list, &conn->uc_tx_list);
+}
+
+/* Safely create new conn if needed. Save result in *connp.
+ * Exactly one of 'tx' and 'zc_ack' may be non-NULL; it is enqueued on the
+ * conn.  On success *connp carries an extra conn ref for the caller.
+ * Returns 0 on success, <0 else */
+int
+usocklnd_find_or_create_conn(usock_peer_t *peer, int type,
+                             usock_conn_t **connp,
+                             usock_tx_t *tx, usock_zc_ack_t *zc_ack,
+                             int *send_immediately)
+{
+        usock_conn_t *conn;
+        int           idx;
+        int           rc;
+        lnet_pid_t    userflag = peer->up_peerid.pid & LNET_PID_USERFLAG;
+
+        if (userflag)
+                type = SOCKLND_CONN_ANY;
+
+        idx = usocklnd_type2idx(type);
+
+        pthread_mutex_lock(&peer->up_lock);
+        if (peer->up_conns[idx] != NULL) {
+                conn = peer->up_conns[idx];
+                LASSERT(conn->uc_type == type);
+        } else {
+                if (userflag) {
+                        CERROR("Refusing to create a connection to "
+                               "userspace process %s\n",
+                               libcfs_id2str(peer->up_peerid));
+                        rc = -EHOSTUNREACH;
+                        goto find_or_create_conn_failed;
+                }
+
+                rc = usocklnd_create_active_conn(peer, type, &conn);
+                if (rc) {
+                        peer->up_errored = 1;
+                        usocklnd_del_conns_locked(peer);
+                        goto find_or_create_conn_failed;
+                }
+
+                /* peer takes 1 of conn refcount */
+                usocklnd_link_conn_to_peer(conn, peer, idx);
+
+                rc = usocklnd_add_pollrequest(conn, POLL_ADD_REQUEST, POLLOUT);
+                if (rc) {
+                        peer->up_conns[idx] = NULL;
+                        usocklnd_conn_decref(conn); /* should destroy conn */
+                        goto find_or_create_conn_failed;
+                }
+                usocklnd_wakeup_pollthread(conn->uc_pt_idx);
+        }
+
+        pthread_mutex_lock(&conn->uc_lock); /* taken while peer->up_lock is held */
+        LASSERT(conn->uc_peer == peer);
+
+        LASSERT(tx == NULL || zc_ack == NULL);
+        if (tx != NULL) {
+                usocklnd_enqueue_tx(conn, tx, send_immediately);
+        } else {
+                rc = usocklnd_enqueue_zcack(conn, zc_ack);
+                if (rc != 0) {
+
                        usocklnd_conn_kill_locked(conn);
+                        pthread_mutex_unlock(&conn->uc_lock);
+                        goto find_or_create_conn_failed;
+                }
+        }
+        pthread_mutex_unlock(&conn->uc_lock);
+
+        usocklnd_conn_addref(conn); /* ref handed to the caller via *connp */
+        pthread_mutex_unlock(&peer->up_lock);
+
+        *connp = conn;
+        return 0;
+
+  find_or_create_conn_failed:
+        pthread_mutex_unlock(&peer->up_lock);
+        return rc;
+}
+/* Store conn in slot 'idx' of the peer and clear the peer's errored flag */
+void
+usocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx)
+{
+        peer->up_conns[idx] = conn;
+        peer->up_errored    = 0; /* this new fresh conn will try
+                                  * revitalize even stale errored peer */
+}
+/* Swap BULK_IN and BULK_OUT; ANY and CONTROL are returned unchanged and
+ * any other value yields SOCKLND_CONN_NONE. */
+int
+usocklnd_invert_type(int type)
+{
+        switch (type)
+        {
+        case SOCKLND_CONN_ANY:
+        case SOCKLND_CONN_CONTROL:
+                return (type);
+        case SOCKLND_CONN_BULK_IN:
+                return SOCKLND_CONN_BULK_OUT;
+        case SOCKLND_CONN_BULK_OUT:
+                return SOCKLND_CONN_BULK_IN;
+        default:
+                return SOCKLND_CONN_NONE;
+        }
+}
+/* Set uc_state to new_state under uc_lock, unless the conn is already dead */
+void
+usocklnd_conn_new_state(usock_conn_t *conn, int new_state)
+{
+        pthread_mutex_lock(&conn->uc_lock);
+        if (conn->uc_state != UC_DEAD)
+                conn->uc_state = new_state;
+        pthread_mutex_unlock(&conn->uc_lock);
+}
+
+/* Record the peer's incarnation.  If it differs from the one recorded
+ * earlier, every conn of the peer except 'skip_conn' is unlinked from the
+ * peer and killed.
+ * NB: peer is locked by caller */
+void
+usocklnd_cleanup_stale_conns(usock_peer_t *peer, __u64 incrn,
+                             usock_conn_t *skip_conn)
+{
+        int i;
+
+        if (!peer->up_incrn_is_set) {
+                peer->up_incarnation = incrn;
+                peer->up_incrn_is_set = 1;
+                return;
+        }
+
+        if (peer->up_incarnation == incrn)
+                return;
+
+        peer->up_incarnation = incrn;
+
+        for (i = 0; i < N_CONN_TYPES; i++) {
+                usock_conn_t *conn = peer->up_conns[i];
+
+                if (conn == NULL || conn == skip_conn)
+                        continue;
+
+                pthread_mutex_lock(&conn->uc_lock);
+                LASSERT (conn->uc_peer == peer);
+                conn->uc_peer = NULL;
+                peer->up_conns[i] = NULL;
+                if (conn->uc_state != UC_DEAD)
+                        usocklnd_conn_kill_locked(conn);
+                pthread_mutex_unlock(&conn->uc_lock);
+
+                usocklnd_conn_decref(conn); /* ref held by peer->up_conns[i] */
+                usocklnd_peer_decref(peer); /* ref held by conn->uc_peer */
+        }
+}
+
+/* RX state transition to UC_RX_HELLO_MAGIC: update RX part to receive
+ * MAGIC part of hello and set uc_rx_state
+ */
+void
+usocklnd_rx_hellomagic_state_transition(usock_conn_t *conn) +{ + LASSERT(conn->uc_rx_hello != NULL); + + conn->uc_rx_niov = 1; + conn->uc_rx_iov = conn->uc_rx_iova; + conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_magic; + conn->uc_rx_iov[0].iov_len = + conn->uc_rx_nob_wanted = + conn->uc_rx_nob_left = + sizeof(conn->uc_rx_hello->kshm_magic); + + conn->uc_rx_state = UC_RX_HELLO_MAGIC; + + conn->uc_rx_flag = 1; /* waiting for incoming hello */ + conn->uc_rx_deadline = cfs_time_shift(usock_tuns.ut_timeout); +} + +/* RX state transition to UC_RX_HELLO_VERSION: update RX part to receive + * VERSION part of hello and set uc_rx_state + */ +void +usocklnd_rx_helloversion_state_transition(usock_conn_t *conn) +{ + LASSERT(conn->uc_rx_hello != NULL); + + conn->uc_rx_niov = 1; + conn->uc_rx_iov = conn->uc_rx_iova; + conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_version; + conn->uc_rx_iov[0].iov_len = + conn->uc_rx_nob_wanted = + conn->uc_rx_nob_left = + sizeof(conn->uc_rx_hello->kshm_version); + + conn->uc_rx_state = UC_RX_HELLO_VERSION; +} + +/* RX state transition to UC_RX_HELLO_BODY: update RX part to receive + * the rest of hello and set uc_rx_state + */ +void +usocklnd_rx_hellobody_state_transition(usock_conn_t *conn) +{ + LASSERT(conn->uc_rx_hello != NULL); + + conn->uc_rx_niov = 1; + conn->uc_rx_iov = conn->uc_rx_iova; + conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_src_nid; + conn->uc_rx_iov[0].iov_len = + conn->uc_rx_nob_wanted = + conn->uc_rx_nob_left = + offsetof(ksock_hello_msg_t, kshm_ips) - + offsetof(ksock_hello_msg_t, kshm_src_nid); + + conn->uc_rx_state = UC_RX_HELLO_BODY; +} + +/* RX state transition to UC_RX_HELLO_IPS: update RX part to receive + * array of IPs and set uc_rx_state + */ +void +usocklnd_rx_helloIPs_state_transition(usock_conn_t *conn) +{ + LASSERT(conn->uc_rx_hello != NULL); + + conn->uc_rx_niov = 1; + conn->uc_rx_iov = conn->uc_rx_iova; + conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_ips; + 
conn->uc_rx_iov[0].iov_len = + conn->uc_rx_nob_wanted = + conn->uc_rx_nob_left = + conn->uc_rx_hello->kshm_nips * + sizeof(conn->uc_rx_hello->kshm_ips[0]); + + conn->uc_rx_state = UC_RX_HELLO_IPS; +} + +/* RX state transition to UC_RX_LNET_HEADER: update RX part to receive + * LNET header and set uc_rx_state + */ +void +usocklnd_rx_lnethdr_state_transition(usock_conn_t *conn) +{ + conn->uc_rx_niov = 1; + conn->uc_rx_iov = conn->uc_rx_iova; + conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg.ksm_u.lnetmsg; + conn->uc_rx_iov[0].iov_len = + conn->uc_rx_nob_wanted = + conn->uc_rx_nob_left = + sizeof(ksock_lnet_msg_t); + + conn->uc_rx_state = UC_RX_LNET_HEADER; + conn->uc_rx_flag = 1; +} + +/* RX state transition to UC_RX_KSM_HEADER: update RX part to receive + * KSM header and set uc_rx_state + */ +void +usocklnd_rx_ksmhdr_state_transition(usock_conn_t *conn) +{ + conn->uc_rx_niov = 1; + conn->uc_rx_iov = conn->uc_rx_iova; + conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg; + conn->uc_rx_iov[0].iov_len = + conn->uc_rx_nob_wanted = + conn->uc_rx_nob_left = + offsetof(ksock_msg_t, ksm_u); + + conn->uc_rx_state = UC_RX_KSM_HEADER; + conn->uc_rx_flag = 0; +} + +/* RX state transition to UC_RX_SKIPPING: update RX part for + * skipping and set uc_rx_state + */ +void +usocklnd_rx_skipping_state_transition(usock_conn_t *conn) +{ + static char skip_buffer[4096]; + + int nob; + unsigned int niov = 0; + int skipped = 0; + int nob_to_skip = conn->uc_rx_nob_left; + + LASSERT(nob_to_skip != 0); + + conn->uc_rx_iov = conn->uc_rx_iova; + + /* Set up to skip as much as possible now. 
If there's more left + * (ran out of iov entries) we'll get called again */ + + do { + nob = MIN (nob_to_skip, sizeof(skip_buffer)); + + conn->uc_rx_iov[niov].iov_base = skip_buffer; + conn->uc_rx_iov[niov].iov_len = nob; + niov++; + skipped += nob; + nob_to_skip -=nob; + + } while (nob_to_skip != 0 && /* mustn't overflow conn's rx iov */ + niov < sizeof(conn->uc_rx_iova) / sizeof (struct iovec)); + + conn->uc_rx_niov = niov; + conn->uc_rx_nob_wanted = skipped; + + conn->uc_rx_state = UC_RX_SKIPPING; +} diff --git a/lnet/ulnds/socklnd/connection.c b/lnet/ulnds/socklnd/connection.c deleted file mode 100644 index b429060..0000000 --- a/lnet/ulnds/socklnd/connection.c +++ /dev/null @@ -1,615 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * - * Copyright (c) 2002 Cray Inc. - * - * This file is part of Lustre, http://www.lustre.org. - * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. - * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -/* connection.c: - This file provides a simple stateful connection manager which - builds tcp connections on demand and leaves them open for - future use. 
-*/ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#ifndef __CYGWIN__ -#include -#endif - -/* tunables (via environment) */ -int tcpnal_acceptor_port = 988; -int tcpnal_buffer_size = 0; -int tcpnal_nagle = 0; - -int -tcpnal_env_param (char *name, int *val) -{ - char *env = getenv(name); - int n; - - if (env == NULL) - return 1; - - n = strlen(env); /* scanf may not assign on EOS */ - if (sscanf(env, "%i%n", val, &n) >= 1 && n == strlen(env)) { - CDEBUG(D_INFO, "Environment variable %s set to %d\n", - name, *val); - return 1; - } - - CERROR("Can't parse environment variable '%s=%s'\n", - name, env); - return 0; -} - -int -tcpnal_set_global_params (void) -{ - return tcpnal_env_param("TCPNAL_PORT", - &tcpnal_acceptor_port) && - tcpnal_env_param("TCPLND_PORT", - &tcpnal_acceptor_port) && - tcpnal_env_param("TCPNAL_BUFFER_SIZE", - &tcpnal_buffer_size) && - tcpnal_env_param("TCPLND_BUFFER_SIZE", - &tcpnal_buffer_size) && - tcpnal_env_param("TCPNAL_NAGLE", - &tcpnal_nagle) && - tcpnal_env_param("TCPLND_NAGLE", - &tcpnal_nagle); -} - -/* Function: compare_connection - * Arguments: connection c: a connection in the hash table - * lnet_process_id_t: an id to verify agains - * Returns: 1 if the connection is the one requested, 0 otherwise - * - * compare_connection() tests for collisions in the hash table - */ -static int compare_connection(void *arg1, void *arg2) -{ - connection c = arg1; - lnet_nid_t *nid = arg2; - - return (c->peer_nid == *nid); -} - -/* Function: connection_key - * Arguments: lnet_process_id_t id: an id to hash - * Returns: a not-particularily-well-distributed hash - * of the id - */ -static unsigned int connection_key(void *arg) -{ - lnet_nid_t *nid = arg; - - return (unsigned int)(*nid); -} - -void -close_connection(void *arg) -{ - connection c = arg; - - close(c->fd); - free(c); -} - -/* Function: 
remove_connection - * Arguments: c: the connection to remove - */ -void remove_connection(void *arg) -{ - connection c = arg; - - hash_table_remove(c->m->connections,&c->peer_nid); - close_connection(c); -} - - -/* Function: read_connection: - * Arguments: c: the connection to read from - * dest: the buffer to read into - * len: the number of bytes to read - * Returns: success as 1, or failure as 0 - * - * read_connection() reads data from the connection, continuing - * to read partial results until the request is satisfied or - * it errors. TODO: this read should be covered by signal protection. - */ -int read_connection(connection c, - unsigned char *dest, - int len) -{ - int offset = 0,rc; - - if (len) { - do { -#ifndef __CYGWIN__ - rc = syscall(SYS_read, c->fd, dest+offset, len-offset); -#else - rc = recv(c->fd, dest+offset, len-offset, 0); -#endif - if (rc <= 0) { - if (errno == EINTR) { - rc = 0; - } else { - remove_connection(c); - return (0); - } - } - offset += rc; - } while (offset < len); - } - return (1); -} - -static int connection_input(void *d) -{ - connection c = d; - return((*c->m->handler)(c->m->handler_arg,c)); -} - - -static connection -allocate_connection(manager m, - lnet_nid_t nid, - int fd) -{ - connection c=malloc(sizeof(struct connection)); - - c->m=m; - c->fd=fd; - c->peer_nid = nid; - - register_io_handler(fd,READ_HANDLER,connection_input,c); - hash_table_insert(m->connections,c,&nid); - return(c); -} - -int -tcpnal_write(lnet_nid_t nid, int sockfd, void *buffer, int nob) -{ - int rc = syscall(SYS_write, sockfd, buffer, nob); - - /* NB called on an 'empty' socket with huge buffering! 
*/ - if (rc == nob) - return 0; - - if (rc < 0) { - CERROR("Failed to send to %s: %s\n", - libcfs_nid2str(nid), strerror(errno)); - return -1; - } - - CERROR("Short send to %s: %d/%d\n", - libcfs_nid2str(nid), rc, nob); - return -1; -} - -int -tcpnal_read(lnet_nid_t nid, int sockfd, void *buffer, int nob) -{ - int rc; - - while (nob > 0) { - rc = syscall(SYS_read, sockfd, buffer, nob); - - if (rc == 0) { - CERROR("Unexpected EOF from %s\n", - libcfs_nid2str(nid)); - return -1; - } - - if (rc < 0) { - CERROR("Failed to receive from %s: %s\n", - libcfs_nid2str(nid), strerror(errno)); - return -1; - } - - nob -= rc; - } - return 0; -} - -int -tcpnal_hello (int sockfd, lnet_nid_t nid) -{ - struct timeval tv; - __u64 incarnation; - int rc; - int nob; - lnet_acceptor_connreq_t cr; - lnet_hdr_t hdr; - lnet_magicversion_t hmv; - - gettimeofday(&tv, NULL); - incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec; - - memset(&cr, 0, sizeof(cr)); - cr.acr_magic = LNET_PROTO_ACCEPTOR_MAGIC; - cr.acr_version = LNET_PROTO_ACCEPTOR_VERSION; - cr.acr_nid = nid; - - /* hmv initialised and copied separately into hdr; compiler "optimize" - * likely due to confusion about pointer alias of hmv and hdr when this - * was done in-place. */ - hmv.magic = cpu_to_le32(LNET_PROTO_TCP_MAGIC); - hmv.version_major = cpu_to_le32(LNET_PROTO_TCP_VERSION_MAJOR); - hmv.version_minor = cpu_to_le32(LNET_PROTO_TCP_VERSION_MINOR); - - memset (&hdr, 0, sizeof (hdr)); - - CLASSERT (sizeof (hmv) == sizeof (hdr.dest_nid)); - memcpy(&hdr.dest_nid, &hmv, sizeof(hmv)); - - /* hdr.src_nid/src_pid are ignored at dest */ - - hdr.type = cpu_to_le32(LNET_MSG_HELLO); - hdr.msg.hello.type = cpu_to_le32(SOCKLND_CONN_ANY); - hdr.msg.hello.incarnation = cpu_to_le64(incarnation); - - /* I don't send any interface info */ - - /* Assume sufficient socket buffering for these messages... 
*/ - rc = tcpnal_write(nid, sockfd, &cr, sizeof(cr)); - if (rc != 0) - return -1; - - rc = tcpnal_write(nid, sockfd, &hdr, sizeof(hdr)); - if (rc != 0) - return -1; - - rc = tcpnal_read(nid, sockfd, &hmv, sizeof(hmv)); - if (rc != 0) - return -1; - - if (hmv.magic != le32_to_cpu(LNET_PROTO_TCP_MAGIC)) { - CERROR ("Bad magic %#08x (%#08x expected) from %s\n", - cpu_to_le32(hmv.magic), LNET_PROTO_TCP_MAGIC, - libcfs_nid2str(nid)); - return -1; - } - - if (hmv.version_major != cpu_to_le16 (LNET_PROTO_TCP_VERSION_MAJOR) || - hmv.version_minor != cpu_to_le16 (LNET_PROTO_TCP_VERSION_MINOR)) { - CERROR ("Incompatible protocol version %d.%d (%d.%d expected)" - " from %s\n", - le16_to_cpu (hmv.version_major), - le16_to_cpu (hmv.version_minor), - LNET_PROTO_TCP_VERSION_MAJOR, - LNET_PROTO_TCP_VERSION_MINOR, - libcfs_nid2str(nid)); - return -1; - } - -#if (LNET_PROTO_TCP_VERSION_MAJOR != 1) -# error "This code only understands protocol version 1.x" -#endif - /* version 1 sends magic/version as the dest_nid of a 'hello' header, - * so read the rest of it in now... 
*/ - - rc = tcpnal_read(nid, sockfd, ((char *)&hdr) + sizeof (hmv), - sizeof(hdr) - sizeof(hmv)); - if (rc != 0) - return -1; - - /* ...and check we got what we expected */ - if (hdr.type != cpu_to_le32 (LNET_MSG_HELLO)) { - CERROR ("Expecting a HELLO hdr " - " but got type %d with %d payload from %s\n", - le32_to_cpu (hdr.type), - le32_to_cpu (hdr.payload_length), libcfs_nid2str(nid)); - return -1; - } - - if (le64_to_cpu(hdr.src_nid) == LNET_NID_ANY) { - CERROR("Expecting a HELLO hdr with a NID, but got LNET_NID_ANY\n"); - return -1; - } - - if (nid != le64_to_cpu (hdr.src_nid)) { - CERROR ("Connected to %s, but expecting %s\n", - libcfs_nid2str(le64_to_cpu (hdr.src_nid)), - libcfs_nid2str(nid)); - return -1; - } - - /* Ignore any interface info in the payload */ - nob = le32_to_cpu(hdr.payload_length); - if (nob != 0) { - CERROR("Unexpected HELLO payload %d from %s\n", - nob, libcfs_nid2str(nid)); - return -1; - } - - return 0; -} - -/* Function: force_tcp_connection - * Arguments: t: tcpnal - * dest: portals endpoint for the connection - * Returns: an allocated connection structure, either - * a pre-existing one, or a new connection - */ -connection force_tcp_connection(manager m, - lnet_nid_t nid, - procbridge pb) -{ - unsigned int ip = LNET_NIDADDR(nid); - connection conn; - struct sockaddr_in addr; - struct sockaddr_in locaddr; - int fd; - int option; - int rc; - int sz; - - pthread_mutex_lock(&m->conn_lock); - - conn = hash_table_find(m->connections, &nid); - if (conn) - goto out; - - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl(ip); - addr.sin_port = htons(tcpnal_acceptor_port); - - memset(&locaddr, 0, sizeof(locaddr)); - locaddr.sin_family = AF_INET; - locaddr.sin_addr.s_addr = INADDR_ANY; - locaddr.sin_port = htons(m->port); - -#if 1 /* tcpnal connects from a non-privileged port */ - fd = socket(AF_INET, SOCK_STREAM, 0); - if (fd < 0) { - perror("tcpnal socket failed"); - goto out; - } - - option = 1; - rc = 
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, - &option, sizeof(option)); - if (rc != 0) { - perror ("Can't set SO_REUSEADDR for socket"); - close(fd); - goto out; - } - - if (m->port != 0) { - /* Bind all subsequent connections to the same port */ - rc = bind(fd, (struct sockaddr *)&locaddr, sizeof(locaddr)); - if (rc != 0) { - perror("Error binding port"); - close(fd); - goto out; - } - } - - rc = connect(fd, (struct sockaddr *)&addr, - sizeof(struct sockaddr_in)); - if (rc != 0) { - perror("Error connecting to remote host"); - close(fd); - goto out; - } - - sz = sizeof(locaddr); - rc = getsockname(fd, (struct sockaddr *)&locaddr, &sz); - if (rc != 0) { - perror ("Error on getsockname"); - close(fd); - goto out; - } - - if (m->port == 0) - m->port = ntohs(locaddr.sin_port); - -#else - for (rport = IPPORT_RESERVED - 1; rport > IPPORT_RESERVED / 2; --rport) { - fd = socket(AF_INET, SOCK_STREAM, 0); - if (fd < 0) { - perror("tcpnal socket failed"); - goto out; - } - - option = 1; - rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, - &option, sizeof(option)); - if (rc != 0) { - perror ("Can't set SO_REUSEADDR for socket"); - close(fd); - goto out; - } - - locaddr.sin_port = htons(rport); - rc = bind(fd, (struct sockaddr *)&locaddr, sizeof(locaddr)); - if (rc == 0 || errno == EACCES) { - rc = connect(fd, (struct sockaddr *)&addr, - sizeof(struct sockaddr_in)); - if (rc == 0) { - break; - } else if (errno != EADDRINUSE && errno != EADDRNOTAVAIL) { - perror("Error connecting to remote host"); - close(fd); - goto out; - } - } else if (errno != EADDRINUSE) { - perror("Error binding to privileged port"); - close(fd); - goto out; - } - close(fd); - } - - if (rport == IPPORT_RESERVED / 2) { - fprintf(stderr, "Out of ports trying to bind to a reserved port\n"); - goto out; - } -#endif - - option = tcpnal_nagle ? 
0 : 1; - setsockopt(fd, SOL_TCP, TCP_NODELAY, &option, sizeof(option)); - option = tcpnal_buffer_size; - if (option != 0) { - setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &option, sizeof(option)); - option = tcpnal_buffer_size; - setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &option, sizeof(option)); - } - - /* say hello */ - if (tcpnal_hello(fd, nid)) { - close(fd); - goto out; - } - - conn = allocate_connection(m, nid, fd); - - /* let nal thread know this event right away */ - if (conn) - procbridge_wakeup_nal(pb); - -out: - pthread_mutex_unlock(&m->conn_lock); - return (conn); -} - - -#if 0 /* we don't accept connections */ -/* Function: new_connection - * Arguments: t: opaque argument holding the tcpname - * Returns: 1 in order to reregister for new connection requests - * - * called when the bound service socket recieves - * a new connection request, it always accepts and - * installs a new connection - */ -static int new_connection(void *z) -{ - manager m=z; - struct sockaddr_in s; - int len=sizeof(struct sockaddr_in); - int fd=accept(m->bound,(struct sockaddr *)&s,&len); - unsigned int nid=*((unsigned int *)&s.sin_addr); - /* cfs specific hack */ - //unsigned short pid=s.sin_port; - pthread_mutex_lock(&m->conn_lock); - allocate_connection(m,htonl(nid),0/*pid*/,fd); - pthread_mutex_unlock(&m->conn_lock); - return(1); -} - -/* Function: bind_socket - * Arguments: t: the nal state for this interface - * port: the port to attempt to bind to - * Returns: 1 on success, or 0 on error - * - * bind_socket() attempts to allocate and bind a socket to the requested - * port, or dynamically assign one from the kernel should the port be - * zero. Sets the bound and bound_handler elements of m. - * - * TODO: The port should be an explicitly sized type. 
- */ -static int bind_socket(manager m,unsigned short port) -{ - struct sockaddr_in addr; - int alen=sizeof(struct sockaddr_in); - - if ((m->bound = socket(AF_INET, SOCK_STREAM, 0)) < 0) - return(0); - - bzero((char *) &addr, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = 0; - addr.sin_port = htons(port); - - if (bind(m->bound,(struct sockaddr *)&addr,alen)<0){ - perror ("tcpnal bind"); - return(0); - } - - getsockname(m->bound,(struct sockaddr *)&addr, &alen); - - m->bound_handler=register_io_handler(m->bound,READ_HANDLER, - new_connection,m); - listen(m->bound,5); - m->port=addr.sin_port; - return(1); -} -#endif - - -/* Function: shutdown_connections - * Arguments: m: the manager structure - * - * close all connections and reclaim resources - */ -void shutdown_connections(manager m) -{ -#if 0 - /* we don't accept connections */ - close(m->bound); - remove_io_handler(m->bound_handler); -#endif - hash_destroy_table(m->connections,close_connection); - free(m); -} - - -/* Function: init_connections - * Arguments: t: the nal state for this interface - * Returns: a newly allocated manager structure, or - * zero if the fixed port could not be bound - */ -manager init_connections(int (*input)(void *, void *), void *a) -{ - manager m = (manager)malloc(sizeof(struct manager)); - - m->connections = hash_create_table(compare_connection,connection_key); - m->handler = input; - m->handler_arg = a; - m->port = 0; /* set on first connection */ - pthread_mutex_init(&m->conn_lock, 0); - - return m; -#if 0 - if (bind_socket(m,pid)) - return(m); - - free(m); - return(0); -#endif -} diff --git a/lnet/ulnds/socklnd/connection.h b/lnet/ulnds/socklnd/connection.h deleted file mode 100644 index 0c4718e..0000000 --- a/lnet/ulnds/socklnd/connection.h +++ /dev/null @@ -1,35 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * - * Copyright (c) 2002 Cray Inc. 
- * - * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ - */ - -#include -#include - -typedef struct manager { - table connections; - pthread_mutex_t conn_lock; /* protect connections table */ -#if 0 /* we don't accept connections */ - int bound; - io_handler bound_handler; -#endif - int (*handler)(void *, void *); - void *handler_arg; - int port; -} *manager; - - -typedef struct connection { - lnet_nid_t peer_nid; - int fd; - manager m; -} *connection; - -connection force_tcp_connection(manager m, lnet_nid_t nid, procbridge pb); -manager init_connections(int (*f)(void *, void *), void *); -void remove_connection(void *arg); -void shutdown_connections(manager m); -int read_connection(connection c, unsigned char *dest, int len); diff --git a/lnet/ulnds/socklnd/dispatch.h b/lnet/ulnds/socklnd/dispatch.h deleted file mode 100644 index 510525e..0000000 --- a/lnet/ulnds/socklnd/dispatch.h +++ /dev/null @@ -1,44 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * - * Copyright (c) 2002 Cray Inc. 
- * Copyright (c) 2002 Eric Hoffman - * - * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ - */ - -/* this file is only called dispatch.h to prevent it - from colliding with /usr/include/sys/select.h */ - -typedef struct io_handler *io_handler; - -struct io_handler{ - io_handler *last; - io_handler next; - int fd; - int type; - int (*function)(void *); - void *argument; - int disabled; -}; - - -#define READ_HANDLER 1 -#define WRITE_HANDLER 2 -#define EXCEPTION_HANDLER 4 -#define ALL_HANDLER (READ_HANDLER | WRITE_HANDLER | EXCEPTION_HANDLER) - -io_handler register_io_handler(int fd, - int type, - int (*function)(void *), - void *arg); - -void remove_io_handler (io_handler i); -void init_unix_timer(void); -void select_timer_block(when until); -when now(void); - -/* - * hacking for CFS internal MPI testing - */ -#undef ENABLE_SELECT_DISPATCH diff --git a/lnet/ulnds/socklnd/handlers.c b/lnet/ulnds/socklnd/handlers.c new file mode 100644 index 0000000..cd84315 --- /dev/null +++ b/lnet/ulnds/socklnd/handlers.c @@ -0,0 +1,1005 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2001, 2002 Cluster File Systems, Inc. + * Author: Maxim Patlasov + * + * This file is part of the Lustre file system, http://www.lustre.org + * Lustre is a trademark of Cluster File Systems, Inc. 
+ * + */ + +#include "usocklnd.h" +#include +#include + +int +usocklnd_notifier_handler(int fd) +{ + int notification; + return syscall(SYS_read, fd, &notification, sizeof(notification)); +} + +void +usocklnd_exception_handler(usock_conn_t *conn) +{ + pthread_mutex_lock(&conn->uc_lock); + + if (conn->uc_state == UC_CONNECTING || + conn->uc_state == UC_SENDING_HELLO) + usocklnd_conn_kill_locked(conn); + + pthread_mutex_unlock(&conn->uc_lock); +} + +int +usocklnd_read_handler(usock_conn_t *conn) +{ + int rc; + int continue_reading; + int state; + + read_again: + rc = 0; + pthread_mutex_lock(&conn->uc_lock); + state = conn->uc_state; + + /* process special case: LNET calls lnd_recv() asynchronously */ + if (state == UC_READY && conn->uc_rx_state == UC_RX_PARSE) { + /* still don't have usocklnd_recv() called */ + rc = usocklnd_add_pollrequest(conn, POLL_RX_SET_REQUEST, 0); + if (rc == 0) + conn->uc_rx_state = UC_RX_PARSE_WAIT; + else + usocklnd_conn_kill_locked(conn); + + pthread_mutex_unlock(&conn->uc_lock); + return rc; + } + + pthread_mutex_unlock(&conn->uc_lock); + /* From here and below the conn cannot be changed + * asynchronously, except: + * 1) usocklnd_send() can work with uc_tx_list and uc_zcack_list, + * 2) usocklnd_shutdown() can change uc_state to UC_DEAD */ + + switch (state) { + + case UC_RECEIVING_HELLO: + case UC_READY: + if (conn->uc_rx_nob_wanted != 0) { + /* read from conn fd as much wanted data as possible */ + rc = usocklnd_read_data(conn); + if (rc == 0) /* partial read */ + break; + if (rc < 0) {/* error happened or EOF */ + usocklnd_conn_kill(conn); + break; + } + } + + /* process incoming data */ + if (state == UC_READY ) + rc = usocklnd_read_msg(conn, &continue_reading); + else /* state == UC_RECEIVING_HELLO */ + rc = usocklnd_read_hello(conn, &continue_reading); + + if (rc < 0) { + usocklnd_conn_kill(conn); + break; + } + + if (continue_reading) + goto read_again; + + break; + + case UC_DEAD: + break; + + default: + LBUG(); + } + + return rc; +}
+ +/* Switch on rx_state. + * Return 0 on success, 1 if whole packet is read, else return <0 + * Always set cont_flag: 1 if we're ready to continue reading, else 0 + * NB: If whole packet is read, cont_flag will be set to zero to take + * care of fairness + */ +int +usocklnd_read_msg(usock_conn_t *conn, int *cont_flag) +{ + int rc = 0; + __u64 cookie; + + *cont_flag = 0; + + /* smth. new emerged in RX part - let's process it */ + switch (conn->uc_rx_state) { + case UC_RX_KSM_HEADER: + if (conn->uc_flip) { + __swab32s(&conn->uc_rx_msg.ksm_type); + __swab32s(&conn->uc_rx_msg.ksm_csum); + __swab64s(&conn->uc_rx_msg.ksm_zc_req_cookie); + __swab64s(&conn->uc_rx_msg.ksm_zc_ack_cookie); + } + + /* we never send packets for which zc-acking is required */ + if (conn->uc_rx_msg.ksm_type != KSOCK_MSG_LNET || + conn->uc_rx_msg.ksm_zc_ack_cookie != 0) { + conn->uc_errored = 1; + return -EPROTO; + } + + /* zc_req will be processed later, when + lnet payload will be received */ + + usocklnd_rx_lnethdr_state_transition(conn); + *cont_flag = 1; + break; + + case UC_RX_LNET_HEADER: + if (the_lnet.ln_pid & LNET_PID_USERFLAG) { + /* replace dest_nid,pid (ksocknal sets its own) */ + conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_nid = + cpu_to_le64(conn->uc_peer->up_ni->ni_nid); + conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_pid = + cpu_to_le32(the_lnet.ln_pid); + + } else if (conn->uc_peer->up_peerid.pid & LNET_PID_USERFLAG) { + /* Userspace peer */ + lnet_process_id_t *id = &conn->uc_peer->up_peerid; + lnet_hdr_t *lhdr = &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr; + + /* Substitute process ID assigned at connection time */ + lhdr->src_pid = cpu_to_le32(id->pid); + lhdr->src_nid = cpu_to_le64(id->nid); + } + + conn->uc_rx_state = UC_RX_PARSE; + usocklnd_conn_addref(conn); /* ++ref while parsing */ + + rc = lnet_parse(conn->uc_peer->up_ni, + &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr, + conn->uc_peerid.nid, conn, 0); + + if (rc < 0) { + /* I just received garbage: give up on this conn */ +
conn->uc_errored = 1; + usocklnd_conn_decref(conn); + return -EPROTO; + } + + /* Race with usocklnd_recv() is possible */ + pthread_mutex_lock(&conn->uc_lock); + LASSERT (conn->uc_rx_state == UC_RX_PARSE || + conn->uc_rx_state == UC_RX_LNET_PAYLOAD); + + /* check whether usocklnd_recv() got called */ + if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) + *cont_flag = 1; + pthread_mutex_unlock(&conn->uc_lock); + break; + + case UC_RX_PARSE: + LBUG(); /* it's error to be here, because this special + * case is handled by caller */ + break; + + case UC_RX_PARSE_WAIT: + LBUG(); /* it's error to be here, because the conn + * shouldn't wait for POLLIN event in this + * state */ + break; + + case UC_RX_LNET_PAYLOAD: + /* payload all received */ + + lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, 0); + + cookie = conn->uc_rx_msg.ksm_zc_req_cookie; + if (cookie != 0) + rc = usocklnd_handle_zc_req(conn->uc_peer, cookie); + + if (rc != 0) { + /* change state not to finalize twice */ + conn->uc_rx_state = UC_RX_KSM_HEADER; + return -EPROTO; + } + + /* Fall through */ + + case UC_RX_SKIPPING: + if (conn->uc_rx_nob_left != 0) { + usocklnd_rx_skipping_state_transition(conn); + *cont_flag = 1; + } else { + usocklnd_rx_ksmhdr_state_transition(conn); + rc = 1; /* whole packet is read */ + } + + break; + + default: + LBUG(); /* unknown state */ + } + + return rc; +} + +/* Handle incoming ZC request from sender. 
+ * NB: it's called only from read_handler, so we're sure that + * the conn cannot become zombie in the middle of processing */ +int +usocklnd_handle_zc_req(usock_peer_t *peer, __u64 cookie) +{ + usock_conn_t *conn; + usock_zc_ack_t *zc_ack; + int type; + int rc; + int dummy; + + LIBCFS_ALLOC (zc_ack, sizeof(*zc_ack)); + if (zc_ack == NULL) + return -ENOMEM; + zc_ack->zc_cookie = cookie; + + /* Let's assume that CONTROL is the best type for zcack, + * but userspace clients don't use typed connections */ + if (the_lnet.ln_pid & LNET_PID_USERFLAG) + type = SOCKLND_CONN_ANY; + else + type = SOCKLND_CONN_CONTROL; + + rc = usocklnd_find_or_create_conn(peer, type, &conn, NULL, zc_ack, + &dummy); + if (rc != 0) { + LIBCFS_FREE (zc_ack, sizeof(*zc_ack)); + return rc; + } + usocklnd_conn_decref(conn); + + return 0; +} + +/* Switch on rx_state. + * Return 0 on success, else return <0 + * Always set cont_flag: 1 if we're ready to continue reading, else 0 + */ +int +usocklnd_read_hello(usock_conn_t *conn, int *cont_flag) +{ + int rc = 0; + ksock_hello_msg_t *hello = conn->uc_rx_hello; + + *cont_flag = 0; + + /* smth. 
new emerged in hello - let's process it */ + switch (conn->uc_rx_state) { + case UC_RX_HELLO_MAGIC: + if (hello->kshm_magic == LNET_PROTO_MAGIC) + conn->uc_flip = 0; + else if (hello->kshm_magic == __swab32(LNET_PROTO_MAGIC)) + conn->uc_flip = 1; + else + return -EPROTO; + + usocklnd_rx_helloversion_state_transition(conn); + *cont_flag = 1; + break; + + case UC_RX_HELLO_VERSION: + if ((!conn->uc_flip && + (hello->kshm_version != KSOCK_PROTO_V2)) || + (conn->uc_flip && + (hello->kshm_version != __swab32(KSOCK_PROTO_V2)))) + return -EPROTO; + + usocklnd_rx_hellobody_state_transition(conn); + *cont_flag = 1; + break; + + case UC_RX_HELLO_BODY: + if (conn->uc_flip) { + ksock_hello_msg_t *hello = conn->uc_rx_hello; + __swab32s(&hello->kshm_src_pid); + __swab64s(&hello->kshm_src_nid); + __swab32s(&hello->kshm_dst_pid); + __swab64s(&hello->kshm_dst_nid); + __swab64s(&hello->kshm_src_incarnation); + __swab64s(&hello->kshm_dst_incarnation); + __swab32s(&hello->kshm_ctype); + __swab32s(&hello->kshm_nips); + } + + if (conn->uc_rx_hello->kshm_nips > LNET_MAX_INTERFACES) { + CERROR("Bad nips %d from ip %u.%u.%u.%u port %d\n", + conn->uc_rx_hello->kshm_nips, + HIPQUAD(conn->uc_peer_ip), conn->uc_peer_port); + return -EPROTO; + } + + if (conn->uc_rx_hello->kshm_nips) { + usocklnd_rx_helloIPs_state_transition(conn); + *cont_flag = 1; + break; + } + /* fall through */ + + case UC_RX_HELLO_IPS: + if (conn->uc_activeflag == 1) /* active conn */ + rc = usocklnd_activeconn_hellorecv(conn); + else /* passive conn */ + rc = usocklnd_passiveconn_hellorecv(conn); + + break; + + default: + LBUG(); /* unknown state */ + } + + return rc; +} + +/* All actions that we need after receiving hello on active conn: + * 1) Schedule removing if we're zombie + * 2) Restart active conn if we lost the race + * 3) Else: update RX part to receive KSM header + */ +int +usocklnd_activeconn_hellorecv(usock_conn_t *conn) +{ + int rc = 0; + ksock_hello_msg_t *hello = conn->uc_rx_hello; + usock_peer_t *peer = 
conn->uc_peer; + + /* Active conn with peer==NULL is zombie. + * Don't try to link it to peer because the conn + * has already had a chance to proceed at the beginning */ + if (peer == NULL) { + LASSERT(list_empty(&conn->uc_tx_list) && + list_empty(&conn->uc_zcack_list)); + + usocklnd_conn_kill(conn); + return 0; + } + + peer->up_last_alive = cfs_time_current(); + + /* peer says that we lost the race */ + if (hello->kshm_ctype == SOCKLND_CONN_NONE) { + /* Start new active conn, relink txs and zc_acks from + * the conn to new conn, schedule removing the conn. + * Actually, we're expecting that a passive conn will + * make us zombie soon and take care of our txs and + * zc_acks */ + + struct list_head tx_list, zcack_list; + usock_conn_t *conn2; + int idx = usocklnd_type2idx(conn->uc_type); + + CFS_INIT_LIST_HEAD (&tx_list); + CFS_INIT_LIST_HEAD (&zcack_list); + + /* Block usocklnd_send() to check peer->up_conns[idx] + * and to enqueue more txs */ + pthread_mutex_lock(&peer->up_lock); + pthread_mutex_lock(&conn->uc_lock); + + /* usocklnd_shutdown() could kill us */ + if (conn->uc_state == UC_DEAD) { + pthread_mutex_unlock(&conn->uc_lock); + pthread_mutex_unlock(&peer->up_lock); + return 0; + } + + LASSERT (peer == conn->uc_peer); + LASSERT (peer->up_conns[idx] == conn); + + rc = usocklnd_create_active_conn(peer, conn->uc_type, &conn2); + if (rc) { + conn->uc_errored = 1; + pthread_mutex_unlock(&conn->uc_lock); + pthread_mutex_unlock(&peer->up_lock); + return rc; + } + + usocklnd_link_conn_to_peer(conn2, peer, idx); + conn2->uc_peer = peer; + + /* unlink txs and zcack from the conn */ + list_add(&tx_list, &conn->uc_tx_list); + list_del_init(&conn->uc_tx_list); + list_add(&zcack_list, &conn->uc_zcack_list); + list_del_init(&conn->uc_zcack_list); + + /* link them to the conn2 */ + list_add(&conn2->uc_tx_list, &tx_list); + list_del_init(&tx_list); + list_add(&conn2->uc_zcack_list, &zcack_list); + list_del_init(&zcack_list); + + /* make conn zombie */ + conn->uc_peer =
NULL; + usocklnd_peer_decref(peer); + + /* schedule conn2 for processing */ + rc = usocklnd_add_pollrequest(conn2, POLL_ADD_REQUEST, POLLOUT); + if (rc) { + peer->up_conns[idx] = NULL; + usocklnd_conn_decref(conn2); /* should destroy conn */ + } else { + usocklnd_conn_kill_locked(conn); + } + + pthread_mutex_unlock(&conn->uc_lock); + pthread_mutex_unlock(&peer->up_lock); + usocklnd_conn_decref(conn); + + } else { /* hello->kshm_ctype != SOCKLND_CONN_NONE */ + if (conn->uc_type != usocklnd_invert_type(hello->kshm_ctype)) + return -EPROTO; + + pthread_mutex_lock(&peer->up_lock); + usocklnd_cleanup_stale_conns(peer, hello->kshm_src_incarnation, + conn); + pthread_mutex_unlock(&peer->up_lock); + + /* safely transit to UC_READY state */ + /* rc == 0 */ + pthread_mutex_lock(&conn->uc_lock); + if (conn->uc_state != UC_DEAD) { + usocklnd_rx_ksmhdr_state_transition(conn); + + /* POLLIN is already set because we just + * received hello, but maybe we've smth. to + * send? */ + LASSERT (conn->uc_sending == 0); + if ( !list_empty(&conn->uc_tx_list) || + !list_empty(&conn->uc_zcack_list) ) { + + conn->uc_tx_deadline = + cfs_time_shift(usock_tuns.ut_timeout); + conn->uc_tx_flag = 1; + rc = usocklnd_add_pollrequest(conn, + POLL_SET_REQUEST, + POLLIN | POLLOUT); + } + + if (rc == 0) + conn->uc_state = UC_READY; + } + pthread_mutex_unlock(&conn->uc_lock); + } + + return rc; +} + +/* All actions that we need after receiving hello on passive conn: + * 1) Stash peer's nid, pid, incarnation and conn type + * 2) Cope with easy case: conn[idx] is empty - just save conn there + * 3) Resolve race: + * a) if our nid is higher - reply with CONN_NONE and make us zombie + * b) if peer's nid is higher - postpone race resolution till + * READY state + * 4) Anyhow, send reply hello +*/ +int +usocklnd_passiveconn_hellorecv(usock_conn_t *conn) +{ + ksock_hello_msg_t *hello = conn->uc_rx_hello; + int type; + int idx; + int rc; + usock_peer_t *peer; + lnet_ni_t *ni = conn->uc_ni; + __u32 peer_ip = 
conn->uc_peer_ip; + __u16 peer_port = conn->uc_peer_port; + + /* don't know parent peer yet and not zombie */ + LASSERT (conn->uc_peer == NULL && + ni != NULL); + + /* don't know peer's nid and incarnation yet */ + if (peer_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) { + /* do not trust liblustre clients */ + conn->uc_peerid.pid = peer_port | LNET_PID_USERFLAG; + conn->uc_peerid.nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), + peer_ip); + if (hello->kshm_ctype != SOCKLND_CONN_ANY) { + lnet_ni_decref(ni); + conn->uc_ni = NULL; + CERROR("Refusing to accept connection of type=%d from " + "userspace process %u.%u.%u.%u:%d\n", hello->kshm_ctype, + HIPQUAD(peer_ip), peer_port); + return -EINVAL; + } + } else { + conn->uc_peerid.pid = hello->kshm_src_pid; + conn->uc_peerid.nid = hello->kshm_src_nid; + } + conn->uc_type = type = usocklnd_invert_type(hello->kshm_ctype); + + rc = usocklnd_find_or_create_peer(ni, conn->uc_peerid, &peer); + if (rc) { + lnet_ni_decref(ni); + conn->uc_ni = NULL; + return rc; + } + + peer->up_last_alive = cfs_time_current(); + + idx = usocklnd_type2idx(conn->uc_type); + + /* safely check whether we're first */ + pthread_mutex_lock(&peer->up_lock); + + usocklnd_cleanup_stale_conns(peer, hello->kshm_src_incarnation, NULL); + + if (peer->up_conns[idx] == NULL) { + peer->up_last_alive = cfs_time_current(); + conn->uc_peer = peer; + conn->uc_ni = NULL; + usocklnd_link_conn_to_peer(conn, peer, idx); + usocklnd_conn_addref(conn); + } else { + usocklnd_peer_decref(peer); + + /* Resolve race in favour of higher NID */ + if (conn->uc_peerid.nid < conn->uc_ni->ni_nid) { + /* make us zombie */ + conn->uc_ni = NULL; + type = SOCKLND_CONN_NONE; + } + + /* if conn->uc_peerid.nid > conn->uc_ni->ni_nid, + * postpone race resolution till READY state + * (hopefully that conn[idx] will die because of + * incoming hello of CONN_NONE type) */ + } + pthread_mutex_unlock(&peer->up_lock); + + /* allocate and initialize fake tx with hello */ + conn->uc_tx_hello = 
usocklnd_create_hello_tx(ni, type, + conn->uc_peerid.nid); + if (conn->uc_ni == NULL) + lnet_ni_decref(ni); + + if (conn->uc_tx_hello == NULL) + return -ENOMEM; + + /* rc == 0 */ + pthread_mutex_lock(&conn->uc_lock); + if (conn->uc_state == UC_DEAD) + goto passive_hellorecv_done; + + conn->uc_state = UC_SENDING_HELLO; + conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout); + conn->uc_tx_flag = 1; + rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST, POLLOUT); + + passive_hellorecv_done: + pthread_mutex_unlock(&conn->uc_lock); + return rc; +} + +int +usocklnd_write_handler(usock_conn_t *conn) +{ + usock_tx_t *tx; + int ret; + int rc = 0; + int state; + usock_peer_t *peer; + lnet_ni_t *ni; + + pthread_mutex_lock(&conn->uc_lock); /* like membar */ + state = conn->uc_state; + pthread_mutex_unlock(&conn->uc_lock); + + switch (state) { + case UC_CONNECTING: + /* hello_tx has already been initialized + * in usocklnd_create_active_conn() */ + usocklnd_conn_new_state(conn, UC_SENDING_HELLO); + /* fall through */ + + case UC_SENDING_HELLO: + rc = usocklnd_send_tx(conn, conn->uc_tx_hello); + if (rc <= 0) /* error or partial send or connection closed */ + break; + + /* tx with hello was sent successfully */ + usocklnd_destroy_tx(NULL, conn->uc_tx_hello); + conn->uc_tx_hello = NULL; + + if (conn->uc_activeflag == 1) /* active conn */ + rc = usocklnd_activeconn_hellosent(conn); + else /* passive conn */ + rc = usocklnd_passiveconn_hellosent(conn); + + break; + + case UC_READY: + pthread_mutex_lock(&conn->uc_lock); + + peer = conn->uc_peer; + LASSERT (peer != NULL); + ni = peer->up_ni; + + if (list_empty(&conn->uc_tx_list) && + list_empty(&conn->uc_zcack_list)) { + LASSERT(usock_tuns.ut_fair_limit > 1); + pthread_mutex_unlock(&conn->uc_lock); + return 0; + } + + tx = usocklnd_try_piggyback(&conn->uc_tx_list, + &conn->uc_zcack_list); + if (tx != NULL) + conn->uc_sending = 1; + else + rc = -ENOMEM; + + pthread_mutex_unlock(&conn->uc_lock); + + if (rc) + break; + + rc = 
usocklnd_send_tx(conn, tx); + if (rc == 0) { /* partial send or connection closed */ + pthread_mutex_lock(&conn->uc_lock); + list_add(&tx->tx_list, &conn->uc_tx_list); + conn->uc_sending = 0; + pthread_mutex_unlock(&conn->uc_lock); + break; + } + if (rc < 0) { /* real error */ + usocklnd_destroy_tx(ni, tx); + break; + } + + /* rc == 1: tx was sent completely */ + usocklnd_destroy_tx(ni, tx); + + pthread_mutex_lock(&conn->uc_lock); + conn->uc_sending = 0; + if (conn->uc_state != UC_DEAD && + list_empty(&conn->uc_tx_list) && + list_empty(&conn->uc_zcack_list)) { + conn->uc_tx_flag = 0; + ret = usocklnd_add_pollrequest(conn, + POLL_TX_SET_REQUEST, 0); + if (ret) + rc = ret; + } + pthread_mutex_unlock(&conn->uc_lock); + + break; + + case UC_DEAD: + break; + + default: + LBUG(); + } + + if (rc < 0) + usocklnd_conn_kill(conn); + + return rc; +} + +/* Return the first tx from tx_list with piggybacked zc_ack + * from zcack_list when possible. If tx_list is empty, return + * brand new noop tx for zc_ack from zcack_list. 
Return NULL + * if an error happened */ +usock_tx_t * +usocklnd_try_piggyback(struct list_head *tx_list_p, + struct list_head *zcack_list_p) +{ + usock_tx_t *tx; + usock_zc_ack_t *zc_ack; + + /* assign tx and zc_ack */ + if (list_empty(tx_list_p)) + tx = NULL; + else { + tx = list_entry(tx_list_p->next, usock_tx_t, tx_list); + list_del(&tx->tx_list); + + /* already piggybacked or partially send */ + if (tx->tx_msg.ksm_zc_ack_cookie || + tx->tx_resid != tx->tx_nob) + return tx; + } + + if (list_empty(zcack_list_p)) { + /* nothing to piggyback */ + return tx; + } else { + zc_ack = list_entry(zcack_list_p->next, + usock_zc_ack_t, zc_list); + list_del(&zc_ack->zc_list); + } + + if (tx != NULL) + /* piggyback the zc-ack cookie */ + tx->tx_msg.ksm_zc_ack_cookie = zc_ack->zc_cookie; + else + /* cannot piggyback, need noop */ + tx = usocklnd_create_noop_tx(zc_ack->zc_cookie); + + LIBCFS_FREE (zc_ack, sizeof(*zc_ack)); + return tx; +} + +/* All actions that we need after sending hello on active conn: + * 1) update RX iov to receive hello + * 2) state transition to UC_RECEIVING_HELLO + * 3) notify poll_thread that we're waiting for incoming hello */ +int +usocklnd_activeconn_hellosent(usock_conn_t *conn) +{ + int rc = 0; + + pthread_mutex_lock(&conn->uc_lock); + + if (conn->uc_state != UC_DEAD) { + usocklnd_rx_hellomagic_state_transition(conn); + conn->uc_state = UC_RECEIVING_HELLO; + conn->uc_tx_flag = 0; + rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST, POLLIN); + } + + pthread_mutex_unlock(&conn->uc_lock); + + return rc; +} + +/* All actions that we need after sending hello on passive conn: + * 1) Cope with 1st easy case: conn is already linked to a peer + * 2) Cope with 2nd easy case: remove zombie conn + * 3) Resolve race: + * a) find the peer + * b) link the conn to the peer if conn[idx] is empty + * c) if the conn[idx] isn't empty and is in READY state, + * remove the conn as duplicated + * d) if the conn[idx] isn't empty and isn't in READY state, + * override 
conn[idx] with the conn
+ *
+ * Returns 0 when the conn was moved to UC_READY, was found UC_DEAD,
+ * or was killed as a zombie/duplicate; otherwise returns the error of
+ * usocklnd_find_or_create_peer() or usocklnd_add_pollrequest().
+ */
+int
+usocklnd_passiveconn_hellosent(usock_conn_t *conn)
+{
+        usock_conn_t *conn2;
+        usock_peer_t *peer;
+        struct list_head tx_list;
+        struct list_head zcack_list;
+        int idx;
+        int rc = 0;
+
+        /* almost nothing to do if conn is already linked to peer hash table */
+        if (conn->uc_peer != NULL)
+                goto passive_hellosent_done;
+
+        /* conn->uc_peer == NULL, so the conn isn't accessible via
+         * peer hash list, so nobody can touch the conn but us */
+
+        if (conn->uc_ni == NULL) /* remove zombie conn */
+                goto passive_hellosent_connkill;
+
+        /* all code below is race resolution, because normally
+         * passive conn is linked to peer just after receiving hello */
+        CFS_INIT_LIST_HEAD (&tx_list);
+        CFS_INIT_LIST_HEAD (&zcack_list);
+
+        /* conn is passive and isn't linked to any peer,
+           so its tx and zc_ack lists have to be empty */
+        LASSERT (list_empty(&conn->uc_tx_list) &&
+                 list_empty(&conn->uc_zcack_list) &&
+                 conn->uc_sending == 0);
+
+        rc = usocklnd_find_or_create_peer(conn->uc_ni, conn->uc_peerid, &peer);
+        if (rc)
+                return rc;
+
+        idx = usocklnd_type2idx(conn->uc_type);
+
+        /* try to link conn to peer */
+        pthread_mutex_lock(&peer->up_lock);
+        if (peer->up_conns[idx] == NULL) {
+                usocklnd_link_conn_to_peer(conn, peer, idx);
+                usocklnd_conn_addref(conn);
+                conn->uc_peer = peer;
+                usocklnd_peer_addref(peer); /* balanced by the peer_decref after the unlock below */
+        } else {
+                conn2 = peer->up_conns[idx];
+                pthread_mutex_lock(&conn2->uc_lock);
+
+                if (conn2->uc_state == UC_READY) {
+                        /* conn2 is in READY state, so conn is "duplicated" */
+                        pthread_mutex_unlock(&conn2->uc_lock);
+                        pthread_mutex_unlock(&peer->up_lock);
+                        usocklnd_peer_decref(peer);
+                        goto passive_hellosent_connkill;
+                }
+
+                /* uc_state != UC_READY => switch conn and conn2 */
+                /* Relink txs and zc_acks from conn2 to conn.
+                 * We're sure that nobody but us can access conn,
+                 * nevertheless we take its mutex: if we're wrong,
+                 * a deadlock is easier to spot than a corrupted list.
+                 *
+                 * Each list_add()/list_del_init() pair below splices a
+                 * whole queue: the destination head is inserted next to
+                 * the source head, then the source head is unlinked and
+                 * re-initialised as empty. */
+                list_add(&tx_list, &conn2->uc_tx_list);
+                list_del_init(&conn2->uc_tx_list);
+                list_add(&zcack_list, &conn2->uc_zcack_list);
+                list_del_init(&conn2->uc_zcack_list);
+
+                pthread_mutex_lock(&conn->uc_lock);
+                list_add_tail(&conn->uc_tx_list, &tx_list);
+                list_del_init(&tx_list);
+                list_add_tail(&conn->uc_zcack_list, &zcack_list);
+                list_del_init(&zcack_list);
+                conn->uc_peer = peer;
+                pthread_mutex_unlock(&conn->uc_lock);
+
+                conn2->uc_peer = NULL; /* make conn2 zombie */
+                pthread_mutex_unlock(&conn2->uc_lock);
+                usocklnd_conn_decref(conn2);
+
+                usocklnd_link_conn_to_peer(conn, peer, idx);
+                usocklnd_conn_addref(conn);
+                conn->uc_peer = peer;
+        }
+
+        lnet_ni_decref(conn->uc_ni);
+        conn->uc_ni = NULL;
+        pthread_mutex_unlock(&peer->up_lock);
+        usocklnd_peer_decref(peer);
+
+ passive_hellosent_done:
+        /* safely transit to UC_READY state */
+        /* rc == 0 */
+        pthread_mutex_lock(&conn->uc_lock);
+        if (conn->uc_state != UC_DEAD) {
+                usocklnd_rx_ksmhdr_state_transition(conn);
+
+                /* we're ready to receive incoming packets and maybe
+                   already have smth. to transmit */
+                LASSERT (conn->uc_sending == 0);
+                if ( list_empty(&conn->uc_tx_list) &&
+                     list_empty(&conn->uc_zcack_list) ) {
+                        conn->uc_tx_flag = 0;
+                        rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST,
+                                                      POLLIN);
+                } else {
+                        conn->uc_tx_deadline =
+                                cfs_time_shift(usock_tuns.ut_timeout);
+                        conn->uc_tx_flag = 1;
+                        rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST,
+                                                      POLLIN | POLLOUT);
+                }
+
+                if (rc == 0) /* state changes only if the poll request was queued */
+                        conn->uc_state = UC_READY;
+        }
+        pthread_mutex_unlock(&conn->uc_lock);
+        return rc;
+
+ passive_hellosent_connkill:
+        usocklnd_conn_kill(conn);
+        return 0;
+}
+
+/* Send as much tx data as possible.
+ * Returns 0 or 1 on success, <0 if fatal error.
+ * 0 means partial send or non-fatal error, 1 - complete.
+ * Rely on libcfs_sock_writev() for differentiating fatal and
+ * non-fatal errors. An error should be considered as non-fatal if:
+ * 1) it still makes sense to continue reading &&
+ * 2) anyway, poll() will set up POLLHUP|POLLERR flags */
+int
+usocklnd_send_tx(usock_conn_t *conn, usock_tx_t *tx)
+{
+        int fd = conn->uc_fd;
+
+        LASSERT (tx->tx_resid != 0);
+
+        for (;;) {
+                usock_peer_t *peer = conn->uc_peer;
+                struct iovec *iov;
+                cfs_time_t    now;
+                int           nob;
+
+                LASSERT (tx->tx_niov > 0);
+
+                nob = libcfs_sock_writev(fd, tx->tx_iov, tx->tx_niov);
+                if (nob <= 0) {
+                        /* write queue is flow-controlled (0) or error
+                         * (<0); only an error marks the conn */
+                        if (nob < 0)
+                                conn->uc_errored = 1;
+                        return nob;
+                }
+
+                LASSERT (nob <= tx->tx_resid);
+                tx->tx_resid -= nob;
+
+                /* bytes went out: push the tx deadline forward and
+                 * refresh the peer's last-alive stamp */
+                now = cfs_time_current();
+                conn->uc_tx_deadline =
+                        cfs_time_add(now, cfs_time_seconds(usock_tuns.ut_timeout));
+                if (peer != NULL)
+                        peer->up_last_alive = now;
+
+                /* "consume" iov: drop the fragments that were written
+                 * in full and trim the first one that was not */
+                iov = tx->tx_iov;
+                while (nob != 0) {
+                        LASSERT (tx->tx_niov > 0);
+
+                        if (nob < iov->iov_len) {
+                                iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
+                                iov->iov_len -= nob;
+                                break;
+                        }
+
+                        nob -= iov->iov_len;
+                        tx->tx_iov = ++iov;
+                        tx->tx_niov--;
+                }
+
+                if (tx->tx_resid == 0)
+                        break;
+        }
+
+        return 1; /* send complete */
+}
+
+/* Read from wire as much data as possible.
+ * Returns 0 or 1 on success, <0 if error or EOF.
+ * 0 means partial read, 1 - complete */ +int +usocklnd_read_data(usock_conn_t *conn) +{ + struct iovec *iov; + int nob; + cfs_time_t t; + + LASSERT (conn->uc_rx_nob_wanted != 0); + + do { + usock_peer_t *peer = conn->uc_peer; + + LASSERT (conn->uc_rx_niov > 0); + + nob = libcfs_sock_readv(conn->uc_fd, conn->uc_rx_iov, conn->uc_rx_niov); + if (nob <= 0) {/* read nothing or error */ + conn->uc_errored = 1; + return nob; + } + + LASSERT (nob <= conn->uc_rx_nob_wanted); + conn->uc_rx_nob_wanted -= nob; + conn->uc_rx_nob_left -= nob; + t = cfs_time_current(); + conn->uc_rx_deadline = cfs_time_add(t, cfs_time_seconds(usock_tuns.ut_timeout)); + + if(peer != NULL) + peer->up_last_alive = t; + + /* "consume" iov */ + iov = conn->uc_rx_iov; + do { + LASSERT (conn->uc_rx_niov > 0); + + if (nob < iov->iov_len) { + iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob); + iov->iov_len -= nob; + break; + } + + nob -= iov->iov_len; + conn->uc_rx_iov = ++iov; + conn->uc_rx_niov--; + } while (nob != 0); + + } while (conn->uc_rx_nob_wanted != 0); + + return 1; /* read complete */ +} diff --git a/lnet/ulnds/socklnd/poll.c b/lnet/ulnds/socklnd/poll.c new file mode 100644 index 0000000..9c346a7 --- /dev/null +++ b/lnet/ulnds/socklnd/poll.c @@ -0,0 +1,483 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2001, 2002 Cluster File Systems, Inc. + * Author: Maxim Patlasov + * + * This file is part of the Lustre file system, http://www.lustre.org + * Lustre is a trademark of Cluster File Systems, Inc. 
+ * + */ + +#include "usocklnd.h" +#include +#include + +void +usocklnd_process_stale_list(usock_pollthread_t *pt_data) +{ + while (!list_empty(&pt_data->upt_stale_list)) { + usock_conn_t *conn; + conn = list_entry(pt_data->upt_stale_list.next, + usock_conn_t, uc_stale_list); + + list_del(&conn->uc_stale_list); + + usocklnd_tear_peer_conn(conn); + usocklnd_conn_decref(conn); /* -1 for idx2conn[idx] or pr */ + } +} + +int +usocklnd_poll_thread(void *arg) +{ + int rc = 0; + usock_pollthread_t *pt_data = (usock_pollthread_t *)arg; + cfs_time_t current_time; + cfs_time_t planned_time; + int idx; + int idx_start; + int idx_finish; + int chunk; + int saved_nfds; + int extra; + int times; + + /* mask signals to avoid SIGPIPE, etc */ + sigset_t sigs; + sigfillset (&sigs); + pthread_sigmask (SIG_SETMASK, &sigs, 0); + + LASSERT(pt_data != NULL); + + planned_time = cfs_time_shift(usock_tuns.ut_poll_timeout); + chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds); + saved_nfds = pt_data->upt_nfds; + idx_start = 1; + + /* Main loop */ + while (usock_data.ud_shutdown == 0) { + rc = 0; + + /* Process all enqueued poll requests */ + pthread_mutex_lock(&pt_data->upt_pollrequests_lock); + while (!list_empty(&pt_data->upt_pollrequests)) { + usock_pollrequest_t *pr; + pr = list_entry(pt_data->upt_pollrequests.next, + usock_pollrequest_t, upr_list); + + list_del(&pr->upr_list); + rc = usocklnd_process_pollrequest(pr, pt_data); + if (rc) + break; + } + pthread_mutex_unlock(&pt_data->upt_pollrequests_lock); + + if (rc) + break; + + /* Delete conns orphaned due to POLL_DEL_REQUESTs */ + usocklnd_process_stale_list(pt_data); + + /* Actual polling for events */ + rc = poll(pt_data->upt_pollfd, + pt_data->upt_nfds, + usock_tuns.ut_poll_timeout * 1000); + + if (rc < 0) { + CERROR("Cannot poll(2): errno=%d\n", errno); + break; + } + + if (rc > 0) + usocklnd_execute_handlers(pt_data); + + current_time = cfs_time_current(); + + if (pt_data->upt_nfds < 2 || + cfs_time_before(current_time, 
planned_time)) + continue; + + /* catch up growing pollfd[] */ + if (pt_data->upt_nfds > saved_nfds) { + extra = pt_data->upt_nfds - saved_nfds; + saved_nfds = pt_data->upt_nfds; + } else { + extra = 0; + } + + times = cfs_duration_sec(cfs_time_sub(current_time, planned_time)) + 1; + idx_finish = MIN(idx_start + chunk*times + extra, pt_data->upt_nfds); + + for (idx = idx_start; idx < idx_finish; idx++) { + usock_conn_t *conn = pt_data->upt_idx2conn[idx]; + pthread_mutex_lock(&conn->uc_lock); + if (usocklnd_conn_timed_out(conn, current_time) && + conn->uc_state != UC_DEAD) { + conn->uc_errored = 1; + usocklnd_conn_kill_locked(conn); + } + pthread_mutex_unlock(&conn->uc_lock); + } + + if (idx_finish == pt_data->upt_nfds) { + chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds); + saved_nfds = pt_data->upt_nfds; + idx_start = 1; + } + else { + idx_start = idx_finish; + } + + planned_time = cfs_time_add(current_time, + cfs_time_seconds(usock_tuns.ut_poll_timeout)); + } + + /* All conns should be deleted by POLL_DEL_REQUESTs while shutdown */ + LASSERT (rc != 0 || pt_data->upt_nfds == 1); + + if (rc) { + pthread_mutex_lock(&pt_data->upt_pollrequests_lock); + + /* Block new poll requests to be enqueued */ + pt_data->upt_errno = rc; + + while (!list_empty(&pt_data->upt_pollrequests)) { + usock_pollrequest_t *pr; + pr = list_entry(pt_data->upt_pollrequests.next, + usock_pollrequest_t, upr_list); + + list_del(&pr->upr_list); + + if (pr->upr_type == POLL_ADD_REQUEST) { + close(pr->upr_conn->uc_fd); + list_add_tail(&pr->upr_conn->uc_stale_list, + &pt_data->upt_stale_list); + } else { + usocklnd_conn_decref(pr->upr_conn); + } + + LIBCFS_FREE (pr, sizeof(*pr)); + } + pthread_mutex_unlock(&pt_data->upt_pollrequests_lock); + + usocklnd_process_stale_list(pt_data); + + for (idx = 1; idx < pt_data->upt_nfds; idx++) { + usock_conn_t *conn = pt_data->upt_idx2conn[idx]; + LASSERT(conn != NULL); + close(conn->uc_fd); + usocklnd_tear_peer_conn(conn); + usocklnd_conn_decref(conn); + 
} + } + + /* unblock usocklnd_shutdown() */ + cfs_complete(&pt_data->upt_completion); + + return 0; +} + +/* Returns 0 on success, <0 else */ +int +usocklnd_add_pollrequest(usock_conn_t *conn, int type, short value) +{ + int pt_idx = conn->uc_pt_idx; + usock_pollthread_t *pt = &usock_data.ud_pollthreads[pt_idx]; + usock_pollrequest_t *pr; + + LIBCFS_ALLOC(pr, sizeof(*pr)); + if (pr == NULL) { + CERROR ("Cannot allocate poll request\n"); + return -ENOMEM; + } + + pr->upr_conn = conn; + pr->upr_type = type; + pr->upr_value = value; + + usocklnd_conn_addref(conn); /* +1 for poll request */ + + pthread_mutex_lock(&pt->upt_pollrequests_lock); + + if (pt->upt_errno) { /* very rare case: errored poll thread */ + int rc = pt->upt_errno; + pthread_mutex_unlock(&pt->upt_pollrequests_lock); + usocklnd_conn_decref(conn); + LIBCFS_FREE(pr, sizeof(*pr)); + return rc; + } + + list_add_tail(&pr->upr_list, &pt->upt_pollrequests); + pthread_mutex_unlock(&pt->upt_pollrequests_lock); + return 0; +} + +void +usocklnd_add_killrequest(usock_conn_t *conn) +{ + int pt_idx = conn->uc_pt_idx; + usock_pollthread_t *pt = &usock_data.ud_pollthreads[pt_idx]; + usock_pollrequest_t *pr = conn->uc_preq; + + /* Use preallocated poll request because there is no good + * workaround for ENOMEM error while killing connection */ + if (pr) { + pr->upr_conn = conn; + pr->upr_type = POLL_DEL_REQUEST; + pr->upr_value = 0; + + usocklnd_conn_addref(conn); /* +1 for poll request */ + + pthread_mutex_lock(&pt->upt_pollrequests_lock); + + if (pt->upt_errno) { /* very rare case: errored poll thread */ + pthread_mutex_unlock(&pt->upt_pollrequests_lock); + usocklnd_conn_decref(conn); + return; /* conn will be killed in poll thread anyway */ + } + + list_add_tail(&pr->upr_list, &pt->upt_pollrequests); + pthread_mutex_unlock(&pt->upt_pollrequests_lock); + + conn->uc_preq = NULL; + } +} + +/* Process poll request. Update poll data. 
+ * Returns 0 on success, <0 else */ +int +usocklnd_process_pollrequest(usock_pollrequest_t *pr, + usock_pollthread_t *pt_data) +{ + int type = pr->upr_type; + short value = pr->upr_value; + usock_conn_t *conn = pr->upr_conn; + int idx = 0; + struct pollfd *pollfd = pt_data->upt_pollfd; + int *fd2idx = pt_data->upt_fd2idx; + usock_conn_t **idx2conn = pt_data->upt_idx2conn; + int *skip = pt_data->upt_skip; + + LASSERT(conn != NULL); + LASSERT(conn->uc_fd >=0); + LASSERT(type == POLL_ADD_REQUEST || + conn->uc_fd < pt_data->upt_nfd2idx); + + if (type != POLL_ADD_REQUEST) { + idx = fd2idx[conn->uc_fd]; + if (idx > 0 && idx < pt_data->upt_nfds) { /* hot path */ + LASSERT(pollfd[idx].fd == conn->uc_fd); + } else { /* unlikely */ + CWARN("Very unlikely event happend: trying to" + " handle poll request of type %d but idx=%d" + " is out of range [1 ... %d]. Is shutdown" + " in progress (%d)?\n", + type, idx, pt_data->upt_nfds - 1, + usock_data.ud_shutdown); + + LIBCFS_FREE (pr, sizeof(*pr)); + usocklnd_conn_decref(conn); + return 0; + } + } + + LIBCFS_FREE (pr, sizeof(*pr)); + + switch (type) { + case POLL_ADD_REQUEST: + if (pt_data->upt_nfds >= pt_data->upt_npollfd) { + /* resize pollfd[], idx2conn[] and skip[] */ + struct pollfd *new_pollfd; + int new_npollfd = pt_data->upt_npollfd * 2; + usock_conn_t **new_idx2conn; + int *new_skip; + + new_pollfd = LIBCFS_REALLOC(pollfd, new_npollfd * + sizeof(struct pollfd)); + if (new_pollfd == NULL) + goto process_pollrequest_enomem; + pt_data->upt_pollfd = pollfd = new_pollfd; + + new_idx2conn = LIBCFS_REALLOC(idx2conn, new_npollfd * + sizeof(usock_conn_t *)); + if (new_idx2conn == NULL) + goto process_pollrequest_enomem; + pt_data->upt_idx2conn = idx2conn = new_idx2conn; + + new_skip = LIBCFS_REALLOC(skip, new_npollfd * + sizeof(int)); + if (new_skip == NULL) + goto process_pollrequest_enomem; + pt_data->upt_skip = new_skip; + + pt_data->upt_npollfd = new_npollfd; + } + + if (conn->uc_fd >= pt_data->upt_nfd2idx) { + /* resize 
fd2idx[] */ + int *new_fd2idx; + int new_nfd2idx = pt_data->upt_nfd2idx * 2; + + while (new_nfd2idx <= conn->uc_fd) + new_nfd2idx *= 2; + + new_fd2idx = LIBCFS_REALLOC(fd2idx, new_nfd2idx * + sizeof(int)); + if (new_fd2idx == NULL) + goto process_pollrequest_enomem; + + pt_data->upt_fd2idx = fd2idx = new_fd2idx; + memset(fd2idx + pt_data->upt_nfd2idx, 0, + (new_nfd2idx - pt_data->upt_nfd2idx) + * sizeof(int)); + pt_data->upt_nfd2idx = new_nfd2idx; + } + + LASSERT(fd2idx[conn->uc_fd] == 0); + + idx = pt_data->upt_nfds++; + idx2conn[idx] = conn; + fd2idx[conn->uc_fd] = idx; + + pollfd[idx].fd = conn->uc_fd; + pollfd[idx].events = value; + pollfd[idx].revents = 0; + break; + case POLL_DEL_REQUEST: + fd2idx[conn->uc_fd] = 0; /* invalidate this entry */ + + --pt_data->upt_nfds; + if (idx != pt_data->upt_nfds) { + /* shift last entry into released position */ + memcpy(&pollfd[idx], &pollfd[pt_data->upt_nfds], + sizeof(struct pollfd)); + idx2conn[idx] = idx2conn[pt_data->upt_nfds]; + fd2idx[pollfd[idx].fd] = idx; + } + + close(conn->uc_fd); + list_add_tail(&conn->uc_stale_list, &pt_data->upt_stale_list); + break; + case POLL_RX_SET_REQUEST: + pollfd[idx].events = (pollfd[idx].events & ~POLLIN) | value; + break; + case POLL_TX_SET_REQUEST: + pollfd[idx].events = (pollfd[idx].events & ~POLLOUT) | value; + break; + case POLL_SET_REQUEST: + pollfd[idx].events = value; + break; + default: + LBUG(); /* unknown type */ + } + + /* In the case of POLL_ADD_REQUEST, idx2conn[idx] takes the + * reference that poll request possesses */ + if (type != POLL_ADD_REQUEST) + usocklnd_conn_decref(conn); + + return 0; + + process_pollrequest_enomem: + usocklnd_conn_decref(conn); + return -ENOMEM; +} + +/* Loop on poll data executing handlers repeatedly until + * fair_limit is reached or all entries are exhausted */ +void +usocklnd_execute_handlers(usock_pollthread_t *pt_data) +{ + struct pollfd *pollfd = pt_data->upt_pollfd; + int nfds = pt_data->upt_nfds; + usock_conn_t **idx2conn = 
pt_data->upt_idx2conn; + int *skip = pt_data->upt_skip; + int j; + + if (pollfd[0].revents & POLLIN) + while (usocklnd_notifier_handler(pollfd[0].fd) > 0) + ; + + skip[0] = 1; /* always skip notifier fd */ + + for (j = 0; j < usock_tuns.ut_fair_limit; j++) { + int prev = 0; + int i = skip[0]; + + if (i >= nfds) /* nothing ready */ + break; + + do { + usock_conn_t *conn = idx2conn[i]; + int next; + + if (j == 0) /* first pass... */ + next = skip[i] = i+1; /* set skip chain */ + else /* later passes... */ + next = skip[i]; /* skip unready pollfds */ + + /* kill connection if it's closed by peer and + * there is no data pending for reading */ + if ((pollfd[i].revents & POLLERR) != 0 || + (pollfd[i].revents & POLLHUP) != 0) { + if ((pollfd[i].events & POLLIN) != 0 && + (pollfd[i].revents & POLLIN) == 0) + usocklnd_conn_kill(conn); + else + usocklnd_exception_handler(conn); + } + + if ((pollfd[i].revents & POLLIN) != 0 && + usocklnd_read_handler(conn) <= 0) + pollfd[i].revents &= ~POLLIN; + + if ((pollfd[i].revents & POLLOUT) != 0 && + usocklnd_write_handler(conn) <= 0) + pollfd[i].revents &= ~POLLOUT; + + if ((pollfd[i].revents & (POLLIN | POLLOUT)) == 0) + skip[prev] = next; /* skip this entry next pass */ + else + prev = i; + + i = next; + } while (i < nfds); + } +} + +int +usocklnd_calculate_chunk_size(int num) +{ + const int n = 4; + const int p = usock_tuns.ut_poll_timeout; + int chunk = num; + + /* chunk should be big enough to detect a timeout on any + * connection within (n+1)/n times the timeout interval + * if we checks every 'p' seconds 'chunk' conns */ + + if (usock_tuns.ut_timeout > n * p) + chunk = (chunk * n * p) / usock_tuns.ut_timeout; + + if (chunk == 0) + chunk = 1; + + return chunk; +} + +void +usocklnd_wakeup_pollthread(int i) +{ + usock_pollthread_t *pt = &usock_data.ud_pollthreads[i]; + int notification = 0; + int rc; + + rc = syscall(SYS_write, pt->upt_notifier_fd, ¬ification, + sizeof(notification)); + + if (rc != sizeof(notification)) + 
CERROR("Very unlikely event happend: " + "cannot write to notifier fd (rc=%d; errno=%d)\n", + rc, errno); +} diff --git a/lnet/ulnds/socklnd/pqtimer.c b/lnet/ulnds/socklnd/pqtimer.c deleted file mode 100644 index 98c48eb..0000000 --- a/lnet/ulnds/socklnd/pqtimer.c +++ /dev/null @@ -1,226 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * - * Copyright (c) 2002 Cray Inc. - * Copyright (c) 2002 Eric Hoffman - * - * This file is part of Lustre, http://www.lustre.org. - * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. - * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -/* timer.c: - * this file implements a simple priority-queue based timer system. when - * combined with a file which implements now() and block(), it can - * be used to provide course-grained time-based callbacks. 
- */ - -#include -#include -#include - -struct timer { - void (*function)(void *); - void *arg; - when w; - int interval; - int disable; -}; - -typedef struct thunk *thunk; -struct thunk { - void (*f)(void *); - void *a; - thunk next; -}; - -extern when now(void); - -static thunk thunks; -static int internal; -static void (*block_function)(when); -static int number_of_timers; -static int size_of_pqueue; -static timer *timers; - - -static void heal(int where) -{ - int left=(where<<1); - int right=(where<<1)+1; - int min=where; - timer temp; - - if (left <= number_of_timers) - if (timers[left]->w < timers[min]->w) min=left; - if (right <= number_of_timers) - if (timers[right]->w < timers[min]->w) min=right; - if (min != where){ - temp=timers[where]; - timers[where]=timers[min]; - timers[min]=temp; - heal(min); - } -} - -static void add_pqueue(int i) -{ - timer temp; - int parent=(i>>1); - if ((i>1) && (timers[i]->w< timers[parent]->w)){ - temp=timers[i]; - timers[i]=timers[parent]; - timers[parent]=temp; - add_pqueue(parent); - } -} - -static void add_timer(timer t) -{ - if (size_of_pqueue<(number_of_timers+2)){ - int oldsize=size_of_pqueue; - timer *new=(void *)malloc(sizeof(struct timer)*(size_of_pqueue+=10)); - memcpy(new,timers,sizeof(timer)*oldsize); - timers=new; - } - timers[++number_of_timers]=t; - add_pqueue(number_of_timers); -} - -/* Function: register_timer - * Arguments: interval: the time interval from the current time when - * the timer function should be called - * function: the function to call when the time has expired - * argument: the argument to call it with. 
- * Returns: a pointer to a timer structure - */ -timer register_timer(when interval, - void (*function)(void *), - void *argument) -{ - timer t=(timer)malloc(sizeof(struct timer)); - - t->arg=argument; - t->function=function; - t->interval=interval; - t->disable=0; - t->w=now()+interval; - add_timer(t); - if (!internal && (number_of_timers==1)) - block_function(t->w); - return(t); -} - -/* Function: remove_timer - * Arguments: t: - * Returns: nothing - * - * remove_timer removes a timer from the system, insuring - * that it will never be called. It does not actually - * free the timer due to reentrancy issues. - */ - -void remove_timer(timer t) -{ - t->disable=1; -} - - - -void timer_fire() -{ - timer current; - - current=timers[1]; - timers[1]=timers[number_of_timers--]; - heal(1); - if (!current->disable) { - (*current->function)(current->arg); - } - free(current); -} - -when next_timer(void) -{ - when here=now(); - - while (number_of_timers && (timers[1]->w <= here)) timer_fire(); - if (number_of_timers) return(timers[1]->w); - return(0); -} - -/* Function: timer_loop - * Arguments: none - * Returns: never - * - * timer_loop() is the blocking dispatch function for the timer. - * Is calls the block() function registered with init_timer, - * and handles associated with timers that have been registered. - */ -void timer_loop() -{ - when here; - - while (1){ - thunk z; - here=now(); - - for (z=thunks;z;z=z->next) (*z->f)(z->a); - - if (number_of_timers){ - if (timers[1]->w > here){ - (*block_function)(timers[1]->w); - } else { - timer_fire(); - } - } else { - thunk z; - for (z=thunks;z;z=z->next) (*z->f)(z->a); - (*block_function)(0); - } - } -} - - -/* Function: register_thunk - * Arguments: f: the function to call - * a: the single argument to call it with - * - * Thunk functions get called at irregular intervals, they - * should not assume when, or take a particularily long - * amount of time. Thunks are for background cleanup tasks. 
- */ -void register_thunk(void (*f)(void *),void *a) -{ - thunk t=(void *)malloc(sizeof(struct thunk)); - t->f=f; - t->a=a; - t->next=thunks; - thunks=t; -} - -/* Function: initialize_timer - * Arguments: block: the function to call to block for the specified interval - * - * initialize_timer() must be called before any other timer function, - * including timer_loop. - */ -void initialize_timer(void (*block)(when)) -{ - block_function=block; - number_of_timers=0; - size_of_pqueue=10; - timers=(timer *)malloc(sizeof(timer)*size_of_pqueue); - thunks=0; -} diff --git a/lnet/ulnds/socklnd/pqtimer.h b/lnet/ulnds/socklnd/pqtimer.h deleted file mode 100644 index 11efb0e..0000000 --- a/lnet/ulnds/socklnd/pqtimer.h +++ /dev/null @@ -1,25 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * - * Copyright (c) 2002 Cray Inc. - * Copyright (c) 2002 Eric Hoffman - * - * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ - */ - -typedef unsigned long long when; -when now(void); -typedef struct timer *timer; -timer register_timer(when interval, - void (*function)(void *), - void *argument); -timer register_timer_wait(void); -void remove_timer(timer); -void timer_loop(void); -void initialize_timer(void (*block)(when)); -void timer_fire(void); - - -#define HZ 0x100000000ull - - diff --git a/lnet/ulnds/socklnd/procapi.c b/lnet/ulnds/socklnd/procapi.c deleted file mode 100644 index 5fd5f46..0000000 --- a/lnet/ulnds/socklnd/procapi.c +++ /dev/null @@ -1,198 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * - * Copyright (c) 2002 Cray Inc. - * Copyright (c) 2003 Cluster File Systems, Inc. - * - * This file is part of Lustre, http://www.lustre.org. - * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. 
- * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -/* api.c: - * This file provides the 'api' side for the process-based nals. - * it is responsible for creating the 'library' side thread, - * and passing wrapped portals transactions to it. - * - * Along with initialization, shutdown, and transport to the library - * side, this file contains some stubs to satisfy the nal definition. - */ -#include -#include -#include -#include -#ifndef __CYGWIN__ -# include -#endif -#include -#include -#include -#include -#include -#include -#include -#ifdef HAVE_GETHOSTBYNAME -# include -#endif - -#if !HAVE_LIBPTHREAD -# error "This LND requires a multi-threaded runtime" -#endif - -/* XXX CFS workaround, to give a chance to let nal thread wake up - * from waiting in select - */ -static int procbridge_notifier_handler(void *arg) -{ - static char buf[8]; - procbridge p = (procbridge) arg; - - syscall(SYS_read, p->notifier[1], buf, sizeof(buf)); - return 1; -} - -void procbridge_wakeup_nal(procbridge p) -{ - static char buf[8]; - syscall(SYS_write, p->notifier[0], buf, sizeof(buf)); -} - -lnd_t the_tcplnd = { - .lnd_type = SOCKLND, - .lnd_startup = procbridge_startup, - .lnd_shutdown = procbridge_shutdown, - .lnd_send = tcpnal_send, - .lnd_recv = tcpnal_recv, - .lnd_notify = tcpnal_notify, -}; -int tcpnal_running; - -/* Function: shutdown - * Arguments: ni: the instance of me - * - * cleanup nal state, reclaim the lower side thread and - * its state using PTL_FINI codepoint - */ -void -procbridge_shutdown(lnet_ni_t *ni) -{ - bridge b=(bridge)ni->ni_data; - procbridge p=(procbridge)b->local; - 
- p->nal_flags |= NAL_FLAG_STOPPING; - procbridge_wakeup_nal(p); - - do { - pthread_mutex_lock(&p->mutex); - if (p->nal_flags & NAL_FLAG_STOPPED) { - pthread_mutex_unlock(&p->mutex); - break; - } - pthread_cond_wait(&p->cond, &p->mutex); - pthread_mutex_unlock(&p->mutex); - } while (1); - - free(p); - tcpnal_running = 0; -} - -#ifdef ENABLE_SELECT_DISPATCH -procbridge __global_procbridge = NULL; -#endif - -/* Function: procbridge_startup - * - * Arguments: ni: the instance of me - * interfaces: ignored - * - * Returns: portals rc - * - * initializes the tcp nal. we define unix_failure as an - * error wrapper to cut down clutter. - */ -int -procbridge_startup (lnet_ni_t *ni) -{ - procbridge p; - bridge b; - int rc; - - /* NB The local NID is not assigned. We only ever connect to the socknal, - * which assigns the src nid/pid on incoming non-privileged connections - * (i.e. us), and we don't accept connections. */ - - LASSERT (ni->ni_lnd == &the_tcplnd); - LASSERT (!tcpnal_running); /* only single instance supported */ - LASSERT (ni->ni_interfaces[0] == NULL); /* explicit interface(s) not supported */ - - /* The credit settings here are pretty irrelevent. Userspace tcplnd has no - * tx descriptor pool to exhaust and does a blocking send; that's the real - * limit on send concurrency. 
*/ - ni->ni_maxtxcredits = 1000; - ni->ni_peertxcredits = 1000; - - init_unix_timer(); - - b=(bridge)malloc(sizeof(struct bridge)); - p=(procbridge)malloc(sizeof(struct procbridge)); - b->local=p; - b->b_ni = ni; - ni->ni_data = b; - - /* init procbridge */ - pthread_mutex_init(&p->mutex,0); - pthread_cond_init(&p->cond, 0); - p->nal_flags = 0; - - /* initialize notifier */ - if (socketpair(AF_UNIX, SOCK_STREAM, 0, p->notifier)) { - perror("socketpair failed"); - rc = -errno; - return rc; - } - - if (!register_io_handler(p->notifier[1], READ_HANDLER, - procbridge_notifier_handler, p)) { - perror("fail to register notifier handler"); - return -ENOMEM; - } - -#ifdef ENABLE_SELECT_DISPATCH - __global_procbridge = p; -#endif - - /* create nal thread */ - rc = pthread_create(&p->t, NULL, nal_thread, b); - if (rc != 0) { - perror("nal_init: pthread_create"); - return -ESRCH; - } - - do { - pthread_mutex_lock(&p->mutex); - if (p->nal_flags & (NAL_FLAG_RUNNING | NAL_FLAG_STOPPED)) { - pthread_mutex_unlock(&p->mutex); - break; - } - pthread_cond_wait(&p->cond, &p->mutex); - pthread_mutex_unlock(&p->mutex); - } while (1); - - if (p->nal_flags & NAL_FLAG_STOPPED) - return -ENETDOWN; - - tcpnal_running = 1; - - return 0; -} diff --git a/lnet/ulnds/socklnd/procbridge.h b/lnet/ulnds/socklnd/procbridge.h deleted file mode 100644 index 2dd534b..0000000 --- a/lnet/ulnds/socklnd/procbridge.h +++ /dev/null @@ -1,58 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * - * Copyright (c) 2002 Cray Inc. - * Copyright (c) 2003 Cluster File Systems, Inc. 
- * - * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ - */ - -#ifndef _PROCBRIDGE_H_ -#define _PROCBRIDGE_H_ - -#include -#include - - -#define NAL_FLAG_RUNNING 1 -#define NAL_FLAG_STOPPING 2 -#define NAL_FLAG_STOPPED 4 - -typedef struct procbridge { - /* sync between user threads and nal thread */ - pthread_t t; - pthread_cond_t cond; - pthread_mutex_t mutex; - - /* socket pair used to notify nal thread */ - int notifier[2]; - - int nal_flags; - -} *procbridge; - -typedef struct nal_init_args { - lnet_pid_t nia_requested_pid; - bridge nia_bridge; -} nal_init_args_t; - -extern void *nal_thread(void *); - -extern void procbridge_wakeup_nal(procbridge p); - -extern int procbridge_startup (lnet_ni_t *); -extern void procbridge_shutdown (lnet_ni_t *); - -extern void tcpnal_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive); - -extern int tcpnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg); -int tcpnal_recv(lnet_ni_t *ni, void *private, lnet_msg_t *cookie, - int delayed, unsigned int niov, - struct iovec *iov, lnet_kiov_t *kiov, - unsigned int offset, unsigned int mlen, unsigned int rlen); -extern int tcpnal_set_global_params(); - - - - -#endif diff --git a/lnet/ulnds/socklnd/proclib.c b/lnet/ulnds/socklnd/proclib.c deleted file mode 100644 index 01faf05..0000000 --- a/lnet/ulnds/socklnd/proclib.c +++ /dev/null @@ -1,108 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * - * Copyright (c) 2002 Cray Inc. - * Copyright (c) 2003 Cluster File Systems, Inc. - * - * This file is part of Lustre, http://www.lustre.org. - * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. 
- * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -/* lib.c: - * This file provides the 'library' side for the process-based nals. - * it is responsible for communication with the 'api' side and - * providing service to the generic portals 'library' - * implementation. 'library' might be better termed 'communication' - * or 'kernel'. - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -/* the following functions are stubs to satisfy the nal definition - without doing anything particularily useful*/ -extern int tcpnal_init(bridge); -extern void tcpnal_shutdown(bridge); - -static void check_stopping(void *z) -{ - bridge b = z; - procbridge p = b->local; - - if ((p->nal_flags & NAL_FLAG_STOPPING) == 0) - return; - - tcpnal_shutdown(b); - - pthread_mutex_lock(&p->mutex); - p->nal_flags |= NAL_FLAG_STOPPED; - pthread_cond_broadcast(&p->cond); - pthread_mutex_unlock(&p->mutex); - - pthread_exit(0); -} - - -/* Function: nal_thread - * Arguments: z: an opaque reference to a nal control structure - * allocated and partially populated by the api level code - * Returns: nothing, and only on error or explicit shutdown - * - * This function is the entry point of the pthread initiated on - * the api side of the interface. This thread is used to handle - * asynchronous delivery to the application. 
- * - * We define a limit macro to place a ceiling on limits - * for syntactic convenience - */ - -void *nal_thread(void *z) -{ - bridge b = (bridge) z; - procbridge p=b->local; - int rc; - - rc = tcpnal_init(b); - - /* - * Whatever the initialization returned is passed back to the - * user level code for further interpretation. We just exit if - * it is non-zero since something went wrong. - */ - - pthread_mutex_lock(&p->mutex); - p->nal_flags |= (rc != 0) ? NAL_FLAG_STOPPED : NAL_FLAG_RUNNING; - pthread_cond_broadcast(&p->cond); - pthread_mutex_unlock(&p->mutex); - - if (rc == 0) { - /* the thunk function is called each time the timer loop - performs an operation and returns to blocking mode. we - overload this function to inform the api side that - it may be interested in looking at the event queue */ - register_thunk(check_stopping,b); - timer_loop(); - } - return(0); -} diff --git a/lnet/ulnds/socklnd/select.c b/lnet/ulnds/socklnd/select.c deleted file mode 100644 index 9a88146..0000000 --- a/lnet/ulnds/socklnd/select.c +++ /dev/null @@ -1,420 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * - * Copyright (c) 2002 Cray Inc. - * Copyright (c) 2002 Eric Hoffman - * - * This file is part of Lustre, http://www.lustre.org. - * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. - * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
- */ - -/* select.c: - * Provides a general mechanism for registering and dispatching - * io events through the select system call. - */ - -#define DEBUG_SUBSYSTEM S_LND - -#ifdef sun -#include -#else -#include -#endif - -#include -#include -#include -#include -#include -#include -#include -#include -#include - - -static struct timeval beginning_of_epoch; -static io_handler io_handlers; - -/* Function: now - * - * Return: the current time in canonical units: a 64 bit number - * where the most significant 32 bits contains the number - * of seconds, and the least signficant a count of (1/(2^32))ths - * of a second. - */ -when now() -{ - struct timeval result; - - gettimeofday(&result,0); - return((((unsigned long long)result.tv_sec)<<32)| - (((unsigned long long)result.tv_usec)<<32)/1000000); -} - - -/* Function: register_io_handler - * Arguments: fd: the file descriptor of interest - * type: a mask of READ_HANDLER, WRITE_HANDLER, EXCEPTION_HANDLER - * function: a function to call when io is available on fd - * arg: an opaque correlator to return to the handler - * Returns: a pointer to the io_handler structure - */ -io_handler register_io_handler(int fd, - int type, - int (*function)(void *), - void *arg) -{ - io_handler i=(io_handler)malloc(sizeof(struct io_handler)); - if ((i->fd=fd)>=0){ - i->type=type; - i->function=function; - i->argument=arg; - i->disabled=0; - i->last=&io_handlers; - if ((i->next=io_handlers)) i->next->last=&i->next; - io_handlers=i; - } - return(i); -} - -/* Function: remove_io_handler - * Arguments: i: a pointer to the handler to stop servicing - * - * remove_io_handler() doesn't actually free the handler, due - * to reentrancy problems. it just marks the handler for - * later cleanup by the blocking function. 
- */ -void remove_io_handler (io_handler i) -{ - i->disabled=1; -} - -static void set_flag(io_handler n,fd_set *r, fd_set *w, fd_set *e) -{ - if (n->type & READ_HANDLER) FD_SET(n->fd, r); - if (n->type & WRITE_HANDLER) FD_SET(n->fd, w); - if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd, e); -} - -static int prepare_fd_sets(fd_set *r, fd_set *w, fd_set *e) -{ - io_handler j; - io_handler *k; - int max = 0; - - FD_ZERO(r); - FD_ZERO(w); - FD_ZERO(e); - for (k=&io_handlers;*k;){ - if ((*k)->disabled){ - j=*k; - *k=(*k)->next; - free(j); - } - if (*k) { - set_flag(*k,r,w,e); - if ((*k)->fd > max) - max = (*k)->fd; - k=&(*k)->next; - } - } - return max + 1; -} - -static int execute_callbacks(fd_set *r, fd_set *w, fd_set *e) -{ - io_handler j; - int n = 0, t; - - for (j = io_handlers; j; j = j->next) { - if (j->disabled) - continue; - - t = 0; - if (FD_ISSET(j->fd, r) && (j->type & READ_HANDLER)) { - FD_CLR(j->fd, r); - t++; - } - if (FD_ISSET(j->fd, w) && (j->type & WRITE_HANDLER)) { - FD_CLR(j->fd, w); - t++; - } - if (FD_ISSET(j->fd, e) && (j->type & EXCEPTION_HANDLER)) { - FD_CLR(j->fd, e); - t++; - } - if (t == 0) - continue; - - if (!(*j->function)(j->argument)) - j->disabled = 1; - - n += t; - } - - return n; -} - -#ifdef ENABLE_SELECT_DISPATCH - -static struct { - pthread_mutex_t mutex; - pthread_cond_t cond; - int submitted; - int nready; - int maxfd; - fd_set *rset; - fd_set *wset; - fd_set *eset; - struct timeval *timeout; - struct timeval submit_time; -} fd_extra = { - PTHREAD_MUTEX_INITIALIZER, - PTHREAD_COND_INITIALIZER, - 0, 0, 0, - NULL, NULL, NULL, NULL, -}; - -extern int liblustre_wait_event(int timeout); -extern procbridge __global_procbridge; - -/* - * this will intercept syscall select() of user apps - * such as MPI libs. 
- */ -int select(int n, fd_set *rset, fd_set *wset, fd_set *eset, - struct timeval *timeout) -{ - LASSERT(fd_extra.submitted == 0); - - fd_extra.nready = 0; - fd_extra.maxfd = n; - fd_extra.rset = rset; - fd_extra.wset = wset; - fd_extra.eset = eset; - fd_extra.timeout = timeout; - - liblustre_wait_event(0); - pthread_mutex_lock(&fd_extra.mutex); - gettimeofday(&fd_extra.submit_time, NULL); - fd_extra.submitted = 1; - LASSERT(__global_procbridge); - procbridge_wakeup_nal(__global_procbridge); - -again: - if (fd_extra.submitted) - pthread_cond_wait(&fd_extra.cond, &fd_extra.mutex); - pthread_mutex_unlock(&fd_extra.mutex); - - liblustre_wait_event(0); - - pthread_mutex_lock(&fd_extra.mutex); - if (fd_extra.submitted) - goto again; - pthread_mutex_unlock(&fd_extra.mutex); - - LASSERT(fd_extra.nready >= 0); - LASSERT(fd_extra.submitted == 0); - return fd_extra.nready; -} - -static int merge_fds(int max, fd_set *rset, fd_set *wset, fd_set *eset) -{ - int i; - - LASSERT(rset); - LASSERT(wset); - LASSERT(eset); - - for (i = 0; i < __FD_SETSIZE/__NFDBITS; i++) { - LASSERT(!fd_extra.rset || - !(__FDS_BITS(rset)[i] & __FDS_BITS(fd_extra.rset)[i])); - LASSERT(!fd_extra.wset || - !(__FDS_BITS(wset)[i] & __FDS_BITS(fd_extra.wset)[i])); - LASSERT(!fd_extra.eset || - !(__FDS_BITS(eset)[i] & __FDS_BITS(fd_extra.eset)[i])); - - if (fd_extra.rset && __FDS_BITS(fd_extra.rset)[i]) - __FDS_BITS(rset)[i] |= __FDS_BITS(fd_extra.rset)[i]; - if (fd_extra.wset && __FDS_BITS(fd_extra.wset)[i]) - __FDS_BITS(wset)[i] |= __FDS_BITS(fd_extra.wset)[i]; - if (fd_extra.eset && __FDS_BITS(fd_extra.eset)[i]) - __FDS_BITS(eset)[i] |= __FDS_BITS(fd_extra.eset)[i]; - } - - return (fd_extra.maxfd > max ? 
fd_extra.maxfd : max); -} - -static inline -int timeval_ge(struct timeval *tv1, struct timeval *tv2) -{ - LASSERT(tv1 && tv2); - return ((tv1->tv_sec - tv2->tv_sec) * 1000000 + - (tv1->tv_usec - tv2->tv_usec) >= 0); -} - -/* - * choose the most recent timeout value - */ -static struct timeval *choose_timeout(struct timeval *tv1, - struct timeval *tv2) -{ - if (!tv1) - return tv2; - else if (!tv2) - return tv1; - - if (timeval_ge(tv1, tv2)) - return tv2; - else - return tv1; -} - -/* Function: select_timer_block - * Arguments: until: an absolute time when the select should return - * - * This function dispatches the various file descriptors' handler - * functions, if the kernel indicates there is io available. - */ -void select_timer_block(when until) -{ - fd_set fds[3]; - struct timeval timeout; - struct timeval *timeout_pointer, *select_timeout; - int max, nready, nexec; - int fd_handling; - -again: - if (until) { - when interval; - - interval = until - now(); - timeout.tv_sec = (interval >> 32); - timeout.tv_usec = ((interval << 32) / 1000000) >> 32; - timeout_pointer = &timeout; - } else - timeout_pointer = NULL; - - fd_handling = 0; - max = prepare_fd_sets(&fds[0], &fds[1], &fds[2]); - select_timeout = timeout_pointer; - - pthread_mutex_lock(&fd_extra.mutex); - fd_handling = fd_extra.submitted; - pthread_mutex_unlock(&fd_extra.mutex); - if (fd_handling) { - max = merge_fds(max, &fds[0], &fds[1], &fds[2]); - select_timeout = choose_timeout(timeout_pointer, fd_extra.timeout); - } - - /* XXX only compile for linux */ -#if (__WORDSIZE == 64) && !defined(__mips64__) - nready = syscall(SYS_select, max, &fds[0], &fds[1], &fds[2], - select_timeout); -#else - nready = syscall(SYS__newselect, max, &fds[0], &fds[1], &fds[2], - select_timeout); -#endif - if (nready < 0) { - CERROR("select return err %d, errno %d\n", nready, errno); - return; - } - - if (nready) { - nexec = execute_callbacks(&fds[0], &fds[1], &fds[2]); - nready -= nexec; - } else - nexec = 0; - - /* even 
both nready & nexec are 0, we still need try to wakeup - * upper thread since it may have timed out - */ - if (fd_handling) { - LASSERT(nready >= 0); - - pthread_mutex_lock(&fd_extra.mutex); - if (nready) { - if (fd_extra.rset) - *fd_extra.rset = fds[0]; - if (fd_extra.wset) - *fd_extra.wset = fds[1]; - if (fd_extra.eset) - *fd_extra.eset = fds[2]; - fd_extra.nready = nready; - fd_extra.submitted = 0; - } else { - struct timeval t; - - fd_extra.nready = 0; - if (fd_extra.timeout) { - gettimeofday(&t, NULL); - if (timeval_ge(&t, &fd_extra.submit_time)) - fd_extra.submitted = 0; - } - } - - pthread_cond_signal(&fd_extra.cond); - pthread_mutex_unlock(&fd_extra.mutex); - } - - /* haven't found portals event, go back to loop if time - * is not expired */ - if (!nexec) { - if (timeout_pointer == NULL || now() >= until) - goto again; - } -} - -#else /* !ENABLE_SELECT_DISPATCH */ - -/* Function: select_timer_block - * Arguments: until: an absolute time when the select should return - * - * This function dispatches the various file descriptors' handler - * functions, if the kernel indicates there is io available. 
- */ -void select_timer_block(when until) -{ - fd_set fds[3]; - struct timeval timeout; - struct timeval *timeout_pointer; - int max, nready; - - if (until) { - when interval; - interval = until - now(); - timeout.tv_sec = (interval >> 32); - timeout.tv_usec = ((interval << 32) / 1000000) >> 32; - timeout_pointer = &timeout; - } else - timeout_pointer = NULL; - - max = prepare_fd_sets(&fds[0], &fds[1], &fds[2]); - - nready = select(max, &fds[0], &fds[1], &fds[2], timeout_pointer); - if (nready > 0) - execute_callbacks(&fds[0], &fds[1], &fds[2]); -} -#endif /* ENABLE_SELECT_DISPATCH */ - -/* Function: init_unix_timer() - * is called to initialize the library - */ -void init_unix_timer() -{ - io_handlers=0; - gettimeofday(&beginning_of_epoch, 0); - initialize_timer(select_timer_block); -} diff --git a/lnet/ulnds/socklnd/table.c b/lnet/ulnds/socklnd/table.c deleted file mode 100644 index 986c5ce..0000000 --- a/lnet/ulnds/socklnd/table.c +++ /dev/null @@ -1,267 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * - * Copyright (c) 2002 Cray Inc. - * Copyright (c) 2002 Eric Hoffman - * - * This file is part of Lustre, http://www.lustre.org. - * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. - * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
- */ - -#include -#include -#include - - -/* table.c: - * a very simple hash table implementation with paramerterizable - * comparison and key generation functions. it does resize - * in order to accomidate more entries, but never collapses - * the table - */ - -static table_entry *table_lookup (table t,void *comparator, - unsigned int k, - int (*compare_function)(void *, void *), - int *success) -{ - unsigned int key=k%t->size; - table_entry *i; - - for (i=&(t->entries[key]);*i;i=&((*i)->next)){ - if (compare_function && ((*i)->key==k)) - if ((*t->compare_function)((*i)->value,comparator)){ - *success=1; - return(i); - } - } - *success=0; - return(&(t->entries[key])); -} - - -static void resize_table(table t, int size) -{ - int old_size=t->size; - table_entry *old_entries=t->entries; - int i; - table_entry j,n; - table_entry *position; - int success; - - t->size=size; - t->entries=(table_entry *)malloc(sizeof(table_entry)*t->size); - memset(t->entries,0,sizeof(table_entry)*t->size); - - for (i=0;inext; - position=table_lookup(t,0,j->key,0,&success); - j->next= *position; - *position=j; - } - free(old_entries); -} - - -/* Function: key_from_int - * Arguments: int i: value to compute the key of - * Returns: the key - */ -unsigned int key_from_int(int i) -{ - return(i); -} - - -/* Function: key_from_string - * Arguments: char *s: the null terminated string - * to compute the key of - * Returns: the key - */ -unsigned int key_from_string(char *s) -{ - unsigned int result=0, i; - unsigned char *n; - - if (!s) - return(1); - for (n = (unsigned char *)s, i = 0; *n; n++, i++) - result ^= (*n * 57) ^ *n * i; - - return(result); -} - - -/* Function: hash_create_table - * Arguments: compare_function: a function to compare - * a table instance with a correlator - * key_function: a function to generate a 32 bit - * hash key from a correlator - * Returns: a pointer to the new table - */ -table hash_create_table (int (*compare_function)(void *, void *), - unsigned int 
(*key_function)(void *)) -{ - table new=(table)malloc(sizeof(struct table)); - memset(new, 0, sizeof(struct table)); - - new->compare_function=compare_function; - new->key_function=key_function; - new->number_of_entries=0; - new->size=4; - new->entries=(table_entry *)malloc(sizeof(table_entry)*new->size); - memset(new->entries,0,sizeof(table_entry)*new->size); - return(new); -} - - -/* Function: hash_table_find - * Arguments: t: a table to look in - * comparator: a value to access the table entry - * Returns: the element references to by comparator, or null - */ -void *hash_table_find (table t, void *comparator) -{ - int success; - table_entry* entry=table_lookup(t,comparator, - (*t->key_function)(comparator), - t->compare_function, - &success); - if (success) return((*entry)->value); - return(0); -} - - -/* Function: hash_table_insert - * Arguments: t: a table to insert the object - * value: the object to put in the table - * comparator: the value by which the object - * will be addressed - * Returns: nothing - */ -void hash_table_insert (table t, void *value, void *comparator) -{ - int success; - unsigned int k=(*t->key_function)(comparator); - table_entry *position=table_lookup(t,comparator,k, - t->compare_function,&success); - table_entry entry; - - if (success) { - entry = *position; - } else { - entry = (table_entry)malloc(sizeof(struct table_entry)); - memset(entry, 0, sizeof(struct table_entry)); - entry->next= *position; - *position=entry; - t->number_of_entries++; - } - entry->value=value; - entry->key=k; - if (t->number_of_entries > t->size) resize_table(t,t->size*2); -} - -/* Function: hash_table_remove - * Arguments: t: the table to remove the object from - * comparator: the index value of the object to remove - * Returns: - */ -void hash_table_remove (table t, void *comparator) -{ - int success; - table_entry temp; - table_entry *position=table_lookup(t,comparator, - (*t->key_function)(comparator), - t->compare_function,&success); - if(success) { - 
temp=*position; - *position=(*position)->next; - free(temp); /* the value? */ - t->number_of_entries--; - } -} - -/* Function: hash_iterate_table_entries - * Arguments: t: the table to iterate over - * handler: a function to call with each element - * of the table, along with arg - * arg: the opaque object to pass to handler - * Returns: nothing - */ -void hash_iterate_table_entries(table t, - void (*handler)(void *,void *), - void *arg) -{ - int i; - table_entry *j,*next; - - for (i=0;isize;i++) - for (j=t->entries+i;*j;j=next){ - next=&((*j)->next); - (*handler)(arg,(*j)->value); - } -} - -/* Function: hash_filter_table_entries - * Arguments: t: the table to iterate over - * handler: a function to call with each element - * of the table, along with arg - * arg: the opaque object to pass to handler - * Returns: nothing - * Notes: operations on the table inside handler are not safe - * - * filter_table_entires() calls the handler function for each - * item in the table, passing it and arg. The handler function - * returns 1 if it is to be retained in the table, and 0 - * if it is to be removed. 
- */ -void hash_filter_table_entries(table t, int (*handler)(void *, void *), void *arg) -{ - int i; - table_entry *j,*next,v; - - for (i=0;isize;i++) - for (j=t->entries+i;*j;j=next){ - next=&((*j)->next); - if (!(*handler)(arg,(*j)->value)){ - next=j; - v=*j; - *j=(*j)->next; - free(v); - t->number_of_entries--; - } - } -} - -/* Function: destroy_table - * Arguments: t: the table to free - * thunk: a function to call with each element, - * most likely free() - * Returns: nothing - */ -void hash_destroy_table(table t,void (*thunk)(void *)) -{ - table_entry j,next; - int i; - for (i=0;isize;i++) - for (j=t->entries[i];j;j=next){ - next=j->next; - if (thunk) (*thunk)(j->value); - free(j); - } - free(t->entries); - free(t); -} diff --git a/lnet/ulnds/socklnd/table.h b/lnet/ulnds/socklnd/table.h deleted file mode 100644 index 0cb9669..0000000 --- a/lnet/ulnds/socklnd/table.h +++ /dev/null @@ -1,40 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * - * Copyright (c) 2002 Cray Inc. 
- * Copyright (c) 2002 Eric Hoffman - * - * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ - */ - -#ifndef E_TABLE -#define E_TABLE - -typedef struct table_entry { - unsigned int key; - void *value; - struct table_entry *next; -} *table_entry; - - -typedef struct table { - unsigned int size; - int number_of_entries; - table_entry *entries; - int (*compare_function)(void *, void *); - unsigned int (*key_function)(void *); -} *table; - -/* table.c */ -unsigned int key_from_int(int i); -unsigned int key_from_string(char *s); -table hash_create_table(int (*compare_function)(void *, void *), - unsigned int (*key_function)(void *)); -void *hash_table_find(table t, void *comparator); -void hash_table_insert(table t, void *value, void *comparator); -void hash_table_remove(table t, void *comparator); -void hash_iterate_table_entries(table t, void (*handler)(void *, void *), void *arg); -void hash_filter_table_entries(table t, int (*handler)(void *, void *), void *arg); -void hash_destroy_table(table t, void (*thunk)(void *)); - -#endif diff --git a/lnet/ulnds/socklnd/tcplnd.c b/lnet/ulnds/socklnd/tcplnd.c deleted file mode 100644 index 010f972..0000000 --- a/lnet/ulnds/socklnd/tcplnd.c +++ /dev/null @@ -1,253 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * - * Copyright (c) 2002 Cray Inc. - * Copyright (c) 2003 Cluster File Systems, Inc. - * - * This file is part of Lustre, http://www.lustre.org. - * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. - * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. 
- * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -/* tcpnal.c: - This file implements the TCP-based nal by providing glue - between the connection service and the generic NAL implementation */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#ifndef __CYGWIN__ -#include -#endif - -void -tcpnal_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive) -{ - bridge b = (bridge)ni->ni_data; - connection c; - - if (!alive) { - LBUG(); - } - - c = force_tcp_connection((manager)b->lower, nid, b->local); - if (c == NULL) - CERROR("Can't create connection to %s\n", - libcfs_nid2str(nid)); -} - -/* - * sends a packet to the peer, after insuring that a connection exists - */ -int tcpnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) -{ - lnet_hdr_t *hdr = &lntmsg->msg_hdr; - lnet_process_id_t target = lntmsg->msg_target; - unsigned int niov = lntmsg->msg_niov; - struct iovec *iov = lntmsg->msg_iov; - unsigned int offset = lntmsg->msg_offset; - unsigned int len = lntmsg->msg_len; - - connection c; - bridge b = (bridge)ni->ni_data; - struct iovec tiov[257]; - static pthread_mutex_t send_lock = PTHREAD_MUTEX_INITIALIZER; - int rc = 0; - int sysrc; - int total; - int ntiov; - int i; - - if (!(c = force_tcp_connection((manager)b->lower, target.nid, - b->local))) - return(-EIO); - - /* TODO: these results should be checked. furthermore, provision - must be made for the SIGPIPE which is delivered when - writing on a tcp socket which has closed underneath - the application. 
there is a linux flag in the sendmsg - call which turns off the signally behaviour, but its - nonstandard */ - - LASSERT (niov <= 256); - LASSERT (len == 0 || iov != NULL); /* I don't understand kiovs */ - - tiov[0].iov_base = hdr; - tiov[0].iov_len = sizeof(lnet_hdr_t); - ntiov = 1 + lnet_extract_iov(256, &tiov[1], niov, iov, offset, len); - - pthread_mutex_lock(&send_lock); -#if 1 - for (i = total = 0; i < ntiov; i++) - total += tiov[i].iov_len; - - sysrc = syscall(SYS_writev, c->fd, tiov, ntiov); - if (sysrc != total) { - fprintf (stderr, "BAD SEND rc %d != %d, errno %d\n", - rc, total, errno); - rc = -errno; - } -#else - for (i = total = 0; i <= ntiov; i++) { - rc = send(c->fd, tiov[i].iov_base, tiov[i].iov_len, 0); - - if (rc != tiov[i].iov_len) { - fprintf (stderr, "BAD SEND rc %d != %d, errno %d\n", - rc, tiov[i].iov_len, errno); - rc = -errno; - break; - } - total += rc; - } -#endif -#if 0 - fprintf (stderr, "sent %s total %d in %d frags\n", - hdr->type == LNET_MSG_ACK ? "ACK" : - hdr->type == LNET_MSG_PUT ? "PUT" : - hdr->type == LNET_MSG_GET ? "GET" : - hdr->type == LNET_MSG_REPLY ? "REPLY" : - hdr->type == LNET_MSG_HELLO ? "HELLO" : "UNKNOWN", - total, niov + 1); -#endif - pthread_mutex_unlock(&send_lock); - - if (rc == 0) { - /* NB the NAL only calls lnet_finalize() if it returns 0 - * from cb_send() */ - lnet_finalize(ni, lntmsg, 0); - } - - return(rc); -} - - -int tcpnal_recv(lnet_ni_t *ni, - void *private, - lnet_msg_t *cookie, - int delayed, - unsigned int niov, - struct iovec *iov, - lnet_kiov_t *kiov, - unsigned int offset, - unsigned int mlen, - unsigned int rlen) -{ - struct iovec tiov[256]; - int ntiov; - int i; - - if (mlen == 0) - goto finalize; - - LASSERT(iov != NULL); /* I don't understand kiovs */ - - ntiov = lnet_extract_iov(256, tiov, niov, iov, offset, mlen); - - /* FIXME - * 1. Is this effecient enough? change to use readv() directly? 
- * - MeiJia - */ - for (i = 0; i < ntiov; i++) - if (!read_connection(private, tiov[i].iov_base, tiov[i].iov_len)) - return -EIO; - - -finalize: - LASSERT(rlen >= mlen); - - if (mlen != rlen){ - int rc; - char *trash=malloc(rlen - mlen); - - if (!trash) - return -ENOMEM; - - rc = read_connection(private, trash, rlen - mlen); - free(trash); - if (!rc) - return -EIO; - } - - lnet_finalize(ni, cookie, 0); - return(0); -} - - -/* Function: from_connection: - * Arguments: c: the connection to read from - * Returns: whether or not to continue reading from this connection, - * expressed as a 1 to continue, and a 0 to not - * - * from_connection() is called from the select loop when i/o is - * available. It attempts to read the portals header and - * pass it to the generic library for processing. - */ -static int from_connection(void *a, void *d) -{ - connection c = d; - bridge b = a; - lnet_hdr_t hdr; - int rc; - - if (read_connection(c, (unsigned char *)&hdr, sizeof(hdr))) { - /* replace dest_nid,pid (socknal sets its own) */ - hdr.dest_nid = cpu_to_le64(b->b_ni->ni_nid); - hdr.dest_pid = cpu_to_le32(the_lnet.ln_pid); - - rc = lnet_parse(b->b_ni, &hdr, c->peer_nid, c, 0); - if (rc < 0) { - CERROR("Error %d from lnet_parse\n", rc); - return 0; - } - - return(1); - } - return(0); -} - - -void tcpnal_shutdown(bridge b) -{ - shutdown_connections(b->lower); -} - -/* Function: PTL_IFACE_TCP - * Arguments: pid_request: desired port number to bind to - * desired: passed NAL limits structure - * actual: returned NAL limits structure - * Returns: a nal structure on success, or null on failure - */ -int tcpnal_init(bridge b) -{ - manager m; - - tcpnal_set_global_params(); - - if (!(m = init_connections(from_connection, b))) { - /* TODO: this needs to shut down the newly created junk */ - return(-ENXIO); - } - b->lower = m; - return(0); -} diff --git a/lnet/ulnds/socklnd/timer.h b/lnet/ulnds/socklnd/timer.h deleted file mode 100644 index aaf39d2..0000000 --- 
a/lnet/ulnds/socklnd/timer.h +++ /dev/null @@ -1,30 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * - * Copyright (c) 2002 Cray Inc. - * Copyright (c) 2002 Eric Hoffman - * - * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ - */ - -/* TODO: make this an explicit type when they become available */ -typedef unsigned long long when; - -typedef struct timer { - void (*function)(void *); - void *arg; - when w; - int interval; - int disable; -} *timer; - -timer register_timer(when, void (*f)(void *), void *a); -void remove_timer(timer t); -void timer_loop(void); -void initialize_timer(void); -void register_thunk(void (*f)(void *),void *a); - - -#define HZ 0x100000000ull - - diff --git a/lnet/ulnds/socklnd/usocklnd.c b/lnet/ulnds/socklnd/usocklnd.c new file mode 100644 index 0000000..2e1ba9b --- /dev/null +++ b/lnet/ulnds/socklnd/usocklnd.c @@ -0,0 +1,546 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2007 Cluster File Systems, Inc. + * Author: Maxim Patlasov + * + * This file is part of the Lustre file system, http://www.lustre.org + * Lustre is a trademark of Cluster File Systems, Inc. 
+ * + */ + +#include "usocklnd.h" +#include + +/* LND method table for the SOCKLND type: every hook points at the + * corresponding usocklnd_* entry point. */ +lnd_t the_tcplnd = { + .lnd_type = SOCKLND, + .lnd_startup = usocklnd_startup, + .lnd_shutdown = usocklnd_shutdown, + .lnd_send = usocklnd_send, + .lnd_recv = usocklnd_recv, + .lnd_accept = usocklnd_accept, +}; + +/* Module-wide usocklnd state. */ +usock_data_t usock_data; +/* Built-in tunable defaults; usocklnd_update_tunables() may overwrite + * them. A zero ut_npollthreads is replaced there by cfs_online_cpus(). */ +usock_tunables_t usock_tuns = { + .ut_timeout = 50, + .ut_poll_timeout = 1, + .ut_fair_limit = 1, + .ut_npollthreads = 0, + .ut_min_bulk = 1<<10, + .ut_txcredits = 256, + .ut_peertxcredits = 8, + .ut_socknagle = 0, + .ut_sockbufsiz = 0, +}; + +#define MAX_REASONABLE_TIMEOUT 36000 /* 10 hours */ +#define MAX_REASONABLE_NPT 1000 + +/* Range-check the values held in usock_tuns. + * Returns 0 when every checked tunable is acceptable; otherwise reports + * the first offending value through CERROR and returns -1. + * Note that ut_min_bulk is not checked. */ +int +usocklnd_validate_tunables() +{ + if (usock_tuns.ut_timeout <= 0 || + usock_tuns.ut_timeout > MAX_REASONABLE_TIMEOUT) { + CERROR("USOCK_TIMEOUT: %d is out of reasonable limits\n", + usock_tuns.ut_timeout); + return -1; + } + + if (usock_tuns.ut_poll_timeout <= 0 || + usock_tuns.ut_poll_timeout > MAX_REASONABLE_TIMEOUT) { + CERROR("USOCK_POLL_TIMEOUT: %d is out of reasonable limits\n", + usock_tuns.ut_poll_timeout); + return -1; + } + + if (usock_tuns.ut_fair_limit <= 0) { + CERROR("Invalid USOCK_FAIR_LIMIT: %d (should be >0)\n", + usock_tuns.ut_fair_limit); + return -1; + } + + if (usock_tuns.ut_npollthreads < 0 || + usock_tuns.ut_npollthreads > MAX_REASONABLE_NPT) { + CERROR("USOCK_NPOLLTHREADS: %d is out of reasonable limits\n", + usock_tuns.ut_npollthreads); + return -1; + } + + if (usock_tuns.ut_txcredits <= 0) { + CERROR("USOCK_TXCREDITS: %d should be positive\n", + usock_tuns.ut_txcredits); + return -1; + } + + if (usock_tuns.ut_peertxcredits <= 0) { + CERROR("USOCK_PEERTXCREDITS: %d should be positive\n", + usock_tuns.ut_peertxcredits); + return -1; + } + + if (usock_tuns.ut_peertxcredits > usock_tuns.ut_txcredits) { + CERROR("USOCK_PEERTXCREDITS: %d should not be greater" + " than USOCK_TXCREDITS: %d\n", + usock_tuns.ut_peertxcredits, usock_tuns.ut_txcredits); + return -1; + } + + if (usock_tuns.ut_socknagle != 0 && + usock_tuns.ut_socknagle !=
1) { + CERROR("USOCK_SOCKNAGLE: %d should be 0 or 1\n", + usock_tuns.ut_socknagle); + return -1; + } + + if (usock_tuns.ut_sockbufsiz < 0) { + CERROR("USOCK_SOCKBUFSIZ: %d should be 0 or positive\n", + usock_tuns.ut_sockbufsiz); + return -1; + } + + return 0; +} + +/* Tear down the state of the first n entries of usock_data.ud_pollthreads: + * close both notifier descriptors, destroy the request lock and the + * completion, and free the pollfd, idx2conn and fd2idx arrays. + * NOTE(review): upt_skip, allocated in usocklnd_base_startup(), is not + * freed here - looks like a leak; confirm. */ +void +usocklnd_release_poll_states(int n) +{ + int i; + + for (i = 0; i < n; i++) { + usock_pollthread_t *pt = &usock_data.ud_pollthreads[i]; + + close(pt->upt_notifier_fd); + close(pt->upt_pollfd[0].fd); + + pthread_mutex_destroy(&pt->upt_pollrequests_lock); + cfs_fini_completion(&pt->upt_completion); + + LIBCFS_FREE (pt->upt_pollfd, + sizeof(struct pollfd) * pt->upt_npollfd); + LIBCFS_FREE (pt->upt_idx2conn, + sizeof(usock_conn_t *) * pt->upt_npollfd); + LIBCFS_FREE (pt->upt_fd2idx, + sizeof(int) * pt->upt_nfd2idx); + } +} + +/* Load each USOCK_* tunable into usock_tuns via cfs_parse_int_tunable(), + * validate the result and, if ut_npollthreads is still 0, default it to + * cfs_online_cpus(). + * Returns 0 on success, the non-zero rc of a failed parse, or -EINVAL if + * validation fails or no online CPU count can be obtained. */ +int +usocklnd_update_tunables() +{ + int rc; + + rc = cfs_parse_int_tunable(&usock_tuns.ut_timeout, + "USOCK_TIMEOUT"); + if (rc) + return rc; + + rc = cfs_parse_int_tunable(&usock_tuns.ut_poll_timeout, + "USOCK_POLL_TIMEOUT"); + if (rc) + return rc; + + rc = cfs_parse_int_tunable(&usock_tuns.ut_npollthreads, + "USOCK_NPOLLTHREADS"); + if (rc) + return rc; + + rc = cfs_parse_int_tunable(&usock_tuns.ut_fair_limit, + "USOCK_FAIR_LIMIT"); + if (rc) + return rc; + + rc = cfs_parse_int_tunable(&usock_tuns.ut_min_bulk, + "USOCK_MIN_BULK"); + if (rc) + return rc; + + rc = cfs_parse_int_tunable(&usock_tuns.ut_txcredits, + "USOCK_TXCREDITS"); + if (rc) + return rc; + + rc = cfs_parse_int_tunable(&usock_tuns.ut_peertxcredits, + "USOCK_PEERTXCREDITS"); + if (rc) + return rc; + + rc = cfs_parse_int_tunable(&usock_tuns.ut_socknagle, + "USOCK_SOCKNAGLE"); + if (rc) + return rc; + + rc = cfs_parse_int_tunable(&usock_tuns.ut_sockbufsiz, + "USOCK_SOCKBUFSIZ"); + if (rc) + return rc; + + if (usocklnd_validate_tunables()) + return -EINVAL; + + if (usock_tuns.ut_npollthreads == 0) { + usock_tuns.ut_npollthreads = cfs_online_cpus(); + + if (usock_tuns.ut_npollthreads <= 0) { + CERROR("Cannot find out
the number of online CPUs\n"); + return -EINVAL; + } + } + + return 0; +} + + +/* One-time global initialisation: read the tunables, allocate and + * initialise one usock_pollthread_t per poll thread (poll arrays plus a + * notifier socketpair), set up the peer hash table and spawn the threads. + * Returns 0 on success or a non-zero error code. If an allocation or the + * socketpair fails for entry i, the base_startup_failed_* labels free what + * entry i already owns and entries 0..i-1 are released. */ +int +usocklnd_base_startup() +{ + usock_pollthread_t *pt; + int i; + int rc; + + rc = usocklnd_update_tunables(); + if (rc) + return rc; + + usock_data.ud_npollthreads = usock_tuns.ut_npollthreads; + + LIBCFS_ALLOC (usock_data.ud_pollthreads, + usock_data.ud_npollthreads * + sizeof(usock_pollthread_t)); + if (usock_data.ud_pollthreads == NULL) + return -ENOMEM; + + /* Initialize poll thread state structures */ + for (i = 0; i < usock_data.ud_npollthreads; i++) { + int notifier[2]; + + pt = &usock_data.ud_pollthreads[i]; + + rc = -ENOMEM; + + LIBCFS_ALLOC (pt->upt_pollfd, + sizeof(struct pollfd) * UPT_START_SIZ); + if (pt->upt_pollfd == NULL) + goto base_startup_failed_0; + + LIBCFS_ALLOC (pt->upt_idx2conn, + sizeof(usock_conn_t *) * UPT_START_SIZ); + if (pt->upt_idx2conn == NULL) + goto base_startup_failed_1; + + LIBCFS_ALLOC (pt->upt_fd2idx, + sizeof(int) * UPT_START_SIZ); + if (pt->upt_fd2idx == NULL) + goto base_startup_failed_2; + + memset(pt->upt_fd2idx, 0, + sizeof(int) * UPT_START_SIZ); + + LIBCFS_ALLOC (pt->upt_skip, + sizeof(int) * UPT_START_SIZ); + if (pt->upt_skip == NULL) + goto base_startup_failed_3; + + pt->upt_npollfd = pt->upt_nfd2idx = UPT_START_SIZ; + + rc = libcfs_socketpair(notifier); + if (rc != 0) + goto base_startup_failed_4; + + /* notifier[0] is kept as the writing end, notifier[1] becomes + * entry 0 of the poll set */ + pt->upt_notifier_fd = notifier[0]; + + pt->upt_pollfd[0].fd = notifier[1]; + pt->upt_pollfd[0].events = POLLIN; + pt->upt_pollfd[0].revents = 0; + + pt->upt_nfds = 1; + pt->upt_idx2conn[0] = NULL; + + pt->upt_errno = 0; + CFS_INIT_LIST_HEAD (&pt->upt_pollrequests); + CFS_INIT_LIST_HEAD (&pt->upt_stale_list); + pthread_mutex_init(&pt->upt_pollrequests_lock, NULL); + cfs_init_completion(&pt->upt_completion); + } + + /* Initialize peer hash list */ + for (i = 0; i < UD_PEER_HASH_SIZE; i++) + CFS_INIT_LIST_HEAD(&usock_data.ud_peers[i]); + + pthread_rwlock_init(&usock_data.ud_peers_lock, NULL); + + /* Spawn poll threads */ + for (i = 0; i <
usock_data.ud_npollthreads; i++) { + rc = cfs_create_thread(usocklnd_poll_thread, + &usock_data.ud_pollthreads[i]); + if (rc) { + /* only the i threads created so far have to be + * stopped */ + usocklnd_base_shutdown(i); + return rc; + } + } + + usock_data.ud_state = UD_STATE_INITIALIZED; + + return 0; + + base_startup_failed_4: + LIBCFS_FREE (pt->upt_skip, sizeof(int) * UPT_START_SIZ); + base_startup_failed_3: + LIBCFS_FREE (pt->upt_fd2idx, sizeof(int) * UPT_START_SIZ); + base_startup_failed_2: + LIBCFS_FREE (pt->upt_idx2conn, sizeof(usock_conn_t *) * UPT_START_SIZ); + base_startup_failed_1: + LIBCFS_FREE (pt->upt_pollfd, sizeof(struct pollfd) * UPT_START_SIZ); + base_startup_failed_0: + LASSERT(rc != 0); + usocklnd_release_poll_states(i); + LIBCFS_FREE (usock_data.ud_pollthreads, + usock_data.ud_npollthreads * + sizeof(usock_pollthread_t)); + return rc; +} + +/* Global teardown. n is the number of poll threads that were actually + * started: each of them is woken and waited for. Poll state is then + * released for all ud_npollthreads entries, the thread array is freed + * and ud_state goes back to UD_STATE_INIT_NOTHING. */ +void +usocklnd_base_shutdown(int n) +{ + int i; + + usock_data.ud_shutdown = 1; + for (i = 0; i < n; i++) { + usock_pollthread_t *pt = &usock_data.ud_pollthreads[i]; + usocklnd_wakeup_pollthread(i); + cfs_wait_for_completion(&pt->upt_completion); + } + + pthread_rwlock_destroy(&usock_data.ud_peers_lock); + + usocklnd_release_poll_states(usock_data.ud_npollthreads); + + LIBCFS_FREE (usock_data.ud_pollthreads, + usock_data.ud_npollthreads * + sizeof(usock_pollthread_t)); + + usock_data.ud_state = UD_STATE_INIT_NOTHING; +} + +/* Return the current wall-clock time in microseconds, taken from + * gettimeofday(); usocklnd_startup() stores it as the net incarnation. */ +__u64 +usocklnd_new_incarnation() +{ + struct timeval tv; + int rc = gettimeofday(&tv, NULL); + LASSERT (rc == 0); + return (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec; +} + +/* Pick the local IP address for ni and store it in the address part of + * ni->ni_nid (the network part is kept). + * Returns 0 on success, -1 if no usable interface is found. */ +static int +usocklnd_assign_ni_nid(lnet_ni_t *ni) +{ + int rc; + int up; + __u32 ipaddr; + + /* Find correct IP-address and update ni_nid with it. + * Two cases are supported: + * 1) no explicit interfaces are defined. NID will be assigned to + * first non-lo interface that is up; + * 2) exactly one explicit interface is defined.
For example, + * LNET_NETWORKS='tcp(eth0)' */ + + if (ni->ni_interfaces[0] == NULL) { + char **names; + int i, n; + + n = libcfs_ipif_enumerate(&names); + if (n <= 0) { + CERROR("Can't enumerate interfaces: %d\n", n); + return -1; + } + + for (i = 0; i < n; i++) { + + if (!strcmp(names[i], "lo")) /* skip the loopback IF */ + continue; + + rc = libcfs_ipif_query(names[i], &up, &ipaddr); + if (rc != 0) { + CWARN("Can't get interface %s info: %d\n", + names[i], rc); + continue; + } + + if (!up) { + CWARN("Ignoring interface %s (down)\n", + names[i]); + continue; + } + + break; /* one address is quite enough */ + } + + libcfs_ipif_free_enumeration(names, n); + + if (i >= n) { + CERROR("Can't find any usable interfaces\n"); + return -1; + } + + CDEBUG(D_NET, "No explicit interfaces defined. " + "%u.%u.%u.%u used\n", HIPQUAD(ipaddr)); + } else { + if (ni->ni_interfaces[1] != NULL) { + CERROR("only one explicit interface is allowed\n"); + return -1; + } + + rc = libcfs_ipif_query(ni->ni_interfaces[0], &up, &ipaddr); + if (rc != 0) { + CERROR("Can't get interface %s info: %d\n", + ni->ni_interfaces[0], rc); + return -1; + } + + if (!up) { + CERROR("Explicit interface defined: %s but is down\n", + ni->ni_interfaces[0]); + return -1; + } + + CDEBUG(D_NET, "Explicit interface defined: %s.
" + "%u.%u.%u.%u used\n", + ni->ni_interfaces[0], HIPQUAD(ipaddr)); + + } + + ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ipaddr); + + return 0; +} + +/* lnd_startup hook. Brings up the shared base state on first use, + * allocates the usock_net_t for ni and, unless LNET_PID_USERFLAG is set in + * the_lnet.ln_pid, derives ni->ni_nid from a local interface. + * Returns 0 on success, the rc of usocklnd_base_startup() if that fails, + * and -ENETDOWN for any later failure. + * NOTE(review): on the startup_failed_1 path ni->ni_data still points at + * the freed net. */ +int +usocklnd_startup(lnet_ni_t *ni) +{ + int rc; + usock_net_t *net; + + if (usock_data.ud_state == UD_STATE_INIT_NOTHING) { + rc = usocklnd_base_startup(); + if (rc != 0) + return rc; + } + + LIBCFS_ALLOC(net, sizeof(*net)); + if (net == NULL) + goto startup_failed_0; + + memset(net, 0, sizeof(*net)); + net->un_incarnation = usocklnd_new_incarnation(); + pthread_mutex_init(&net->un_lock, NULL); + pthread_cond_init(&net->un_cond, NULL); + + ni->ni_data = net; + + if (!(the_lnet.ln_pid & LNET_PID_USERFLAG)) { + rc = usocklnd_assign_ni_nid(ni); + if (rc != 0) + goto startup_failed_1; + } + + LASSERT (ni->ni_lnd == &the_tcplnd); + + ni->ni_maxtxcredits = usock_tuns.ut_txcredits; + ni->ni_peertxcredits = usock_tuns.ut_peertxcredits; + + usock_data.ud_nets_count++; + return 0; + + startup_failed_1: + pthread_mutex_destroy(&net->un_lock); + pthread_cond_destroy(&net->un_cond); + LIBCFS_FREE(net, sizeof(*net)); + startup_failed_0: + if (usock_data.ud_nets_count == 0) + usocklnd_base_shutdown(usock_data.ud_npollthreads); + + return -ENETDOWN; +} + +/* lnd_shutdown hook. Marks the net as shutting down, deletes all of its + * peers, waits on un_cond until un_peercount drops to 0, frees the + * usock_net_t and, when this was the last instance, shuts the base + * state down as well. */ +void +usocklnd_shutdown(lnet_ni_t *ni) +{ + usock_net_t *net = ni->ni_data; + + net->un_shutdown = 1; + + usocklnd_del_all_peers(ni); + + /* Wait for all peer state to clean up */ + pthread_mutex_lock(&net->un_lock); + while (net->un_peercount != 0) + pthread_cond_wait(&net->un_cond, &net->un_lock); + pthread_mutex_unlock(&net->un_lock); + + /* Release usock_net_t structure */ + pthread_mutex_destroy(&net->un_lock); + pthread_cond_destroy(&net->un_cond); + LIBCFS_FREE(net, sizeof(*net)); + + usock_data.ud_nets_count--; + if (usock_data.ud_nets_count == 0) + usocklnd_base_shutdown(usock_data.ud_npollthreads); +} + +/* Delete every peer that belongs to ni (together with its conns) under + * the write lock on ud_peers_lock, then wake all poll threads. */ +void +usocklnd_del_all_peers(lnet_ni_t *ni) +{ + struct list_head *ptmp; + struct list_head *pnxt; + usock_peer_t *peer; + int i; + +
pthread_rwlock_wrlock(&usock_data.ud_peers_lock); + + for (i = 0; i < UD_PEER_HASH_SIZE; i++) { + list_for_each_safe (ptmp, pnxt, &usock_data.ud_peers[i]) { + peer = list_entry (ptmp, usock_peer_t, up_list); + + if (peer->up_ni != ni) + continue; + + usocklnd_del_peer_and_conns(peer); + } + } + + pthread_rwlock_unlock(&usock_data.ud_peers_lock); + + /* wakeup all threads */ + for (i = 0; i < usock_data.ud_npollthreads; i++) + usocklnd_wakeup_pollthread(i); +} + +/* Kill the conns of peer, unlink it from the peer hash list and drop + * the reference held on behalf of that list. The caller protects the + * peer hash list. */ +void +usocklnd_del_peer_and_conns(usock_peer_t *peer) +{ + /* peer cannot disappear because it's still in hash list */ + + pthread_mutex_lock(&peer->up_lock); + /* content of conn[] array cannot change now */ + usocklnd_del_conns_locked(peer); + pthread_mutex_unlock(&peer->up_lock); + + /* peer hash list is still protected by the caller */ + list_del(&peer->up_list); + + usocklnd_peer_decref(peer); /* peer isn't in hash list anymore */ +} + +/* Call usocklnd_conn_kill() on every non-NULL entry of peer->up_conns[]. + * usocklnd_del_peer_and_conns() calls this with peer->up_lock held. */ +void +usocklnd_del_conns_locked(usock_peer_t *peer) +{ + int i; + + for (i=0; i < N_CONN_TYPES; i++) { + usock_conn_t *conn = peer->up_conns[i]; + if (conn != NULL) + usocklnd_conn_kill(conn); + } +} diff --git a/lnet/ulnds/socklnd/usocklnd.h b/lnet/ulnds/socklnd/usocklnd.h new file mode 100644 index 0000000..f2abf9d --- /dev/null +++ b/lnet/ulnds/socklnd/usocklnd.h @@ -0,0 +1,332 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2001, 2002 Cluster File Systems, Inc. + * Author: Maxim Patlasov + * + * This file is part of the Lustre file system, http://www.lustre.org + * Lustre is a trademark of Cluster File Systems, Inc.
+ * + */ +#define _GNU_SOURCE +#include +#include +#include +#include + +/* Transmit descriptor (usock_tx_t). */ +typedef struct { + struct list_head tx_list; /* necessary to form tx list */ + lnet_msg_t *tx_lnetmsg; /* lnet message for lnet_finalize() */ + ksock_msg_t tx_msg; /* buffer for wire header of ksock msg */ + int tx_resid; /* # of residual bytes */ + int tx_nob; /* # of packet bytes */ + int tx_size; /* size of this descriptor */ + struct iovec *tx_iov; /* points to tx_iova[i] */ + int tx_niov; /* # of packet iovec frags */ + struct iovec tx_iova[1]; /* iov for header */ +} usock_tx_t; + +struct usock_peer_s; + +/* Per-connection state (usock_conn_t). */ +typedef struct { + int uc_fd; /* socket */ + int uc_type; /* conn type */ + int uc_activeflag; /* active side of connection? */ + int uc_flip; /* is peer other endian? */ + int uc_state; /* connection state */ + struct usock_peer_s *uc_peer; /* owning peer */ + lnet_process_id_t uc_peerid; /* id of remote peer */ + int uc_pt_idx; /* index in ud_pollthreads[] of + * owning poll thread */ + lnet_ni_t *uc_ni; /* parent NI while accepting */ + struct usock_preq_s *uc_preq; /* preallocated request */ + __u32 uc_peer_ip; /* IP address of the peer */ + __u16 uc_peer_port; /* port of the peer */ + struct list_head uc_stale_list; /* orphaned connections */ + + /* Receive state */ + int uc_rx_state; /* message or hello state */ + ksock_hello_msg_t *uc_rx_hello; /* hello buffer */ + struct iovec *uc_rx_iov; /* points to uc_rx_iova[i] */ + struct iovec uc_rx_iova[LNET_MAX_IOV]; /* message frags */ + int uc_rx_niov; /* # frags */ + int uc_rx_nob_left; /* # bytes to next hdr/body */ + int uc_rx_nob_wanted; /* # of bytes actually wanted */ + void *uc_rx_lnetmsg; /* LNET message being received */ + cfs_time_t uc_rx_deadline; /* when to time out */ + int uc_rx_flag; /* deadline valid?
*/ + ksock_msg_t uc_rx_msg; /* message buffer */ + + /* Send state */ + struct list_head uc_tx_list; /* pending txs */ + struct list_head uc_zcack_list; /* pending zc_acks */ + cfs_time_t uc_tx_deadline; /* when to time out */ + int uc_tx_flag; /* deadline valid? */ + int uc_sending; /* send op is in progress */ + usock_tx_t *uc_tx_hello; /* fake tx with hello */ + + cfs_atomic_t uc_refcount; /* # of users */ + pthread_mutex_t uc_lock; /* serialize */ + int uc_errored; /* a flag for lnet_notify() */ +} usock_conn_t; + +/* Allowable conn states are: */ +#define UC_CONNECTING 1 +#define UC_SENDING_HELLO 2 +#define UC_RECEIVING_HELLO 3 +#define UC_READY 4 +#define UC_DEAD 5 + +/* Allowable RX states are: */ +#define UC_RX_HELLO_MAGIC 1 +#define UC_RX_HELLO_VERSION 2 +#define UC_RX_HELLO_BODY 3 +#define UC_RX_HELLO_IPS 4 +#define UC_RX_KSM_HEADER 5 +#define UC_RX_LNET_HEADER 6 +#define UC_RX_PARSE 7 +#define UC_RX_PARSE_WAIT 8 +#define UC_RX_LNET_PAYLOAD 9 +#define UC_RX_SKIPPING 10 + +#define N_CONN_TYPES 3 /* CONTROL, BULK_IN and BULK_OUT */ + +/* Remote peer together with the conns that lead to it. */ +typedef struct usock_peer_s { + struct list_head up_list; /* necessary to form peer list */ + lnet_process_id_t up_peerid; /* id of remote peer */ + usock_conn_t *up_conns[N_CONN_TYPES]; /* conns that connect + * us with the peer */ + lnet_ni_t *up_ni; /* pointer to parent NI */ + __u64 up_incarnation; /* peer's incarnation */ + int up_incrn_is_set; /* 0 if peer's incarnation + * hasn't been set so far */ + cfs_atomic_t up_refcount; /* # of users */ + pthread_mutex_t up_lock; /* serialize */ + int up_errored; /* a flag for lnet_notify() */ + cfs_time_t up_last_alive; /* when the peer was last alive */ +} usock_peer_t; + +/* Per-poll-thread state (usock_pollthread_t). */ +typedef struct { + int upt_notifier_fd; /* notifier fd for writing */ + struct pollfd *upt_pollfd; /* poll fds */ + int upt_nfds; /* active poll fds */ + int upt_npollfd; /* allocated poll fds */ + usock_conn_t **upt_idx2conn; /* conns corresponding to + * upt_pollfd[idx] */ + int *upt_skip; /* skip chain
*/ + int *upt_fd2idx; /* index into upt_pollfd[] + * by fd */ + int upt_nfd2idx; /* # of allocated elements + * of upt_fd2idx[] */ + struct list_head upt_stale_list; /* list of orphaned conns */ + struct list_head upt_pollrequests; /* list of poll requests */ + pthread_mutex_t upt_pollrequests_lock; /* serialize */ + int upt_errno; /* non-zero if errored */ + struct cfs_completion upt_completion; /* wait/signal facility for + * synchronizing shutdown */ +} usock_pollthread_t; + +/* Number of elements in upt_pollfd[], upt_idx2conn[] and upt_fd2idx[] + * at initialization time. Will be resized on demand */ +#define UPT_START_SIZ 32 + +/* # peer lists */ +#define UD_PEER_HASH_SIZE 101 + +/* Module-wide state (usock_data_t). */ +typedef struct { + int ud_state; /* initialization state */ + int ud_npollthreads; /* # of poll threads */ + usock_pollthread_t *ud_pollthreads; /* their state */ + int ud_shutdown; /* shutdown flag */ + int ud_nets_count; /* # of instances */ + struct list_head ud_peers[UD_PEER_HASH_SIZE]; /* peer hash table */ + pthread_rwlock_t ud_peers_lock; /* serialize */ +} usock_data_t; + +extern usock_data_t usock_data; + +/* ud_state allowed values */ +#define UD_STATE_INIT_NOTHING 0 +#define UD_STATE_INITIALIZED 1 + +/* Per-NI state, stored in ni->ni_data (usock_net_t). */ +typedef struct { + int un_peercount; /* # of peers */ + int un_shutdown; /* shutdown flag */ + __u64 un_incarnation; /* my epoch */ + pthread_cond_t un_cond; /* condvar to wait for notifications */ + pthread_mutex_t un_lock; /* a lock to protect un_cond */ +} usock_net_t; + +/* Tunable values (usock_tunables_t). */ +typedef struct { + int ut_poll_timeout; /* the third arg for poll(2) (seconds) */ + int ut_timeout; /* "stuck" socket timeout (seconds) */ + int ut_npollthreads; /* number of poll threads to spawn */ + int ut_fair_limit; /* how many packets can we receive or transmit + * without calling poll(2) */ + int ut_min_bulk; /* smallest "large" message */ + int ut_txcredits; /* # concurrent sends */ + int ut_peertxcredits; /* # concurrent sends to 1 peer */ + int ut_socknagle; /* Is Nagle alg on ?
*/ + int ut_sockbufsiz; /* size of socket buffers */ +} usock_tunables_t; + +extern usock_tunables_t usock_tuns; + +/* Request queued for a poll thread (usock_pollrequest_t). */ +typedef struct usock_preq_s { + int upr_type; /* type of requested action */ + short upr_value; /* bitmask of POLLIN and POLLOUT bits */ + usock_conn_t * upr_conn; /* a conn for the sake of which + * action will be performed */ + struct list_head upr_list; /* necessary to form list */ +} usock_pollrequest_t; + +/* Allowable poll request types are: */ +#define POLL_ADD_REQUEST 1 +#define POLL_DEL_REQUEST 2 +#define POLL_RX_SET_REQUEST 3 +#define POLL_TX_SET_REQUEST 4 +#define POLL_SET_REQUEST 5 + +typedef struct { + struct list_head zc_list; /* necessary to form zc_ack list */ + __u64 zc_cookie; /* zero-copy cookie */ +} usock_zc_ack_t; + +/* Take an extra reference on conn; asserts that the count is already + * positive. */ +static inline void +usocklnd_conn_addref(usock_conn_t *conn) +{ + LASSERT (cfs_atomic_read(&conn->uc_refcount) > 0); + cfs_atomic_inc(&conn->uc_refcount); +} + +void usocklnd_destroy_conn(usock_conn_t *conn); + +/* Drop a reference on conn; dropping the last one calls + * usocklnd_destroy_conn(). */ +static inline void +usocklnd_conn_decref(usock_conn_t *conn) +{ + LASSERT (cfs_atomic_read(&conn->uc_refcount) > 0); + if (cfs_atomic_dec_and_test(&conn->uc_refcount)) + usocklnd_destroy_conn(conn); +} + +/* Take an extra reference on peer; asserts that the count is already + * positive. */ +static inline void +usocklnd_peer_addref(usock_peer_t *peer) +{ + LASSERT (cfs_atomic_read(&peer->up_refcount) > 0); + cfs_atomic_inc(&peer->up_refcount); +} + +void usocklnd_destroy_peer(usock_peer_t *peer); + +/* Drop a reference on peer; dropping the last one calls + * usocklnd_destroy_peer(). */ +static inline void +usocklnd_peer_decref(usock_peer_t *peer) +{ + LASSERT (cfs_atomic_read(&peer->up_refcount) > 0); + if (cfs_atomic_dec_and_test(&peer->up_refcount)) + usocklnd_destroy_peer(peer); +} + +/* Choose a poll thread index for an IP address: ip modulo + * ud_npollthreads. */ +static inline int +usocklnd_ip2pt_idx(__u32 ip) { + return ip % usock_data.ud_npollthreads; +} + +/* Return the ud_peers[] hash bucket for nid (nid converted to unsigned + * int, modulo UD_PEER_HASH_SIZE). */ +static inline struct list_head * +usocklnd_nid2peerlist(lnet_nid_t nid) +{ + unsigned int hash = ((unsigned int)nid) % UD_PEER_HASH_SIZE; + + return &usock_data.ud_peers[hash]; +} + +/* LND entry points referenced by the_tcplnd */ +int usocklnd_startup(lnet_ni_t *ni); +void usocklnd_shutdown(lnet_ni_t *ni); +int usocklnd_send(lnet_ni_t *ni, void
*private, lnet_msg_t *lntmsg); +int usocklnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed, + unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov, + unsigned int offset, unsigned int mlen, unsigned int rlen); +int usocklnd_accept(lnet_ni_t *ni, int sock_fd); + +int usocklnd_poll_thread(void *arg); +int usocklnd_add_pollrequest(usock_conn_t *conn, int type, short value); +void usocklnd_add_killrequest(usock_conn_t *conn); +int usocklnd_process_pollrequest(usock_pollrequest_t *pr, + usock_pollthread_t *pt_data); +void usocklnd_execute_handlers(usock_pollthread_t *pt_data); +int usocklnd_calculate_chunk_size(int num); +void usocklnd_wakeup_pollthread(int i); + +int usocklnd_notifier_handler(int fd); +void usocklnd_exception_handler(usock_conn_t *conn); +int usocklnd_read_handler(usock_conn_t *conn); +int usocklnd_read_msg(usock_conn_t *conn, int *cont_flag); +int usocklnd_handle_zc_req(usock_peer_t *peer, __u64 cookie); +int usocklnd_read_hello(usock_conn_t *conn, int *cont_flag); +int usocklnd_activeconn_hellorecv(usock_conn_t *conn); +int usocklnd_passiveconn_hellorecv(usock_conn_t *conn); +int usocklnd_write_handler(usock_conn_t *conn); +usock_tx_t * usocklnd_try_piggyback(struct list_head *tx_list_p, + struct list_head *zcack_list_p); +int usocklnd_activeconn_hellosent(usock_conn_t *conn); +int usocklnd_passiveconn_hellosent(usock_conn_t *conn); +int usocklnd_send_tx(usock_conn_t *conn, usock_tx_t *tx); +int usocklnd_read_data(usock_conn_t *conn); + +/* defined in usocklnd.c */ +void usocklnd_release_poll_states(int n); +int usocklnd_base_startup(); +void usocklnd_base_shutdown(int n); +__u64 usocklnd_new_incarnation(); +void usocklnd_del_all_peers(lnet_ni_t *ni); +void usocklnd_del_peer_and_conns(usock_peer_t *peer); +void usocklnd_del_conns_locked(usock_peer_t *peer); + +int usocklnd_conn_timed_out(usock_conn_t *conn, cfs_time_t current_time); +void usocklnd_conn_kill(usock_conn_t *conn); +void usocklnd_conn_kill_locked(usock_conn_t *conn); +usock_conn_t
*usocklnd_conn_allocate(); +void usocklnd_conn_free(usock_conn_t *conn); +void usocklnd_tear_peer_conn(usock_conn_t *conn); +void usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id); +int usocklnd_create_passive_conn(lnet_ni_t *ni, int fd, usock_conn_t **connp); +int usocklnd_create_active_conn(usock_peer_t *peer, int type, + usock_conn_t **connp); +int usocklnd_connect_srv_mode(int *fdp, __u32 dst_ip, __u16 dst_port); +int usocklnd_connect_cli_mode(int *fdp, __u32 dst_ip, __u16 dst_port); +int usocklnd_set_sock_options(int fd); +void usocklnd_init_msg(ksock_msg_t *msg, int type); +usock_tx_t *usocklnd_create_noop_tx(__u64 cookie); +usock_tx_t *usocklnd_create_tx(lnet_msg_t *lntmsg); +void usocklnd_init_hello_msg(ksock_hello_msg_t *hello, + lnet_ni_t *ni, int type, lnet_nid_t peer_nid); +usock_tx_t *usocklnd_create_hello_tx(lnet_ni_t *ni, + int type, lnet_nid_t peer_nid); +usock_tx_t *usocklnd_create_cr_hello_tx(lnet_ni_t *ni, + int type, lnet_nid_t peer_nid); +void usocklnd_destroy_tx(lnet_ni_t *ni, usock_tx_t *tx); +void usocklnd_destroy_txlist(lnet_ni_t *ni, struct list_head *txlist); +void usocklnd_destroy_zcack_list(struct list_head *zcack_list); +void usocklnd_destroy_peer (usock_peer_t *peer); +int usocklnd_get_conn_type(lnet_msg_t *lntmsg); +int usocklnd_type2idx(int type); +usock_peer_t *usocklnd_find_peer_locked(lnet_ni_t *ni, lnet_process_id_t id); +int usocklnd_create_peer(lnet_ni_t *ni, lnet_process_id_t id, + usock_peer_t **peerp); +int usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id, + usock_peer_t **peerp); +int usocklnd_find_or_create_conn(usock_peer_t *peer, int type, + usock_conn_t **connp, + usock_tx_t *tx, usock_zc_ack_t *zc_ack, + int *send_immediately_flag); +void usocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx); +int usocklnd_invert_type(int type); +void usocklnd_conn_new_state(usock_conn_t *conn, int new_state); +void usocklnd_cleanup_stale_conns(usock_peer_t *peer, __u64 incrn, +
usock_conn_t *skip_conn); + +void usocklnd_rx_hellomagic_state_transition(usock_conn_t *conn); +void usocklnd_rx_helloversion_state_transition(usock_conn_t *conn); +void usocklnd_rx_hellobody_state_transition(usock_conn_t *conn); +void usocklnd_rx_helloIPs_state_transition(usock_conn_t *conn); +void usocklnd_rx_lnethdr_state_transition(usock_conn_t *conn); +void usocklnd_rx_ksmhdr_state_transition(usock_conn_t *conn); +void usocklnd_rx_skipping_state_transition(usock_conn_t *conn); diff --git a/lnet/ulnds/socklnd/usocklnd_cb.c b/lnet/ulnds/socklnd/usocklnd_cb.c new file mode 100644 index 0000000..c1337e9 --- /dev/null +++ b/lnet/ulnds/socklnd/usocklnd_cb.c @@ -0,0 +1,176 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2001, 2002 Cluster File Systems, Inc. + * Author: Maxim Patlasov + * + * This file is part of the Lustre file system, http://www.lustre.org + * Lustre is a trademark of Cluster File Systems, Inc.
+ * + */ + +#include "usocklnd.h" + +/* Try to push tx out on conn straight from the sender context. + * usocklnd_send_tx() result: 0 - partial send (tx is put back at the head + * of uc_tx_list), <0 - error (conn is killed), 1 - fully sent. The write + * handler is scheduled when data is still pending. + * Always returns 0. */ +static int +usocklnd_send_tx_immediately(usock_conn_t *conn, usock_tx_t *tx) +{ + int rc; + int rc2; + int partial_send = 0; + usock_peer_t *peer = conn->uc_peer; + + LASSERT (peer != NULL); + + /* usocklnd_enqueue_tx() turned it on for us */ + LASSERT(conn->uc_sending); + + //counter_imm_start++; + rc = usocklnd_send_tx(conn, tx); + if (rc == 0) { /* partial send or connection closed */ + pthread_mutex_lock(&conn->uc_lock); + list_add(&tx->tx_list, &conn->uc_tx_list); + conn->uc_sending = 0; + pthread_mutex_unlock(&conn->uc_lock); + partial_send = 1; + } else { + usocklnd_destroy_tx(peer->up_ni, tx); + /* NB: lnetmsg was finalized, so we *must* return 0 */ + + if (rc < 0) { /* real error */ + usocklnd_conn_kill(conn); + return 0; + } + + /* rc == 1: tx was sent completely */ + rc = 0; /* let's say to caller 'Ok' */ + //counter_imm_complete++; + } + + pthread_mutex_lock(&conn->uc_lock); + conn->uc_sending = 0; + + /* schedule write handler */ + if (partial_send || + (conn->uc_state == UC_READY && + (!list_empty(&conn->uc_tx_list) || + !list_empty(&conn->uc_zcack_list)))) { + conn->uc_tx_deadline = + cfs_time_shift(usock_tuns.ut_timeout); + conn->uc_tx_flag = 1; + rc2 = usocklnd_add_pollrequest(conn, POLL_TX_SET_REQUEST, POLLOUT); + if (rc2 != 0) + usocklnd_conn_kill_locked(conn); + else + usocklnd_wakeup_pollthread(conn->uc_pt_idx); + } + + pthread_mutex_unlock(&conn->uc_lock); + + return rc; +} + +/* lnd_send hook. Builds a tx for lntmsg, finds or creates the target peer + * and a conn of the type chosen by usocklnd_get_conn_type(), and sends + * at once when usocklnd_find_or_create_conn() sets the send-immediately + * flag. Returns 0 or a non-zero error code; on error the tx is freed + * here. */ +int +usocklnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) +{ + usock_tx_t *tx; + lnet_process_id_t target = lntmsg->msg_target; + usock_peer_t *peer; + int type; + int rc; + usock_conn_t *conn; + int send_immediately; + + tx = usocklnd_create_tx(lntmsg); + if (tx == NULL) + return -ENOMEM; + + rc = usocklnd_find_or_create_peer(ni, target, &peer); + if (rc) { + LIBCFS_FREE (tx, tx->tx_size); + return rc; + } + /* peer cannot disappear now because its refcount was incremented */ + + type = usocklnd_get_conn_type(lntmsg); + rc
= usocklnd_find_or_create_conn(peer, type, &conn, tx, NULL, + &send_immediately); + if (rc != 0) { + usocklnd_peer_decref(peer); + usocklnd_check_peer_stale(ni, target); + LIBCFS_FREE (tx, tx->tx_size); + return rc; + } + /* conn cannot disappear now because its refcount was incremented */ + + if (send_immediately) + rc = usocklnd_send_tx_immediately(conn, tx); + + usocklnd_conn_decref(conn); + usocklnd_peer_decref(peer); + return rc; +} + +/* lnd_recv hook. private is the usock_conn_t the message arrives on. + * Records msg and the mlen/rlen byte counts in conn and fills uc_rx_iova + * from iov (kiov is not used). If the conn was parked in + * UC_RX_PARSE_WAIT, a POLLIN request is queued and the poll thread is + * woken. On success the conn is switched to UC_RX_LNET_PAYLOAD. + * Drops one conn reference before returning. + * Returns 0, or the rc of a failed usocklnd_add_pollrequest(). */ +int +usocklnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed, + unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov, + unsigned int offset, unsigned int mlen, unsigned int rlen) +{ + int rc = 0; + usock_conn_t *conn = (usock_conn_t *)private; + + /* I don't think that we'll win much concurrency moving lock() + * call below lnet_extract_iov() */ + pthread_mutex_lock(&conn->uc_lock); + + conn->uc_rx_lnetmsg = msg; + conn->uc_rx_nob_wanted = mlen; + conn->uc_rx_nob_left = rlen; + conn->uc_rx_iov = conn->uc_rx_iova; + conn->uc_rx_niov = + lnet_extract_iov(LNET_MAX_IOV, conn->uc_rx_iov, + niov, iov, offset, mlen); + + /* the gap between lnet_parse() and usocklnd_recv() happened?
*/ + if (conn->uc_rx_state == UC_RX_PARSE_WAIT) { + conn->uc_rx_flag = 1; /* waiting for incoming lnet payload */ + conn->uc_rx_deadline = + cfs_time_shift(usock_tuns.ut_timeout); + rc = usocklnd_add_pollrequest(conn, POLL_RX_SET_REQUEST, POLLIN); + if (rc != 0) { + usocklnd_conn_kill_locked(conn); + goto recv_out; + } + usocklnd_wakeup_pollthread(conn->uc_pt_idx); + } + + conn->uc_rx_state = UC_RX_LNET_PAYLOAD; + recv_out: + pthread_mutex_unlock(&conn->uc_lock); + usocklnd_conn_decref(conn); + return rc; +} + +/* lnd_accept hook. Wraps sock_fd in a passive conn and asks the owning + * poll thread to start polling it for POLLIN. + * Returns 0 or a non-zero error code. + * NOTE(review): the lnet_ni_addref() taken here has no matching decref + * in this function - presumably dropped elsewhere; confirm. */ +int +usocklnd_accept(lnet_ni_t *ni, int sock_fd) +{ + int rc; + usock_conn_t *conn; + + rc = usocklnd_create_passive_conn(ni, sock_fd, &conn); + if (rc) + return rc; + LASSERT(conn != NULL); + + /* disable shutdown event temporarily */ + lnet_ni_addref(ni); + + rc = usocklnd_add_pollrequest(conn, POLL_ADD_REQUEST, POLLIN); + if (rc == 0) + usocklnd_wakeup_pollthread(conn->uc_pt_idx); + + /* NB: conn reference counter was incremented while adding + * poll request if rc == 0 */ + + usocklnd_conn_decref(conn); /* should destroy conn if rc != 0 */ + return rc; +} -- 1.8.3.1