mxlnd - MX 1.2.1 or later,
ptllnd - Portals 3.3 / UNICOS/lc 1.5.x, 2.0.x
+Severity   : normal
+Bugzilla   : 12302
+Description: new userspace socklnd
+Details    : The old userspace tcpnal that resided in lnet/ulnds/socklnd
+             has been replaced with a new one, usocklnd.
+
--------------------------------------------------------------------------------
2007-09-27 Cluster File Systems, Inc. <info@clusterfs.com>
DIST_SUBDIRS := $(SUBDIRS)
EXTRA_DIST := curproc.h kp30.h libcfs.h list.h lltrace.h \
- portals_utils.h types.h user-lock.h user-prim.h user-time.h
+ portals_utils.h types.h user-lock.h user-prim.h user-time.h \
+ user-tcpip.h
int libcfs_deregister_ioctl(struct libcfs_ioctl_handler *hand);
/* libcfs tcpip */
-#define LNET_ACCEPTOR_MIN_RESERVED_PORT 512
-#define LNET_ACCEPTOR_MAX_RESERVED_PORT 1023
-
int libcfs_ipif_query(char *name, int *up, __u32 *ip, __u32 *mask);
int libcfs_ipif_enumerate(char ***names);
void libcfs_ipif_free_enumeration(char **names, int n);
/* __KERNEL__ */
#endif
+/* needed by both the kernel and user-land acceptors */
+#define LNET_ACCEPTOR_MIN_RESERVED_PORT 512
+#define LNET_ACCEPTOR_MAX_RESERVED_PORT 1023
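+/* NB: ports below 1024 can be bound only by root, so a connection
+ * arriving from this range proves the peer is privileged; this is the
+ * check behind the acceptor's "secure" mode */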
+
/*
* libcfs pseudo device operations
*
#error Do not #include this file directly. #include <libcfs/libcfs.h> instead
#endif
+#ifdef HAVE_ASM_TYPES_H
+#include <asm/types.h>
+#else
+#include <libcfs/types.h>
+#endif
+
#include <stdarg.h>
#include <libcfs/linux/linux-mem.h>
#include <libcfs/linux/linux-time.h>
#include <libcfs/linux/linux-fs.h>
#include <libcfs/linux/linux-tcpip.h>
-#ifdef HAVE_ASM_TYPES_H
-#include <asm/types.h>
-#else
-#include <libcfs/types.h>
-#endif
-
#ifdef __KERNEL__
# include <linux/types.h>
#define SOCK_ERROR(so) ((so)->sk->sk_err)
#define SOCK_TEST_NOSPACE(so) test_bit(SOCK_NOSPACE, &(so)->flags)
-#endif
+#else /* !__KERNEL__ */
+
+#include "../user-tcpip.h"
+
+#endif /* __KERNEL__ */
#endif
#endif
+#ifdef HAVE_LIBPTHREAD
+#include <pthread.h>
+
+/*
+ * Completion
+ */
+
+struct cfs_completion {
+ int c_done;
+ pthread_cond_t c_cond;
+ pthread_mutex_t c_mut;
+};
+
+void cfs_init_completion(struct cfs_completion *c);
+void cfs_fini_completion(struct cfs_completion *c);
+void cfs_complete(struct cfs_completion *c);
+void cfs_wait_for_completion(struct cfs_completion *c);
+
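+/* A minimal usage sketch (illustrative only; child_fn is a placeholder,
+ * cf. lnet_acceptor_start() later in this patch):
+ *
+ *        struct cfs_completion c;
+ *
+ *        cfs_init_completion(&c);
+ *        cfs_create_thread(child_fn, &c);  // child calls cfs_complete(&c)
+ *        cfs_wait_for_completion(&c);      // blocks until then
+ *        cfs_fini_completion(&c);
+ */
+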
+/*
+ * atomic.h
+ */
+
+typedef struct { volatile int counter; } cfs_atomic_t;
+
+int cfs_atomic_read(cfs_atomic_t *a);
+void cfs_atomic_set(cfs_atomic_t *a, int b);
+int cfs_atomic_dec_and_test(cfs_atomic_t *a);
+void cfs_atomic_inc(cfs_atomic_t *a);
+void cfs_atomic_dec(cfs_atomic_t *a);
+void cfs_atomic_add(int b, cfs_atomic_t *a);
+void cfs_atomic_sub(int b, cfs_atomic_t *a);
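+
+/* NB: these userspace "atomics" serialize on a single global pthread
+ * mutex (see user-lock.c in this patch), trading scalability for
+ * simplicity and portability */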
+
+#endif /* HAVE_LIBPTHREAD */
+
/* !__KERNEL__ */
#endif
#include <stdlib.h>
#include <unistd.h>
+#ifdef HAVE_LIBPTHREAD
+#include <pthread.h>
+#endif
+
#ifndef PAGE_SIZE
#define PAGE_SIZE (getpagesize())
#define cfs_recalc_sigpending(l) do {} while (0)
#define cfs_kernel_thread(l,m,n) LBUG()
+#ifdef HAVE_LIBPTHREAD
+typedef int (*cfs_thread_t)(void *);
+int cfs_create_thread(cfs_thread_t func, void *arg);
+#else
+#define cfs_create_thread(l,m) LBUG()
+#endif
+
+int cfs_parse_int_tunable(int *value, char *name);
+uid_t cfs_curproc_uid(void);
+
+#define LIBCFS_REALLOC(ptr, size) realloc(ptr, size)
+
+#define cfs_online_cpus() sysconf(_SC_NPROCESSORS_ONLN)
+
// static inline void local_irq_save(unsigned long flag) {return;}
// static inline void local_irq_restore(unsigned long flag) {return;}
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2005 Cluster File Systems, Inc.
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef __LIBCFS_USER_TCPIP_H__
+#define __LIBCFS_USER_TCPIP_H__
+
+#ifndef __LIBCFS_LIBCFS_H__
+#error Do not #include this file directly. #include <libcfs/libcfs.h> instead
+#endif
+
+#ifndef __KERNEL__
+
+#include <sys/uio.h>
+
+/*
+ * Functions to get network interfaces info
+ */
+
+int libcfs_sock_ioctl(int cmd, unsigned long arg);
+int libcfs_ipif_query (char *name, int *up, __u32 *ip);
+void libcfs_ipif_free_enumeration (char **names, int n);
+int libcfs_ipif_enumerate (char ***namesp);
+
+/*
+ * Network function used by user-land lnet acceptor
+ */
+
+int libcfs_sock_listen (int *sockp, __u32 local_ip, int local_port, int backlog);
+int libcfs_sock_accept (int *newsockp, int sock, __u32 *peer_ip, int *peer_port);
+int libcfs_sock_read (int sock, void *buffer, int nob, int timeout);
+void libcfs_sock_abort_accept(__u16 port);
+
+/*
+ * Network functions of common use
+ */
+
+int libcfs_getpeername(int sock_fd, __u32 *ipaddr_p, __u16 *port_p);
+int libcfs_socketpair(int *fdp);
+int libcfs_fcntl_nonblock(int fd);
+int libcfs_sock_set_nagle(int fd, int nagle);
+int libcfs_sock_set_bufsiz(int fd, int bufsiz);
+int libcfs_sock_create(int *fdp);
+int libcfs_sock_bind_to_port(int fd, __u16 port);
+int libcfs_sock_connect(int fd, __u32 ip, __u16 port);
+int libcfs_sock_writev(int fd, const struct iovec *vector, int count);
+int libcfs_sock_readv(int fd, const struct iovec *vector, int count);
+
+/*
+ * Macros for conveniently printing IP addresses
+ */
+
+#define NIPQUAD(addr) \
+ ((unsigned char *)&addr)[0], \
+ ((unsigned char *)&addr)[1], \
+ ((unsigned char *)&addr)[2], \
+ ((unsigned char *)&addr)[3]
+
+#if defined(__LITTLE_ENDIAN) || defined(_LITTLE_ENDIAN)
+#define HIPQUAD(addr) \
+ ((unsigned char *)&addr)[3], \
+ ((unsigned char *)&addr)[2], \
+ ((unsigned char *)&addr)[1], \
+ ((unsigned char *)&addr)[0]
+#elif defined(__BIG_ENDIAN) || defined(_BIG_ENDIAN)
+#define HIPQUAD NIPQUAD
+#else
+#error "Undefined byteorder??"
+#endif /* __LITTLE_ENDIAN */
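+
+/* Each quad expands to four comma-separated byte values for a
+ * "%u.%u.%u.%u" format, e.g. CDEBUG(D_NET, "%u.%u.%u.%u\n", HIPQUAD(ip));
+ * NIPQUAD takes a network-byte-order address, HIPQUAD a host-order one */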
+
+#endif /* !__KERNEL__ */
+
+#endif
int LNetSetAsync(lnet_process_id_t id, int nasync);
+#ifndef __KERNEL__
+/* Temporary workaround to allow uOSS and test programs to force server
+ * mode in userspace. See comments near ln_server_mode_flag in
+ * lnet/lib-types.h */
+
+void lnet_server_mode(void);
+#endif
+
#endif
int lnet_acceptor_port(void);
#endif
+#ifdef HAVE_LIBPTHREAD
+int lnet_count_acceptor_nis(lnet_ni_t **first_ni);
+int lnet_acceptor_port(void);
+#endif
+
int lnet_acceptor_start(void);
void lnet_acceptor_stop(void);
/* ensure non-RDMA messages can be received outside liblustre */
int (*lnd_setasync)(struct lnet_ni *ni, lnet_process_id_t id, int nasync);
+
+#ifdef HAVE_LIBPTHREAD
+ int (*lnd_accept)(struct lnet_ni *ni, int sock);
+#endif
#endif
} lnd_t;
struct list_head ln_active_eqs;
lnet_counters_t ln_counters;
+
+#ifndef __KERNEL__
+        /* Temporary workaround to allow uOSS and test programs to force
+ * server mode in userspace. The only place where we use it is
+ * lnet_prepare(). The only way to turn this flag on is to
+ * call lnet_server_mode() */
+
+ int ln_server_mode_flag;
+#endif
} lnet_t;
#endif
#define KSOCK_MSG_NOOP 0xc0 /* ksm_u empty */
#define KSOCK_MSG_LNET 0xc1 /* lnet msg */
+/* We need to know this number to parse a hello msg from ksocklnd in
+ * other LNDs (usocklnd, for example) */
+#define KSOCK_PROTO_V2 2
+
#endif
#define KSOCK_PROTO_V1_MAJOR LNET_PROTO_TCP_VERSION_MAJOR
#define KSOCK_PROTO_V1_MINOR LNET_PROTO_TCP_VERSION_MINOR
#define KSOCK_PROTO_V1 KSOCK_PROTO_V1_MAJOR
-#define KSOCK_PROTO_V2 2
static inline int
ksocknal_route_mask(void)
if LIBLUSTRE
noinst_LIBRARIES= libcfs.a
-libcfs_a_SOURCES= debug.c user-prim.c user-lock.c
+libcfs_a_SOURCES= debug.c user-prim.c user-lock.c user-tcpip.c
libcfs_a_CPPFLAGS = $(LLCPPFLAGS)
libcfs_a_CFLAGS = $(LLCFLAGS)
endif
EXTRA_DIST := Info.plist
MOSTLYCLEANFILES := @MOSTLYCLEANFILES@ linux-*.c linux/*.o darwin/*.o libcfs
-DIST_SOURCES := $(libcfs-all-objs:%.o=%.c) tracefile.h user-prim.c user-lock.c
+DIST_SOURCES := $(libcfs-all-objs:%.o=%.c) tracefile.h user-prim.c \
+ user-lock.c user-tcpip.c
#include <stdlib.h>
#include <libcfs/libcfs.h>
+#include <libcfs/kp30.h>
+
/*
* Optional debugging (magic stamping and checking ownership) can be added.
*/
}
#endif
+#ifdef HAVE_LIBPTHREAD
+
+/*
+ * Completion
+ */
+
+void cfs_init_completion(struct cfs_completion *c)
+{
+ LASSERT(c != NULL);
+ c->c_done = 0;
+ pthread_mutex_init(&c->c_mut, NULL);
+ pthread_cond_init(&c->c_cond, NULL);
+}
+
+void cfs_fini_completion(struct cfs_completion *c)
+{
+ LASSERT(c != NULL);
+ pthread_mutex_destroy(&c->c_mut);
+ pthread_cond_destroy(&c->c_cond);
+}
+
+void cfs_complete(struct cfs_completion *c)
+{
+ LASSERT(c != NULL);
+ pthread_mutex_lock(&c->c_mut);
+ c->c_done++;
+ pthread_cond_signal(&c->c_cond);
+ pthread_mutex_unlock(&c->c_mut);
+}
+
+void cfs_wait_for_completion(struct cfs_completion *c)
+{
+ LASSERT(c != NULL);
+ pthread_mutex_lock(&c->c_mut);
+ while (c->c_done == 0)
+ pthread_cond_wait(&c->c_cond, &c->c_mut);
+ c->c_done--;
+ pthread_mutex_unlock(&c->c_mut);
+}
+
+/*
+ * atomic primitives
+ */
+
+static pthread_mutex_t atomic_guard_lock = PTHREAD_MUTEX_INITIALIZER;
+
+int cfs_atomic_read(cfs_atomic_t *a)
+{
+ int r;
+
+ pthread_mutex_lock(&atomic_guard_lock);
+ r = a->counter;
+ pthread_mutex_unlock(&atomic_guard_lock);
+ return r;
+}
+
+void cfs_atomic_set(cfs_atomic_t *a, int b)
+{
+ pthread_mutex_lock(&atomic_guard_lock);
+ a->counter = b;
+ pthread_mutex_unlock(&atomic_guard_lock);
+}
+
+int cfs_atomic_dec_and_test(cfs_atomic_t *a)
+{
+ int r;
+
+ pthread_mutex_lock(&atomic_guard_lock);
+ r = --a->counter;
+ pthread_mutex_unlock(&atomic_guard_lock);
+ return (r == 0);
+}
+
+void cfs_atomic_inc(cfs_atomic_t *a)
+{
+ pthread_mutex_lock(&atomic_guard_lock);
+ ++a->counter;
+ pthread_mutex_unlock(&atomic_guard_lock);
+}
+
+void cfs_atomic_dec(cfs_atomic_t *a)
+{
+ pthread_mutex_lock(&atomic_guard_lock);
+ --a->counter;
+ pthread_mutex_unlock(&atomic_guard_lock);
+}
+void cfs_atomic_add(int b, cfs_atomic_t *a)
+{
+ pthread_mutex_lock(&atomic_guard_lock);
+ a->counter += b;
+ pthread_mutex_unlock(&atomic_guard_lock);
+}
+
+void cfs_atomic_sub(int b, cfs_atomic_t *a)
+{
+ pthread_mutex_lock(&atomic_guard_lock);
+ a->counter -= b;
+ pthread_mutex_unlock(&atomic_guard_lock);
+}
+
+#endif /* HAVE_LIBPTHREAD */
+
+
/* !__KERNEL__ */
#endif
return 0;
}
+#ifdef HAVE_LIBPTHREAD
+
+/*
+ * Threads
+ */
+
+struct lustre_thread_arg {
+ cfs_thread_t f;
+ void *arg;
+};
+
+static void *cfs_thread_helper(void *data)
+{
+ struct lustre_thread_arg *targ = data;
+ cfs_thread_t f = targ->f;
+ void *arg = targ->arg;
+
+ free(targ);
+
+ (void)f(arg);
+ return NULL;
+}
+
+int cfs_create_thread(cfs_thread_t func, void *arg)
+{
+ pthread_t tid;
+ pthread_attr_t tattr;
+ int rc;
+ struct lustre_thread_arg *targ_p = malloc(sizeof(struct lustre_thread_arg));
+
+ if ( targ_p == NULL )
+ return -ENOMEM;
+
+ targ_p->f = func;
+ targ_p->arg = arg;
+
+ pthread_attr_init(&tattr);
+ pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
+ rc = pthread_create(&tid, &tattr, cfs_thread_helper, targ_p);
+ pthread_attr_destroy(&tattr);
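+        /* NB: pthread_create() returns a positive errno value on
+         * failure, hence the negation below */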
+ return -rc;
+}
+#endif
+
+uid_t cfs_curproc_uid(void)
+{
+ return getuid();
+}
+
+int cfs_parse_int_tunable(int *value, char *name)
+{
+ char *env = getenv(name);
+ char *end;
+
+ if (env == NULL)
+ return 0;
+
+ *value = strtoull(env, &end, 0);
+ if (*end == 0)
+ return 0;
+
+ CERROR("Can't parse tunable %s=%s\n", name, env);
+ return -EINVAL;
+}
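+
+/* Usage sketch (hypothetical value): with LNET_ACCEPT_PORT=988 in the
+ * environment, cfs_parse_int_tunable(&port, "LNET_ACCEPT_PORT") stores
+ * 988 in 'port' and returns 0; if the variable is unset, *value is left
+ * untouched and 0 is returned */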
+
/*
* Allocator
*/
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2005 Cluster File Systems, Inc.
+ *
+ * This file is part of Lustre, http://www.lustre.org.
+ *
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
+ *
+ * Lustre is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
+ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef __KERNEL__
+
+#include <sys/socket.h>
+#include <netinet/tcp.h>
+#include <sys/ioctl.h>
+#include <unistd.h>
+#include <string.h>
+#include <unistd.h>
+#include <poll.h>
+#include <net/if.h>
+#include <arpa/inet.h>
+#include <errno.h>
+#if defined(__sun__) || defined(__sun)
+#include <sys/sockio.h>
+#endif
+#ifndef __CYGWIN__
+#include <syscall.h>
+#endif
+
+#include <libcfs/libcfs.h>
+#include <libcfs/kp30.h>
+
+/*
+ * Functions to get network interfaces info
+ */
+
+int
+libcfs_sock_ioctl(int cmd, unsigned long arg)
+{
+ int fd, rc;
+
+ fd = socket(AF_INET, SOCK_STREAM, 0);
+
+ if (fd < 0) {
+ rc = -errno;
+ CERROR("socket() failed: errno==%d\n", errno);
+ return rc;
+ }
+
+ rc = ioctl(fd, cmd, arg);
+
+ close(fd);
+ return rc;
+}
+
+int
+libcfs_ipif_query (char *name, int *up, __u32 *ip)
+{
+ struct ifreq ifr;
+ int nob;
+ int rc;
+ __u32 val;
+
+ nob = strlen(name);
+ if (nob >= IFNAMSIZ) {
+ CERROR("Interface name %s too long\n", name);
+ return -EINVAL;
+ }
+
+ CLASSERT (sizeof(ifr.ifr_name) >= IFNAMSIZ);
+
+ strcpy(ifr.ifr_name, name);
+ rc = libcfs_sock_ioctl(SIOCGIFFLAGS, (unsigned long)&ifr);
+
+ if (rc != 0) {
+ CERROR("Can't get flags for interface %s\n", name);
+ return rc;
+ }
+
+ if ((ifr.ifr_flags & IFF_UP) == 0) {
+ CDEBUG(D_NET, "Interface %s down\n", name);
+ *up = 0;
+ *ip = 0;
+ return 0;
+ }
+
+ *up = 1;
+
+ strcpy(ifr.ifr_name, name);
+ ifr.ifr_addr.sa_family = AF_INET;
+ rc = libcfs_sock_ioctl(SIOCGIFADDR, (unsigned long)&ifr);
+
+ if (rc != 0) {
+ CERROR("Can't get IP address for interface %s\n", name);
+ return rc;
+ }
+
+ val = ((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr.s_addr;
+ *ip = ntohl(val);
+
+ return 0;
+}
+
+void
+libcfs_ipif_free_enumeration (char **names, int n)
+{
+ int i;
+
+ LASSERT (n > 0);
+
+ for (i = 0; i < n && names[i] != NULL; i++)
+ LIBCFS_FREE(names[i], IFNAMSIZ);
+
+ LIBCFS_FREE(names, n * sizeof(*names));
+}
+
+int
+libcfs_ipif_enumerate (char ***namesp)
+{
+ /* Allocate and fill in 'names', returning # interfaces/error */
+ char **names;
+ int nalloc;
+ int nfound;
+ struct ifreq *ifr;
+ struct ifconf ifc;
+ int rc;
+ int nob;
+ int i;
+
+
+ nalloc = 16; /* first guess at max interfaces */
+ for (;;) {
+ LIBCFS_ALLOC(ifr, nalloc * sizeof(*ifr));
+ if (ifr == NULL) {
+ CERROR ("ENOMEM enumerating up to %d interfaces\n",
+ nalloc);
+ rc = -ENOMEM;
+ goto out0;
+ }
+
+ ifc.ifc_buf = (char *)ifr;
+ ifc.ifc_len = nalloc * sizeof(*ifr);
+
+ rc = libcfs_sock_ioctl(SIOCGIFCONF, (unsigned long)&ifc);
+
+ if (rc < 0) {
+ CERROR ("Error %d enumerating interfaces\n", rc);
+ goto out1;
+ }
+
+ LASSERT (rc == 0);
+
+ nfound = ifc.ifc_len/sizeof(*ifr);
+ LASSERT (nfound <= nalloc);
+
+ if (nfound < nalloc)
+ break;
+
+ LIBCFS_FREE(ifr, nalloc * sizeof(*ifr));
+ nalloc *= 2;
+ }
+
+ if (nfound == 0)
+ goto out1;
+
+ LIBCFS_ALLOC(names, nfound * sizeof(*names));
+ if (names == NULL) {
+ rc = -ENOMEM;
+ goto out1;
+ }
+ /* NULL out all names[i] */
+ memset (names, 0, nfound * sizeof(*names));
+
+ for (i = 0; i < nfound; i++) {
+
+ nob = strlen (ifr[i].ifr_name);
+ if (nob >= IFNAMSIZ) {
+ /* no space for terminating NULL */
+ CERROR("interface name %.*s too long (%d max)\n",
+ nob, ifr[i].ifr_name, IFNAMSIZ);
+ rc = -ENAMETOOLONG;
+ goto out2;
+ }
+
+ LIBCFS_ALLOC(names[i], IFNAMSIZ);
+ if (names[i] == NULL) {
+ rc = -ENOMEM;
+ goto out2;
+ }
+
+ memcpy(names[i], ifr[i].ifr_name, nob);
+ names[i][nob] = 0;
+ }
+
+ *namesp = names;
+ rc = nfound;
+
+ out2:
+ if (rc < 0)
+ libcfs_ipif_free_enumeration(names, nfound);
+ out1:
+ LIBCFS_FREE(ifr, nalloc * sizeof(*ifr));
+ out0:
+ return rc;
+}
+
+/*
+ * Network functions used by user-land lnet acceptor
+ */
+
+int
+libcfs_sock_listen (int *sockp, __u32 local_ip, int local_port, int backlog)
+{
+ int rc;
+ int option;
+ struct sockaddr_in locaddr;
+
+ *sockp = socket(AF_INET, SOCK_STREAM, 0);
+ if (*sockp < 0) {
+ rc = -errno;
+ CERROR("socket() failed: errno==%d\n", errno);
+ return rc;
+ }
+
+ option = 1;
+ if ( setsockopt(*sockp, SOL_SOCKET, SO_REUSEADDR,
+ (char *)&option, sizeof (option)) ) {
+ rc = -errno;
+ CERROR("setsockopt(SO_REUSEADDR) failed: errno==%d\n", errno);
+ goto failed;
+ }
+
+ if (local_ip != 0 || local_port != 0) {
+ memset(&locaddr, 0, sizeof(locaddr));
+ locaddr.sin_family = AF_INET;
+ locaddr.sin_port = htons(local_port);
+ locaddr.sin_addr.s_addr = (local_ip == 0) ?
+ INADDR_ANY : htonl(local_ip);
+
+ if ( bind(*sockp, (struct sockaddr *)&locaddr, sizeof(locaddr)) ) {
+ rc = -errno;
+                        if ( errno == EADDRINUSE )
+ CDEBUG(D_NET, "Port %d already in use\n",
+ local_port);
+ else
+ CERROR("bind() to port %d failed: errno==%d\n",
+ local_port, errno);
+ goto failed;
+ }
+ }
+
+ if ( listen(*sockp, backlog) ) {
+ rc = -errno;
+ CERROR("listen() with backlog==%d failed: errno==%d\n",
+ backlog, errno);
+ goto failed;
+ }
+
+ return 0;
+
+ failed:
+ close(*sockp);
+ return rc;
+}
+
+int
+libcfs_sock_accept (int *newsockp, int sock, __u32 *peer_ip, int *peer_port)
+{
+ struct sockaddr_in accaddr;
+ socklen_t accaddr_len = sizeof(struct sockaddr_in);
+
+ *newsockp = accept(sock, (struct sockaddr *)&accaddr, &accaddr_len);
+
+ if ( *newsockp < 0 ) {
+ CERROR("accept() failed: errno==%d\n", errno);
+ return -errno;
+ }
+
+ *peer_ip = ntohl(accaddr.sin_addr.s_addr);
+ *peer_port = ntohs(accaddr.sin_port);
+
+ return 0;
+}
+
+int
+libcfs_sock_read (int sock, void *buffer, int nob, int timeout)
+{
+ int rc;
+ struct pollfd pfd;
+ cfs_time_t start_time = cfs_time_current();
+
+ pfd.fd = sock;
+ pfd.events = POLLIN;
+ pfd.revents = 0;
+
+ /* poll(2) measures timeout in msec */
+ timeout *= 1000;
+
+ while (nob != 0 && timeout > 0) {
+ cfs_time_t current_time;
+
+ rc = poll(&pfd, 1, timeout);
+ if (rc < 0)
+ return -errno;
+ if (rc == 0)
+ return -ETIMEDOUT;
+ if ((pfd.revents & POLLIN) == 0)
+ return -EIO;
+
+ rc = read(sock, buffer, nob);
+ if (rc < 0)
+ return -errno;
+ if (rc == 0)
+ return -EIO;
+
+ buffer = ((char *)buffer) + rc;
+ nob -= rc;
+
+                /* convert elapsed time to msec to match poll()'s units */
+                current_time = cfs_time_current();
+                timeout -= 1000 * cfs_duration_sec(cfs_time_sub(current_time,
+                                                                start_time));
+                start_time = current_time;
+ }
+
+ if (nob == 0)
+ return 0;
+ else
+ return -ETIMEDOUT;
+}
+
+/* Just try to connect to localhost to wake up any entity that is
+ * sleeping in accept() */
+void
+libcfs_sock_abort_accept(__u16 port)
+{
+ int fd, rc;
+ struct sockaddr_in locaddr;
+
+ memset(&locaddr, 0, sizeof(locaddr));
+ locaddr.sin_family = AF_INET;
+ locaddr.sin_port = htons(port);
+ locaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
+
+ fd = socket(AF_INET, SOCK_STREAM, 0);
+ if ( fd < 0 ) {
+ CERROR("socket() failed: errno==%d\n", errno);
+ return;
+ }
+
+ rc = connect(fd, (struct sockaddr *)&locaddr, sizeof(locaddr));
+ if ( rc != 0 ) {
+ if ( errno != ECONNREFUSED )
+ CERROR("connect() failed: errno==%d\n", errno);
+ else
+ CDEBUG(D_NET, "Nobody to wake up at %d\n", port);
+ }
+
+ close(fd);
+}
+
+/*
+ * Network functions of common use
+ */
+
+int
+libcfs_getpeername(int sock_fd, __u32 *ipaddr_p, __u16 *port_p)
+{
+ int rc;
+ struct sockaddr_in peer_addr;
+ socklen_t peer_addr_len = sizeof(peer_addr);
+
+ rc = getpeername(sock_fd, (struct sockaddr *)&peer_addr, &peer_addr_len);
+ if (rc != 0)
+ return -errno;
+
+ if (ipaddr_p != NULL)
+ *ipaddr_p = ntohl(peer_addr.sin_addr.s_addr);
+ if (port_p != NULL)
+ *port_p = ntohs(peer_addr.sin_port);
+
+ return 0;
+}
+
+int
+libcfs_socketpair(int *fdp)
+{
+ int rc, i;
+
+ rc = socketpair(AF_UNIX, SOCK_STREAM, 0, fdp);
+ if (rc != 0) {
+ rc = -errno;
+ CERROR ("Cannot create socket pair\n");
+ return rc;
+ }
+
+ for (i = 0; i < 2; i++) {
+ rc = libcfs_fcntl_nonblock(fdp[i]);
+ if (rc) {
+ close(fdp[0]);
+ close(fdp[1]);
+ return rc;
+ }
+ }
+
+ return 0;
+}
+
+int
+libcfs_fcntl_nonblock(int fd)
+{
+ int rc, flags;
+
+ flags = fcntl(fd, F_GETFL, 0);
+ if (flags == -1) {
+ rc = -errno;
+ CERROR ("Cannot get socket flags\n");
+ return rc;
+ }
+
+ rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+ if (rc != 0) {
+ rc = -errno;
+ CERROR ("Cannot set socket flags\n");
+ return rc;
+ }
+
+ return 0;
+}
+
+int
+libcfs_sock_set_nagle(int fd, int nagle)
+{
+ int rc;
+ int option = nagle ? 0 : 1;
+
+#if defined(__sun__) || defined(__sun)
+ rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &option, sizeof(option));
+#else
+ rc = setsockopt(fd, SOL_TCP, TCP_NODELAY, &option, sizeof(option));
+#endif
+
+ if (rc != 0) {
+ rc = -errno;
+ CERROR ("Cannot set NODELAY socket option\n");
+ return rc;
+ }
+
+ return 0;
+}
+
+int
+libcfs_sock_set_bufsiz(int fd, int bufsiz)
+{
+ int rc, option;
+
+ LASSERT (bufsiz != 0);
+
+ option = bufsiz;
+ rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &option, sizeof(option));
+ if (rc != 0) {
+ rc = -errno;
+ CERROR ("Cannot set SNDBUF socket option\n");
+ return rc;
+ }
+
+ option = bufsiz;
+ rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &option, sizeof(option));
+ if (rc != 0) {
+ rc = -errno;
+ CERROR ("Cannot set RCVBUF socket option\n");
+ return rc;
+ }
+
+ return 0;
+}
+
+int
+libcfs_sock_create(int *fdp)
+{
+ int rc, fd, option;
+
+ fd = socket(AF_INET, SOCK_STREAM, 0);
+ if (fd < 0) {
+ rc = -errno;
+ CERROR ("Cannot create socket\n");
+ return rc;
+ }
+
+ option = 1;
+ rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
+ &option, sizeof(option));
+ if (rc != 0) {
+ rc = -errno;
+ CERROR ("Cannot set SO_REUSEADDR for socket\n");
+ close(fd);
+ return rc;
+ }
+
+ *fdp = fd;
+ return 0;
+}
+
+int
+libcfs_sock_bind_to_port(int fd, __u16 port)
+{
+ int rc;
+ struct sockaddr_in locaddr;
+
+ memset(&locaddr, 0, sizeof(locaddr));
+ locaddr.sin_family = AF_INET;
+ locaddr.sin_addr.s_addr = INADDR_ANY;
+ locaddr.sin_port = htons(port);
+
+ rc = bind(fd, (struct sockaddr *)&locaddr, sizeof(locaddr));
+ if (rc != 0) {
+ rc = -errno;
+ CERROR ("Cannot bind to port %d\n", port);
+ return rc;
+ }
+
+ return 0;
+}
+
+int
+libcfs_sock_connect(int fd, __u32 ip, __u16 port)
+{
+ int rc;
+ struct sockaddr_in addr;
+
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = AF_INET;
+ addr.sin_addr.s_addr = htonl(ip);
+ addr.sin_port = htons(port);
+
+ rc = connect(fd, (struct sockaddr *)&addr,
+ sizeof(struct sockaddr_in));
+
+        if (rc != 0 && errno != EINPROGRESS) {
+ rc = -errno;
+ if (rc != -EADDRINUSE && rc != -EADDRNOTAVAIL)
+ CERROR ("Cannot connect to %u.%u.%u.%u:%d (err=%d)\n",
+ HIPQUAD(ip), port, errno);
+ return rc;
+ }
+
+ return 0;
+}
+
+/* NB: EPIPE and ECONNRESET are considered as non-fatal
+ * because:
+ * 1) it still makes sense to continue reading &&
+ * 2) anyway, poll() will set up POLLHUP|POLLERR flags */
+int libcfs_sock_writev(int fd, const struct iovec *vector, int count)
+{
+ int rc;
+
+ rc = syscall(SYS_writev, fd, vector, count);
+
+ if (rc == 0) /* write nothing */
+ return 0;
+
+ if (rc < 0) {
+ if (errno == EAGAIN || /* write nothing */
+ errno == EPIPE || /* non-fatal error */
+ errno == ECONNRESET) /* non-fatal error */
+ return 0;
+ else
+ return -errno;
+ }
+
+ return rc;
+}
+
+int libcfs_sock_readv(int fd, const struct iovec *vector, int count)
+{
+ int rc;
+
+ rc = syscall(SYS_readv, fd, vector, count);
+
+ if (rc == 0) /* EOF */
+ return -EIO;
+
+ if (rc < 0) {
+ if (errno == EAGAIN) /* read nothing */
+ return 0;
+ else
+ return -errno;
+ }
+
+ return rc;
+}
+
+#endif /* !__KERNEL__ */
}
#else /* __KERNEL__ */
+#ifdef HAVE_LIBPTHREAD
+
+static char *accept_type;
+static int accept_port = 988;
+static int accept_backlog;
+static int accept_timeout;
+
+struct {
+ int pta_shutdown;
+ int pta_sock;
+ struct cfs_completion pta_completion;
+} lnet_acceptor_state;
+
+int
+lnet_acceptor_port(void)
+{
+ return accept_port;
+}
+
+int
+lnet_parse_int_tunable(int *value, char *name, int dflt)
+{
+ char *env = getenv(name);
+ char *end;
+
+ if (env == NULL) {
+ *value = dflt;
+ return 0;
+ }
+
+ *value = strtoull(env, &end, 0);
+ if (*end == 0)
+ return 0;
+
+ CERROR("Can't parse tunable %s=%s\n", name, env);
+ return -EINVAL;
+}
+
+int
+lnet_parse_string_tunable(char **value, char *name, char *dflt)
+{
+ char *env = getenv(name);
+
+ if (env == NULL)
+ *value = dflt;
+ else
+ *value = env;
+
+ return 0;
+}
+
+int
+lnet_get_tunables(void)
+{
+ int rc;
+ rc = lnet_parse_string_tunable(&accept_type, "LNET_ACCEPT", "secure");
+
+ if (rc != 0)
+ return rc;
+
+ rc = lnet_parse_int_tunable(&accept_port, "LNET_ACCEPT_PORT", 988);
+
+ if (rc != 0)
+ return rc;
+
+ rc = lnet_parse_int_tunable(&accept_backlog, "LNET_ACCEPT_BACKLOG", 127);
+
+ if (rc != 0)
+ return rc;
+
+ rc = lnet_parse_int_tunable(&accept_timeout, "LNET_ACCEPT_TIMEOUT", 5);
+
+ if (rc != 0)
+ return rc;
+
+ CDEBUG(D_NET, "accept_type = %s\n", accept_type);
+ CDEBUG(D_NET, "accept_port = %d\n", accept_port);
+ CDEBUG(D_NET, "accept_backlog = %d\n", accept_backlog);
+ CDEBUG(D_NET, "accept_timeout = %d\n", accept_timeout);
+ return 0;
+}
+
+static inline int
+lnet_accept_magic(__u32 magic, __u32 constant)
+{
+ return (magic == constant ||
+ magic == __swab32(constant));
+}
+
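+/* A magic that matches the byte-swapped constant means the peer has
+ * opposite endianness, so each subsequent connreq field must be
+ * __swab*s()'ed after reading; the 'flip' flag below tracks this */
+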
+/* user-land lnet_accept() isn't used by any LND directly, so we don't
+ * make it visible outside acceptor.c and can change its prototype
+ * freely */
+static int
+lnet_accept(int sock, __u32 magic, __u32 peer_ip, int peer_port)
+{
+ int rc, flip;
+ lnet_acceptor_connreq_t cr;
+ lnet_ni_t *ni;
+
+ if (!lnet_accept_magic(magic, LNET_PROTO_ACCEPTOR_MAGIC)) {
+ LCONSOLE_ERROR("Refusing connection from %u.%u.%u.%u magic %08x: "
+ "unsupported acceptor protocol\n",
+ HIPQUAD(peer_ip), magic);
+ return -EPROTO;
+ }
+
+ flip = (magic != LNET_PROTO_ACCEPTOR_MAGIC);
+
+ rc = libcfs_sock_read(sock, &cr.acr_version,
+ sizeof(cr.acr_version),
+ accept_timeout);
+ if (rc != 0) {
+ CERROR("Error %d reading connection request version from "
+ "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip));
+ return -EIO;
+ }
+
+ if (flip)
+ __swab32s(&cr.acr_version);
+
+ if (cr.acr_version != LNET_PROTO_ACCEPTOR_VERSION)
+ return -EPROTO;
+
+ rc = libcfs_sock_read(sock, &cr.acr_nid,
+ sizeof(cr) -
+ offsetof(lnet_acceptor_connreq_t, acr_nid),
+ accept_timeout);
+ if (rc != 0) {
+ CERROR("Error %d reading connection request from "
+ "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip));
+ return -EIO;
+ }
+
+ if (flip)
+ __swab64s(&cr.acr_nid);
+
+ ni = lnet_net2ni(LNET_NIDNET(cr.acr_nid));
+
+ if (ni == NULL || /* no matching net */
+ ni->ni_nid != cr.acr_nid) { /* right NET, wrong NID! */
+ if (ni != NULL)
+ lnet_ni_decref(ni);
+ LCONSOLE_ERROR("Refusing connection from %u.%u.%u.%u for %s: "
+ " No matching NI\n",
+ HIPQUAD(peer_ip), libcfs_nid2str(cr.acr_nid));
+ return -EPERM;
+ }
+
+ if (ni->ni_lnd->lnd_accept == NULL) {
+ lnet_ni_decref(ni);
+ LCONSOLE_ERROR("Refusing connection from %u.%u.%u.%u for %s: "
+ " NI doesn not accept IP connections\n",
+ HIPQUAD(peer_ip), libcfs_nid2str(cr.acr_nid));
+ return -EPERM;
+ }
+
+ CDEBUG(D_NET, "Accept %s from %u.%u.%u.%u\n",
+ libcfs_nid2str(cr.acr_nid), HIPQUAD(peer_ip));
+
+ rc = ni->ni_lnd->lnd_accept(ni, sock);
+
+ lnet_ni_decref(ni);
+ return rc;
+}
+
+int
+lnet_acceptor(void *arg)
+{
+ char name[16];
+ int secure = (int)((unsigned long)arg);
+ int rc;
+ int newsock;
+ __u32 peer_ip;
+ int peer_port;
+ __u32 magic;
+
+ snprintf(name, sizeof(name), "acceptor_%03d", accept_port);
+ cfs_daemonize(name);
+ cfs_block_allsigs();
+
+ rc = libcfs_sock_listen(&lnet_acceptor_state.pta_sock,
+ 0, accept_port, accept_backlog);
+ if (rc != 0) {
+ if (rc == -EADDRINUSE)
+ LCONSOLE_ERROR("Can't start acceptor on port %d: "
+ "port already in use\n",
+ accept_port);
+ else
+ LCONSOLE_ERROR("Can't start acceptor on port %d: "
+ "unexpected error %d\n",
+ accept_port, rc);
+
+ } else {
+ LCONSOLE(0, "Accept %s, port %d\n", accept_type, accept_port);
+ }
+
+ /* set init status and unblock parent */
+ lnet_acceptor_state.pta_shutdown = rc;
+ cfs_complete(&lnet_acceptor_state.pta_completion);
+
+ if (rc != 0)
+ return rc;
+
+ while (lnet_acceptor_state.pta_shutdown == 0) {
+
+ rc = libcfs_sock_accept(&newsock, lnet_acceptor_state.pta_sock,
+ &peer_ip, &peer_port);
+ if (rc != 0)
+ continue;
+
+                /* maybe we were woken up by libcfs_sock_abort_accept() */
+ if ( lnet_acceptor_state.pta_shutdown ) {
+ close(newsock);
+ break;
+ }
+
+ if (secure && peer_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) {
+ CERROR("Refusing connection from %u.%u.%u.%u: "
+ "insecure port %d\n",
+ HIPQUAD(peer_ip), peer_port);
+ goto failed;
+ }
+
+ rc = libcfs_sock_read(newsock, &magic, sizeof(magic),
+ accept_timeout);
+ if (rc != 0) {
+ CERROR("Error %d reading connection request from "
+ "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip));
+ goto failed;
+ }
+
+ rc = lnet_accept(newsock, magic, peer_ip, peer_port);
+ if (rc != 0)
+ goto failed;
+
+ continue;
+
+ failed:
+ close(newsock);
+ }
+
+ close(lnet_acceptor_state.pta_sock);
+ LCONSOLE(0,"Acceptor stopping\n");
+
+ /* unblock lnet_acceptor_stop() */
+ cfs_complete(&lnet_acceptor_state.pta_completion);
+
+ return 0;
+}
+
+static int skip_waiting_for_completion;
int
lnet_acceptor_start(void)
{
- return 0;
+ long secure;
+ int rc;
+
+ /* Do nothing if we're liblustre clients */
+ if (the_lnet.ln_pid & LNET_PID_USERFLAG)
+ return 0;
+
+ rc = lnet_get_tunables();
+        if ( rc != 0 )
+ return rc;
+
+ cfs_init_completion(&lnet_acceptor_state.pta_completion);
+
+ if (!strcmp(accept_type, "secure")) {
+ secure = 1;
+ } else if (!strcmp(accept_type, "all")) {
+ secure = 0;
+ } else if (!strcmp(accept_type, "none")) {
+ return 0;
+ } else {
+ LCONSOLE_ERROR ("Can't parse 'accept_type=\"%s\"'\n", accept_type);
+ cfs_fini_completion(&lnet_acceptor_state.pta_completion);
+ return -EINVAL;
+ }
+
+ if (lnet_count_acceptor_nis(NULL) == 0) { /* not required */
+ skip_waiting_for_completion = 1;
+ return 0;
+ }
+
+ rc = cfs_create_thread(lnet_acceptor, (void *)secure);
+ if (rc) {
+ CERROR("Can't start acceptor thread: %d\n", rc);
+ cfs_fini_completion(&lnet_acceptor_state.pta_completion);
+ return rc;
+ }
+
+ /* wait for acceptor to startup */
+ cfs_wait_for_completion(&lnet_acceptor_state.pta_completion);
+
+ if (lnet_acceptor_state.pta_shutdown == 0)
+ return 0;
+
+ cfs_fini_completion(&lnet_acceptor_state.pta_completion);
+ return -ENETDOWN;
}
void
lnet_acceptor_stop(void)
{
+ /* Do nothing if we're liblustre clients */
+ if (the_lnet.ln_pid & LNET_PID_USERFLAG)
+ return;
+
+ if ( !skip_waiting_for_completion ) {
+ lnet_acceptor_state.pta_shutdown = 1;
+ libcfs_sock_abort_accept(accept_port);
+
+ /* block until acceptor signals exit */
+ cfs_wait_for_completion(&lnet_acceptor_state.pta_completion);
+ }
+
+ cfs_fini_completion(&lnet_acceptor_state.pta_completion);
+}
+#else
+int
+lnet_acceptor_start(void)
+{
+ return 0;
}
+void
+lnet_acceptor_stop(void)
+{
+}
+#endif /* !HAVE_LIBPTHREAD */
#endif /* !__KERNEL__ */
LASSERT (list_empty(&the_lnet.ln_finalizeq));
}
+#ifndef __KERNEL__
+/* Temporary workaround to allow uOSS and test programs to force server
+ * mode in userspace. See comments near ln_server_mode_flag in
+ * lnet/lib-types.h */
+
+void
+lnet_server_mode(void)
+{
+ the_lnet.ln_server_mode_flag = 1;
+}
+#endif
+
int
lnet_prepare(lnet_pid_t requested_pid)
{
LASSERT ((requested_pid & LNET_PID_USERFLAG) == 0);
the_lnet.ln_pid = requested_pid;
#else
- /* My PID must be unique on this node and flag I'm userspace */
- the_lnet.ln_pid = getpid() | LNET_PID_USERFLAG;
+        if (the_lnet.ln_server_mode_flag) { /* server case (uOSS) */
+                LASSERT ((requested_pid & LNET_PID_USERFLAG) == 0);
+
+                if (cfs_curproc_uid()) /* Only root can run user-space server */
+ return -EPERM;
+ the_lnet.ln_pid = requested_pid;
+
+        } else { /* client case (liblustre) */
+
+ /* My PID must be unique on this node and flag I'm userspace */
+ the_lnet.ln_pid = getpid() | LNET_PID_USERFLAG;
+ }
#endif
rc = lnet_descriptor_setup();
* *first_ni so the acceptor can pass it connections "blind" to retain
* binary compatibility. */
int count = 0;
-#ifdef __KERNEL__
struct list_head *tmp;
lnet_ni_t *ni;
}
LNET_UNLOCK();
-#endif
return count;
}
#ifdef __KERNEL__
rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
#else
- rc = LNetNIInit(getpid());
+ rc = LNetNIInit(getpid() | LNET_PID_USERFLAG);
#endif
if (rc < 0) {
CERROR ("LNetNIInit() has failed: %d\n", rc);
endif
endif
-noinst_HEADERS = pqtimer.h dispatch.h table.h timer.h \
- connection.h bridge.h procbridge.h
-libsocklnd_a_SOURCES = pqtimer.c select.c table.c pqtimer.h \
- dispatch.h table.h timer.h procapi.c proclib.c \
- connection.c tcplnd.c connection.h
+noinst_HEADERS = usocklnd.h
+libsocklnd_a_SOURCES = usocklnd.h usocklnd.c usocklnd_cb.c poll.c \
+ handlers.c conn.c
libsocklnd_a_CPPFLAGS = $(LLCPPFLAGS)
libsocklnd_a_CFLAGS = $(LLCFLAGS)
+++ /dev/null
-This library implements two NAL interfaces, both running over IP.
-The first, tcpnal, creates TCP connections between participating
-processes in order to transport the portals requests. The second,
-ernal, provides a simple transport protocol which runs over
-UDP datagrams.
-
-The interface functions return both of these values in host order for
-convenience and readability. However this means that addresses
-exchanged in messages between hosts of different orderings will not
-function properly.
-
-Both NALs use the same support functions in order to schedule events
-and communicate with the generic portals implementation.
-
- -------------------------
- | api |
- |_______________________|
- | lib |
- |_______________________|
- | ernal | |tcpnal |
- |--------| |----------|
- | udpsock| |connection|
- |-----------------------|
- | timer/select |
- -------------------------
-
-
- These NALs uses the framework from fdnal of a pipe between the api
-and library sides. This is wrapped up in the select on the library
-side, and blocks on the api side. Performance could be severely
-enhanced by collapsing this aritificial barrier, by using shared
-memory queues, or by wiring the api layer directly to the library.
-
-
-nid is defined as the low order 24-bits of the IP address of the
-physical node left shifted by 8 plus a virtual node number of 0
-through 255 (really only 239). The virtual node number of a tcpnal
-application should be specified using the environment variable
-PTL_VIRTNODE. pid is now a completely arbitrary number in the
-range of 0 to 255. The IP interface used can be overridden by
-specifying the appropriate hostid by setting the PTL_HOSTID
-environment variable. The value can be either dotted decimal
-(n.n.n.n) or hex starting with "0x".
-TCPNAL:
- As the NAL needs to try to send to a particular nid/pid pair, it
- will open up connections on demand. Because the port associated with
- the connecting socket is different from the bound port, two
- connections will normally be established between a pair of peers, with
- data flowing from the anonymous connect (active) port to the advertised
- or well-known bound (passive) port of each peer.
-
- Should the connection fail to open, an error is reported to the
- library component, which causes the api request to fail.
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * Copyright (c) 2002 Cray Inc.
- *
- * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
- */
-
-#ifndef TCPNAL_PROCBRIDGE_H
-#define TCPNAL_PROCBRIDGE_H
-
-#include <lnet/lib-lnet.h>
-
-typedef struct bridge {
- int alive;
- lnet_ni_t *b_ni;
- void *lower;
- void *local;
- /* this doesn't really belong here */
- unsigned char iptop8;
-} *bridge;
-
-#endif
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
+ * Author: Maxim Patlasov <maxim@clusterfs.com>
+ *
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ */
+
+#include "usocklnd.h"
+
+/* Return 1 if the conn has timed out, 0 otherwise */
+int
+usocklnd_conn_timed_out(usock_conn_t *conn, cfs_time_t current_time)
+{
+ if (conn->uc_tx_flag && /* sending is in progress */
+ cfs_time_aftereq(current_time, conn->uc_tx_deadline))
+ return 1;
+
+ if (conn->uc_rx_flag && /* receiving is in progress */
+ cfs_time_aftereq(current_time, conn->uc_rx_deadline))
+ return 1;
+
+ return 0;
+}
+
+void
+usocklnd_conn_kill(usock_conn_t *conn)
+{
+ pthread_mutex_lock(&conn->uc_lock);
+ if (conn->uc_state != UC_DEAD)
+ usocklnd_conn_kill_locked(conn);
+ pthread_mutex_unlock(&conn->uc_lock);
+}
+
+/* Mark the conn as DEAD and schedule its deletion */
+void
+usocklnd_conn_kill_locked(usock_conn_t *conn)
+{
+ conn->uc_rx_flag = conn->uc_tx_flag = 0;
+ conn->uc_state = UC_DEAD;
+ usocklnd_add_killrequest(conn);
+}
+
+usock_conn_t *
+usocklnd_conn_allocate()
+{
+ usock_conn_t *conn;
+ usock_pollrequest_t *pr;
+
+ LIBCFS_ALLOC (pr, sizeof(*pr));
+ if (pr == NULL)
+ return NULL;
+
+ LIBCFS_ALLOC (conn, sizeof(*conn));
+ if (conn == NULL) {
+ LIBCFS_FREE (pr, sizeof(*pr));
+ return NULL;
+ }
+ memset(conn, 0, sizeof(*conn));
+ conn->uc_preq = pr;
+
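+        /* the hello buffer must be large enough for the biggest possible
+         * hello, i.e. one carrying LNET_MAX_INTERFACES interface IPs */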
+ LIBCFS_ALLOC (conn->uc_rx_hello,
+ offsetof(ksock_hello_msg_t,
+ kshm_ips[LNET_MAX_INTERFACES]));
+ if (conn->uc_rx_hello == NULL) {
+ LIBCFS_FREE (pr, sizeof(*pr));
+ LIBCFS_FREE (conn, sizeof(*conn));
+ return NULL;
+ }
+
+ return conn;
+}
+
+void
+usocklnd_conn_free(usock_conn_t *conn)
+{
+ usock_pollrequest_t *pr = conn->uc_preq;
+
+ if (pr != NULL)
+ LIBCFS_FREE (pr, sizeof(*pr));
+
+ if (conn->uc_rx_hello != NULL)
+ LIBCFS_FREE (conn->uc_rx_hello,
+ offsetof(ksock_hello_msg_t,
+ kshm_ips[LNET_MAX_INTERFACES]));
+
+ LIBCFS_FREE (conn, sizeof(*conn));
+}
+
+void
+usocklnd_tear_peer_conn(usock_conn_t *conn)
+{
+ usock_peer_t *peer = conn->uc_peer;
+ int idx = usocklnd_type2idx(conn->uc_type);
+ lnet_ni_t *ni;
+ lnet_process_id_t id;
+ int decref_flag = 0;
+ int killall_flag = 0;
+
+ if (peer == NULL) /* nothing to tear */
+ return;
+
+ pthread_mutex_lock(&peer->up_lock);
+ pthread_mutex_lock(&conn->uc_lock);
+
+ ni = peer->up_ni;
+ id = peer->up_peerid;
+
+ if (peer->up_conns[idx] == conn) {
+ if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
+ /* change state not to finalize twice */
+ conn->uc_rx_state = UC_RX_KSM_HEADER;
+ lnet_finalize(peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
+ }
+
+ usocklnd_destroy_txlist(peer->up_ni,
+ &conn->uc_tx_list);
+
+ peer->up_conns[idx] = NULL;
+ conn->uc_peer = NULL;
+ decref_flag = 1;
+
+                if (conn->uc_errored && !peer->up_errored)
+ peer->up_errored = killall_flag = 1;
+ }
+
+ pthread_mutex_unlock(&conn->uc_lock);
+
+ if (killall_flag)
+ usocklnd_del_conns_locked(peer);
+
+ pthread_mutex_unlock(&peer->up_lock);
+
+ if (!decref_flag)
+ return;
+
+ usocklnd_conn_decref(conn);
+ usocklnd_peer_decref(peer);
+
+ usocklnd_check_peer_stale(ni, id);
+}
+
+/* Remove peer from the hash list if all up_conns[i] are NULL and the
+ * hash table is the only remaining consumer of the peer */
+void
+usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id)
+{
+ usock_peer_t *peer;
+
+ pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
+ peer = usocklnd_find_peer_locked(ni, id);
+
+ if (peer == NULL) {
+ pthread_rwlock_unlock(&usock_data.ud_peers_lock);
+ return;
+ }
+
+ if (cfs_atomic_read(&peer->up_refcount) == 2) {
+ int i;
+ for (i = 0; i < N_CONN_TYPES; i++)
+ LASSERT (peer->up_conns[i] == NULL);
+
+ list_del(&peer->up_list);
+
+ if (peer->up_errored &&
+ (peer->up_peerid.pid & LNET_PID_USERFLAG) == 0)
+ lnet_notify (peer->up_ni, peer->up_peerid.nid, 0,
+ cfs_time_seconds(peer->up_last_alive));
+
+ usocklnd_peer_decref(peer);
+ }
+
+ usocklnd_peer_decref(peer);
+ pthread_rwlock_unlock(&usock_data.ud_peers_lock);
+}
+
+/* Returns 0 on success, <0 else */
+int
+usocklnd_create_passive_conn(lnet_ni_t *ni, int fd, usock_conn_t **connp)
+{
+ int rc;
+ __u32 peer_ip;
+ __u16 peer_port;
+ usock_conn_t *conn;
+
+ rc = libcfs_getpeername(fd, &peer_ip, &peer_port);
+ if (rc)
+ return rc;
+
+ rc = usocklnd_set_sock_options(fd);
+ if (rc)
+ return rc;
+
+ conn = usocklnd_conn_allocate();
+ if (conn == NULL)
+ return -ENOMEM;
+
+ usocklnd_rx_hellomagic_state_transition(conn);
+
+ conn->uc_fd = fd;
+ conn->uc_peer_ip = peer_ip;
+ conn->uc_peer_port = peer_port;
+ conn->uc_state = UC_RECEIVING_HELLO;
+ conn->uc_pt_idx = usocklnd_ip2pt_idx(peer_ip);
+ conn->uc_ni = ni;
+ CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
+ CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
+ pthread_mutex_init(&conn->uc_lock, NULL);
+ cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
+
+ *connp = conn;
+ return 0;
+}
+
+/* Returns 0 on success, <0 else */
+int
+usocklnd_create_active_conn(usock_peer_t *peer, int type,
+ usock_conn_t **connp)
+{
+ int rc;
+ int fd;
+ usock_conn_t *conn;
+ __u32 dst_ip = LNET_NIDADDR(peer->up_peerid.nid);
+ __u16 dst_port = lnet_acceptor_port();
+
+ conn = usocklnd_conn_allocate();
+ if (conn == NULL)
+ return -ENOMEM;
+
+ conn->uc_tx_hello = usocklnd_create_cr_hello_tx(peer->up_ni, type,
+ peer->up_peerid.nid);
+ if (conn->uc_tx_hello == NULL) {
+ usocklnd_conn_free(conn);
+ return -ENOMEM;
+ }
+
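+        /* srv mode binds a reserved (<1024) local port so a "secure" peer
+         * acceptor will trust us; cli mode (our pid carries
+         * LNET_PID_USERFLAG) connects from an ordinary ephemeral port */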
+ if (the_lnet.ln_pid & LNET_PID_USERFLAG)
+ rc = usocklnd_connect_cli_mode(&fd, dst_ip, dst_port);
+ else
+ rc = usocklnd_connect_srv_mode(&fd, dst_ip, dst_port);
+
+ if (rc) {
+ usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
+ usocklnd_conn_free(conn);
+ return rc;
+ }
+
+ conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
+ conn->uc_tx_flag = 1;
+
+ conn->uc_fd = fd;
+ conn->uc_peer_ip = dst_ip;
+ conn->uc_peer_port = dst_port;
+ conn->uc_type = type;
+ conn->uc_activeflag = 1;
+ conn->uc_state = UC_CONNECTING;
+ conn->uc_pt_idx = usocklnd_ip2pt_idx(dst_ip);
+ conn->uc_ni = NULL;
+ conn->uc_peerid = peer->up_peerid;
+ conn->uc_peer = peer;
+ usocklnd_peer_addref(peer);
+ CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
+ CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
+ pthread_mutex_init(&conn->uc_lock, NULL);
+ cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */
+
+ *connp = conn;
+ return 0;
+}
+
+/* Returns 0 on success, <0 else */
+int
+usocklnd_connect_srv_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
+{
+ __u16 port;
+ int fd;
+ int rc;
+
+ for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT;
+ port >= LNET_ACCEPTOR_MIN_RESERVED_PORT;
+ port--) {
+ /* Iterate through reserved ports. */
+
+ rc = libcfs_sock_create(&fd);
+ if (rc)
+ return rc;
+
+ rc = libcfs_sock_bind_to_port(fd, port);
+ if (rc) {
+ close(fd);
+ continue;
+ }
+
+ rc = usocklnd_set_sock_options(fd);
+ if (rc) {
+ close(fd);
+ return rc;
+ }
+
+ rc = libcfs_sock_connect(fd, dst_ip, dst_port);
+ if (rc == 0) {
+ *fdp = fd;
+ return 0;
+ }
+
+ if (rc != -EADDRINUSE && rc != -EADDRNOTAVAIL) {
+ close(fd);
+ return rc;
+ }
+
+ close(fd);
+ }
+
+ CERROR("Can't bind to any reserved port\n");
+ return rc;
+}
+
+/* Returns 0 on success, <0 else */
+int
+usocklnd_connect_cli_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
+{
+ int fd;
+ int rc;
+
+ rc = libcfs_sock_create(&fd);
+ if (rc)
+ return rc;
+
+ rc = usocklnd_set_sock_options(fd);
+ if (rc) {
+ close(fd);
+ return rc;
+ }
+
+ rc = libcfs_sock_connect(fd, dst_ip, dst_port);
+ if (rc) {
+ close(fd);
+ return rc;
+ }
+
+ *fdp = fd;
+ return 0;
+}
+
+int
+usocklnd_set_sock_options(int fd)
+{
+ int rc;
+
+ rc = libcfs_sock_set_nagle(fd, usock_tuns.ut_socknagle);
+ if (rc)
+ return rc;
+
+ if (usock_tuns.ut_sockbufsiz) {
+ rc = libcfs_sock_set_bufsiz(fd, usock_tuns.ut_sockbufsiz);
+ if (rc)
+ return rc;
+ }
+
+ return libcfs_fcntl_nonblock(fd);
+}
+
+void
+usocklnd_init_msg(ksock_msg_t *msg, int type)
+{
+ msg->ksm_type = type;
+ msg->ksm_csum = 0;
+ msg->ksm_zc_req_cookie = 0;
+ msg->ksm_zc_ack_cookie = 0;
+}
+
+usock_tx_t *
+usocklnd_create_noop_tx(__u64 cookie)
+{
+ usock_tx_t *tx;
+
+ LIBCFS_ALLOC (tx, sizeof(usock_tx_t));
+ if (tx == NULL)
+ return NULL;
+
+ tx->tx_size = sizeof(usock_tx_t);
+ tx->tx_lnetmsg = NULL;
+
+ usocklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
+ tx->tx_msg.ksm_zc_ack_cookie = cookie;
+
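+        /* a NOOP carries no LNET payload: only the leading ksock_msg_t
+         * fields (type, checksum, zc ack cookie) go on the wire */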
+ tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
+ tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
+ offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr);
+ tx->tx_iov = tx->tx_iova;
+ tx->tx_niov = 1;
+
+ return tx;
+}
+
+usock_tx_t *
+usocklnd_create_tx(lnet_msg_t *lntmsg)
+{
+ usock_tx_t *tx;
+ unsigned int payload_niov = lntmsg->msg_niov;
+ struct iovec *payload_iov = lntmsg->msg_iov;
+ unsigned int payload_offset = lntmsg->msg_offset;
+ unsigned int payload_nob = lntmsg->msg_len;
+ int size = offsetof(usock_tx_t,
+ tx_iova[1 + payload_niov]);
+
+ LIBCFS_ALLOC (tx, size);
+ if (tx == NULL)
+ return NULL;
+
+ tx->tx_size = size;
+ tx->tx_lnetmsg = lntmsg;
+
+ tx->tx_resid = tx->tx_nob =
+ offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload) +
+ payload_nob;
+
+ usocklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);
+ tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = lntmsg->msg_hdr;
+ tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
+ tx->tx_iova[0].iov_len = offsetof(ksock_msg_t,
+ ksm_u.lnetmsg.ksnm_payload);
+ tx->tx_iov = tx->tx_iova;
+
+ tx->tx_niov = 1 +
+ lnet_extract_iov(payload_niov, &tx->tx_iov[1],
+ payload_niov, payload_iov,
+ payload_offset, payload_nob);
+
+ return tx;
+}
+
+void
+usocklnd_init_hello_msg(ksock_hello_msg_t *hello,
+ lnet_ni_t *ni, int type, lnet_nid_t peer_nid)
+{
+ usock_net_t *net = (usock_net_t *)ni->ni_data;
+
+ hello->kshm_magic = LNET_PROTO_MAGIC;
+ hello->kshm_version = KSOCK_PROTO_V2;
+ hello->kshm_nips = 0;
+ hello->kshm_ctype = type;
+
+ hello->kshm_dst_incarnation = 0; /* not used */
+ hello->kshm_src_incarnation = net->un_incarnation;
+
+ hello->kshm_src_pid = the_lnet.ln_pid;
+ hello->kshm_src_nid = ni->ni_nid;
+ hello->kshm_dst_nid = peer_nid;
+ hello->kshm_dst_pid = 0; /* not used */
+}
+
+usock_tx_t *
+usocklnd_create_hello_tx(lnet_ni_t *ni,
+ int type, lnet_nid_t peer_nid)
+{
+ usock_tx_t *tx;
+ int size;
+ ksock_hello_msg_t *hello;
+
+ size = sizeof(usock_tx_t) + offsetof(ksock_hello_msg_t, kshm_ips);
+ LIBCFS_ALLOC (tx, size);
+ if (tx == NULL)
+ return NULL;
+
+ tx->tx_size = size;
+ tx->tx_lnetmsg = NULL;
+
+ hello = (ksock_hello_msg_t *)&tx->tx_iova[1];
+ usocklnd_init_hello_msg(hello, ni, type, peer_nid);
+
+ tx->tx_iova[0].iov_base = (void *)hello;
+ tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
+ offsetof(ksock_hello_msg_t, kshm_ips);
+ tx->tx_iov = tx->tx_iova;
+ tx->tx_niov = 1;
+
+ return tx;
+}
+
+usock_tx_t *
+usocklnd_create_cr_hello_tx(lnet_ni_t *ni,
+ int type, lnet_nid_t peer_nid)
+{
+ usock_tx_t *tx;
+ int size;
+ lnet_acceptor_connreq_t *cr;
+ ksock_hello_msg_t *hello;
+
+ size = sizeof(usock_tx_t) +
+ sizeof(lnet_acceptor_connreq_t) +
+ offsetof(ksock_hello_msg_t, kshm_ips);
+ LIBCFS_ALLOC (tx, size);
+ if (tx == NULL)
+ return NULL;
+
+ tx->tx_size = size;
+ tx->tx_lnetmsg = NULL;
+
+ cr = (lnet_acceptor_connreq_t *)&tx->tx_iova[1];
+ memset(cr, 0, sizeof(*cr));
+ cr->acr_magic = LNET_PROTO_ACCEPTOR_MAGIC;
+ cr->acr_version = LNET_PROTO_ACCEPTOR_VERSION;
+ cr->acr_nid = peer_nid;
+
+ hello = (ksock_hello_msg_t *)((char *)cr + sizeof(*cr));
+ usocklnd_init_hello_msg(hello, ni, type, peer_nid);
+
+ tx->tx_iova[0].iov_base = (void *)cr;
+ tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
+ sizeof(lnet_acceptor_connreq_t) +
+ offsetof(ksock_hello_msg_t, kshm_ips);
+ tx->tx_iov = tx->tx_iova;
+ tx->tx_niov = 1;
+
+ return tx;
+}
+
+void
+usocklnd_destroy_tx(lnet_ni_t *ni, usock_tx_t *tx)
+{
+ lnet_msg_t *lnetmsg = tx->tx_lnetmsg;
+ int rc = (tx->tx_resid == 0) ? 0 : -EIO;
+
+ LASSERT (ni != NULL || lnetmsg == NULL);
+
+ LIBCFS_FREE (tx, tx->tx_size);
+
+ if (lnetmsg != NULL) /* NOOP and hello go without lnetmsg */
+ lnet_finalize(ni, lnetmsg, rc);
+}
+
+void
+usocklnd_destroy_txlist(lnet_ni_t *ni, struct list_head *txlist)
+{
+ usock_tx_t *tx;
+
+ while (!list_empty(txlist)) {
+ tx = list_entry(txlist->next, usock_tx_t, tx_list);
+ list_del(&tx->tx_list);
+
+ usocklnd_destroy_tx(ni, tx);
+ }
+}
+
+void
+usocklnd_destroy_zcack_list(struct list_head *zcack_list)
+{
+ usock_zc_ack_t *zcack;
+
+ while (!list_empty(zcack_list)) {
+ zcack = list_entry(zcack_list->next, usock_zc_ack_t, zc_list);
+ list_del(&zcack->zc_list);
+
+ LIBCFS_FREE (zcack, sizeof(*zcack));
+ }
+}
+
+void
+usocklnd_destroy_peer(usock_peer_t *peer)
+{
+ usock_net_t *net = peer->up_ni->ni_data;
+ int i;
+
+ for (i = 0; i < N_CONN_TYPES; i++)
+ LASSERT (peer->up_conns[i] == NULL);
+
+ LIBCFS_FREE (peer, sizeof (*peer));
+
+ pthread_mutex_lock(&net->un_lock);
+        if (--net->un_peercount == 0)
+ pthread_cond_signal(&net->un_cond);
+ pthread_mutex_unlock(&net->un_lock);
+}
+
+void
+usocklnd_destroy_conn(usock_conn_t *conn)
+{
+ LASSERT (conn->uc_peer == NULL || conn->uc_ni == NULL);
+
+ if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
+ LASSERT (conn->uc_peer != NULL);
+ lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
+ }
+
+ if (!list_empty(&conn->uc_tx_list)) {
+ LASSERT (conn->uc_peer != NULL);
+ usocklnd_destroy_txlist(conn->uc_peer->up_ni, &conn->uc_tx_list);
+ }
+
+ usocklnd_destroy_zcack_list(&conn->uc_zcack_list);
+
+ if (conn->uc_peer != NULL)
+ usocklnd_peer_decref(conn->uc_peer);
+
+ if (conn->uc_ni != NULL)
+ lnet_ni_decref(conn->uc_ni);
+
+ if (conn->uc_tx_hello)
+ usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
+
+ usocklnd_conn_free(conn);
+}
+
+int
+usocklnd_get_conn_type(lnet_msg_t *lntmsg)
+{
+ int nob;
+
+ if (the_lnet.ln_pid & LNET_PID_USERFLAG)
+ return SOCKLND_CONN_ANY;
+
+ nob = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload) +
+ lntmsg->msg_len;
+
+ if (nob >= usock_tuns.ut_min_bulk)
+ return SOCKLND_CONN_BULK_OUT;
+ else
+ return SOCKLND_CONN_CONTROL;
+}
+
+int usocklnd_type2idx(int type)
+{
+ switch (type) {
+ case SOCKLND_CONN_ANY:
+ case SOCKLND_CONN_CONTROL:
+ return 0;
+ case SOCKLND_CONN_BULK_IN:
+ return 1;
+ case SOCKLND_CONN_BULK_OUT:
+ return 2;
+ default:
+ LBUG();
+ }
+}
+
+usock_peer_t *
+usocklnd_find_peer_locked(lnet_ni_t *ni, lnet_process_id_t id)
+{
+ struct list_head *peer_list = usocklnd_nid2peerlist(id.nid);
+ struct list_head *tmp;
+ usock_peer_t *peer;
+
+ list_for_each (tmp, peer_list) {
+
+ peer = list_entry (tmp, usock_peer_t, up_list);
+
+ if (peer->up_ni != ni)
+ continue;
+
+ if (peer->up_peerid.nid != id.nid ||
+ peer->up_peerid.pid != id.pid)
+ continue;
+
+ usocklnd_peer_addref(peer);
+ return peer;
+ }
+ return (NULL);
+}
+
+int
+usocklnd_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
+ usock_peer_t **peerp)
+{
+ usock_net_t *net = ni->ni_data;
+ usock_peer_t *peer;
+ int i;
+
+ LIBCFS_ALLOC (peer, sizeof (*peer));
+ if (peer == NULL)
+ return -ENOMEM;
+
+ for (i = 0; i < N_CONN_TYPES; i++)
+ peer->up_conns[i] = NULL;
+
+ peer->up_peerid = id;
+ peer->up_ni = ni;
+ peer->up_incrn_is_set = 0;
+ peer->up_errored = 0;
+ peer->up_last_alive = 0;
+ cfs_atomic_set (&peer->up_refcount, 1); /* 1 ref for caller */
+ pthread_mutex_init(&peer->up_lock, NULL);
+
+ pthread_mutex_lock(&net->un_lock);
+ net->un_peercount++;
+ pthread_mutex_unlock(&net->un_lock);
+
+ *peerp = peer;
+ return 0;
+}
+
+/* Safely create new peer if needed. Save result in *peerp.
+ * Returns 0 on success, <0 else */
+int
+usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
+ usock_peer_t **peerp)
+{
+ int rc;
+ usock_peer_t *peer;
+ usock_peer_t *peer2;
+ usock_net_t *net = ni->ni_data;
+
+ pthread_rwlock_rdlock(&usock_data.ud_peers_lock);
+ peer = usocklnd_find_peer_locked(ni, id);
+ pthread_rwlock_unlock(&usock_data.ud_peers_lock);
+
+ if (peer != NULL)
+ goto find_or_create_peer_done;
+
+ rc = usocklnd_create_peer(ni, id, &peer);
+ if (rc)
+ return rc;
+
+ pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
+ peer2 = usocklnd_find_peer_locked(ni, id);
+ if (peer2 == NULL) {
+ if (net->un_shutdown) {
+ pthread_rwlock_unlock(&usock_data.ud_peers_lock);
+ usocklnd_peer_decref(peer); /* should destroy peer */
+ CERROR("Can't create peer: network shutdown\n");
+ return -ESHUTDOWN;
+ }
+
+ /* peer table will take 1 of my refs on peer */
+ usocklnd_peer_addref(peer);
+ list_add_tail (&peer->up_list,
+ usocklnd_nid2peerlist(id.nid));
+ } else {
+ usocklnd_peer_decref(peer); /* should destroy peer */
+ peer = peer2;
+ }
+ pthread_rwlock_unlock(&usock_data.ud_peers_lock);
+
+ find_or_create_peer_done:
+ *peerp = peer;
+ return 0;
+}
+
+/* NB: both peer and conn locks are held */
+static int
+usocklnd_enqueue_zcack(usock_conn_t *conn, usock_zc_ack_t *zc_ack)
+{
+ if (conn->uc_state == UC_READY &&
+ list_empty(&conn->uc_tx_list) &&
+ list_empty(&conn->uc_zcack_list) &&
+ !conn->uc_sending) {
+ int rc = usocklnd_add_pollrequest(conn, POLL_TX_SET_REQUEST,
+ POLLOUT);
+ if (rc != 0)
+ return rc;
+ }
+
+ list_add_tail(&zc_ack->zc_list, &conn->uc_zcack_list);
+ return 0;
+}
+
+/* NB: both peer and conn locks are held
+ * NB: if sending isn't in progress, the caller *MUST* send tx
+ * immediately after we return */
+static void
+usocklnd_enqueue_tx(usock_conn_t *conn, usock_tx_t *tx,
+ int *send_immediately)
+{
+ if (conn->uc_state == UC_READY &&
+ list_empty(&conn->uc_tx_list) &&
+ list_empty(&conn->uc_zcack_list) &&
+ !conn->uc_sending) {
+ conn->uc_sending = 1;
+ *send_immediately = 1;
+ return;
+ }
+
+ *send_immediately = 0;
+ list_add_tail(&tx->tx_list, &conn->uc_tx_list);
+}
+
+/* Safely create new conn if needed. Save result in *connp.
+ * Returns 0 on success, <0 else */
+int
+usocklnd_find_or_create_conn(usock_peer_t *peer, int type,
+ usock_conn_t **connp,
+ usock_tx_t *tx, usock_zc_ack_t *zc_ack,
+ int *send_immediately)
+{
+ usock_conn_t *conn;
+ int idx;
+ int rc;
+ lnet_pid_t userflag = peer->up_peerid.pid & LNET_PID_USERFLAG;
+
+ if (userflag)
+ type = SOCKLND_CONN_ANY;
+
+ idx = usocklnd_type2idx(type);
+
+ pthread_mutex_lock(&peer->up_lock);
+ if (peer->up_conns[idx] != NULL) {
+ conn = peer->up_conns[idx];
+ LASSERT(conn->uc_type == type);
+ } else {
+ if (userflag) {
+ CERROR("Refusing to create a connection to "
+ "userspace process %s\n",
+ libcfs_id2str(peer->up_peerid));
+ rc = -EHOSTUNREACH;
+ goto find_or_create_conn_failed;
+ }
+
+ rc = usocklnd_create_active_conn(peer, type, &conn);
+ if (rc) {
+ peer->up_errored = 1;
+ usocklnd_del_conns_locked(peer);
+ goto find_or_create_conn_failed;
+ }
+
+ /* peer takes 1 of conn refcount */
+ usocklnd_link_conn_to_peer(conn, peer, idx);
+
+ rc = usocklnd_add_pollrequest(conn, POLL_ADD_REQUEST, POLLOUT);
+ if (rc) {
+ peer->up_conns[idx] = NULL;
+ usocklnd_conn_decref(conn); /* should destroy conn */
+ goto find_or_create_conn_failed;
+ }
+ usocklnd_wakeup_pollthread(conn->uc_pt_idx);
+ }
+
+ pthread_mutex_lock(&conn->uc_lock);
+ LASSERT(conn->uc_peer == peer);
+
+ LASSERT(tx == NULL || zc_ack == NULL);
+ if (tx != NULL) {
+ usocklnd_enqueue_tx(conn, tx, send_immediately);
+ } else {
+ rc = usocklnd_enqueue_zcack(conn, zc_ack);
+ if (rc != 0) {
+ usocklnd_conn_kill_locked(conn);
+ pthread_mutex_unlock(&conn->uc_lock);
+ goto find_or_create_conn_failed;
+ }
+ }
+ pthread_mutex_unlock(&conn->uc_lock);
+
+ usocklnd_conn_addref(conn);
+ pthread_mutex_unlock(&peer->up_lock);
+
+ *connp = conn;
+ return 0;
+
+ find_or_create_conn_failed:
+ pthread_mutex_unlock(&peer->up_lock);
+ return rc;
+}
+
+void
+usocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx)
+{
+ peer->up_conns[idx] = conn;
+        peer->up_errored = 0; /* this fresh conn will try to
+                               * revitalize even a stale errored peer */
+}
+
+int
+usocklnd_invert_type(int type)
+{
+ switch (type)
+ {
+ case SOCKLND_CONN_ANY:
+ case SOCKLND_CONN_CONTROL:
+ return (type);
+ case SOCKLND_CONN_BULK_IN:
+ return SOCKLND_CONN_BULK_OUT;
+ case SOCKLND_CONN_BULK_OUT:
+ return SOCKLND_CONN_BULK_IN;
+ default:
+ return SOCKLND_CONN_NONE;
+ }
+}
+
+void
+usocklnd_conn_new_state(usock_conn_t *conn, int new_state)
+{
+ pthread_mutex_lock(&conn->uc_lock);
+ if (conn->uc_state != UC_DEAD)
+ conn->uc_state = new_state;
+ pthread_mutex_unlock(&conn->uc_lock);
+}
+
+/* NB: peer is locked by caller */
+void
+usocklnd_cleanup_stale_conns(usock_peer_t *peer, __u64 incrn,
+ usock_conn_t *skip_conn)
+{
+ int i;
+
+ if (!peer->up_incrn_is_set) {
+ peer->up_incarnation = incrn;
+ peer->up_incrn_is_set = 1;
+ return;
+ }
+
+ if (peer->up_incarnation == incrn)
+ return;
+
+ peer->up_incarnation = incrn;
+
+ for (i = 0; i < N_CONN_TYPES; i++) {
+ usock_conn_t *conn = peer->up_conns[i];
+
+ if (conn == NULL || conn == skip_conn)
+ continue;
+
+ pthread_mutex_lock(&conn->uc_lock);
+ LASSERT (conn->uc_peer == peer);
+ conn->uc_peer = NULL;
+ peer->up_conns[i] = NULL;
+ if (conn->uc_state != UC_DEAD)
+ usocklnd_conn_kill_locked(conn);
+ pthread_mutex_unlock(&conn->uc_lock);
+
+ usocklnd_conn_decref(conn);
+ usocklnd_peer_decref(peer);
+ }
+}
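+
+/* A new incarnation value indicates the peer instance has changed
+ * (e.g. it restarted): every conn established under the previous
+ * incarnation is stale and gets killed, except skip_conn - the conn
+ * that delivered the fresh incarnation in its hello. */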
+
+/* RX state transition to UC_RX_HELLO_MAGIC: update RX part to receive
+ * MAGIC part of hello and set uc_rx_state
+ */
+void
+usocklnd_rx_hellomagic_state_transition(usock_conn_t *conn)
+{
+ LASSERT(conn->uc_rx_hello != NULL);
+
+ conn->uc_rx_niov = 1;
+ conn->uc_rx_iov = conn->uc_rx_iova;
+ conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_magic;
+ conn->uc_rx_iov[0].iov_len =
+ conn->uc_rx_nob_wanted =
+ conn->uc_rx_nob_left =
+ sizeof(conn->uc_rx_hello->kshm_magic);
+
+ conn->uc_rx_state = UC_RX_HELLO_MAGIC;
+
+ conn->uc_rx_flag = 1; /* waiting for incoming hello */
+ conn->uc_rx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
+}
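+
+/* The hello is received incrementally through the state chain
+ * UC_RX_HELLO_MAGIC -> UC_RX_HELLO_VERSION -> UC_RX_HELLO_BODY ->
+ * UC_RX_HELLO_IPS (see the transitions below); each step receives
+ * just enough bytes to decide how to parse the next one. */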
+
+/* RX state transition to UC_RX_HELLO_VERSION: update RX part to receive
+ * VERSION part of hello and set uc_rx_state
+ */
+void
+usocklnd_rx_helloversion_state_transition(usock_conn_t *conn)
+{
+ LASSERT(conn->uc_rx_hello != NULL);
+
+ conn->uc_rx_niov = 1;
+ conn->uc_rx_iov = conn->uc_rx_iova;
+ conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_version;
+ conn->uc_rx_iov[0].iov_len =
+ conn->uc_rx_nob_wanted =
+ conn->uc_rx_nob_left =
+ sizeof(conn->uc_rx_hello->kshm_version);
+
+ conn->uc_rx_state = UC_RX_HELLO_VERSION;
+}
+
+/* RX state transition to UC_RX_HELLO_BODY: update RX part to receive
+ * the rest of hello and set uc_rx_state
+ */
+void
+usocklnd_rx_hellobody_state_transition(usock_conn_t *conn)
+{
+ LASSERT(conn->uc_rx_hello != NULL);
+
+ conn->uc_rx_niov = 1;
+ conn->uc_rx_iov = conn->uc_rx_iova;
+ conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_src_nid;
+ conn->uc_rx_iov[0].iov_len =
+ conn->uc_rx_nob_wanted =
+ conn->uc_rx_nob_left =
+ offsetof(ksock_hello_msg_t, kshm_ips) -
+ offsetof(ksock_hello_msg_t, kshm_src_nid);
+
+ conn->uc_rx_state = UC_RX_HELLO_BODY;
+}
+
+/* RX state transition to UC_RX_HELLO_IPS: update RX part to receive
+ * array of IPs and set uc_rx_state
+ */
+void
+usocklnd_rx_helloIPs_state_transition(usock_conn_t *conn)
+{
+ LASSERT(conn->uc_rx_hello != NULL);
+
+ conn->uc_rx_niov = 1;
+ conn->uc_rx_iov = conn->uc_rx_iova;
+ conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_ips;
+ conn->uc_rx_iov[0].iov_len =
+ conn->uc_rx_nob_wanted =
+ conn->uc_rx_nob_left =
+ conn->uc_rx_hello->kshm_nips *
+ sizeof(conn->uc_rx_hello->kshm_ips[0]);
+
+ conn->uc_rx_state = UC_RX_HELLO_IPS;
+}
+
+/* RX state transition to UC_RX_LNET_HEADER: update RX part to receive
+ * LNET header and set uc_rx_state
+ */
+void
+usocklnd_rx_lnethdr_state_transition(usock_conn_t *conn)
+{
+ conn->uc_rx_niov = 1;
+ conn->uc_rx_iov = conn->uc_rx_iova;
+ conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg.ksm_u.lnetmsg;
+ conn->uc_rx_iov[0].iov_len =
+ conn->uc_rx_nob_wanted =
+ conn->uc_rx_nob_left =
+ sizeof(ksock_lnet_msg_t);
+
+ conn->uc_rx_state = UC_RX_LNET_HEADER;
+ conn->uc_rx_flag = 1;
+}
+
+/* RX state transition to UC_RX_KSM_HEADER: update RX part to receive
+ * KSM header and set uc_rx_state
+ */
+void
+usocklnd_rx_ksmhdr_state_transition(usock_conn_t *conn)
+{
+ conn->uc_rx_niov = 1;
+ conn->uc_rx_iov = conn->uc_rx_iova;
+ conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg;
+ conn->uc_rx_iov[0].iov_len =
+ conn->uc_rx_nob_wanted =
+ conn->uc_rx_nob_left =
+ offsetof(ksock_msg_t, ksm_u);
+
+ conn->uc_rx_state = UC_RX_KSM_HEADER;
+ conn->uc_rx_flag = 0;
+}
+
+/* RX state transition to UC_RX_SKIPPING: update RX part for
+ * skipping and set uc_rx_state
+ */
+void
+usocklnd_rx_skipping_state_transition(usock_conn_t *conn)
+{
+ static char skip_buffer[4096];
+
+ int nob;
+ unsigned int niov = 0;
+ int skipped = 0;
+ int nob_to_skip = conn->uc_rx_nob_left;
+
+ LASSERT(nob_to_skip != 0);
+
+ conn->uc_rx_iov = conn->uc_rx_iova;
+
+ /* Set up to skip as much as possible now. If there's more left
+ * (ran out of iov entries) we'll get called again */
+
+ do {
+ nob = MIN(nob_to_skip, sizeof(skip_buffer));
+
+ conn->uc_rx_iov[niov].iov_base = skip_buffer;
+ conn->uc_rx_iov[niov].iov_len = nob;
+ niov++;
+ skipped += nob;
+ nob_to_skip -= nob;
+
+ } while (nob_to_skip != 0 && /* mustn't overflow conn's rx iov */
+ niov < sizeof(conn->uc_rx_iova) / sizeof(struct iovec));
+
+ conn->uc_rx_niov = niov;
+ conn->uc_rx_nob_wanted = skipped;
+
+ conn->uc_rx_state = UC_RX_SKIPPING;
+}
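+
+/* For example, skipping 10000 stale bytes with the 4096-byte
+ * skip_buffer above builds iovecs of 4096 + 4096 + 1808 bytes, all
+ * aimed at the same shared buffer since the data is discarded anyway;
+ * anything beyond the capacity of uc_rx_iova is handled by a further
+ * call once uc_rx_nob_left is updated. */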
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * Copyright (c) 2002 Cray Inc.
- *
- * This file is part of Lustre, http://www.lustre.org.
- *
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
- *
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-/* connection.c:
- This file provides a simple stateful connection manager which
- builds tcp connections on demand and leaves them open for
- future use.
-*/
-
-#include <stdlib.h>
-#include <pqtimer.h>
-#include <dispatch.h>
-#include <table.h>
-#include <stdio.h>
-#include <stdarg.h>
-#include <string.h>
-#include <unistd.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-#include <lnet/types.h>
-#include <lnet/lib-types.h>
-#include <lnet/socklnd.h>
-#include <libcfs/kp30.h>
-#include <connection.h>
-#include <pthread.h>
-#include <errno.h>
-#ifndef __CYGWIN__
-#include <syscall.h>
-#endif
-
-/* tunables (via environment) */
-int tcpnal_acceptor_port = 988;
-int tcpnal_buffer_size = 0;
-int tcpnal_nagle = 0;
-
-int
-tcpnal_env_param (char *name, int *val)
-{
- char *env = getenv(name);
- int n;
-
- if (env == NULL)
- return 1;
-
- n = strlen(env); /* scanf may not assign on EOS */
- if (sscanf(env, "%i%n", val, &n) >= 1 && n == strlen(env)) {
- CDEBUG(D_INFO, "Environment variable %s set to %d\n",
- name, *val);
- return 1;
- }
-
- CERROR("Can't parse environment variable '%s=%s'\n",
- name, env);
- return 0;
-}
-
-int
-tcpnal_set_global_params (void)
-{
- return tcpnal_env_param("TCPNAL_PORT",
- &tcpnal_acceptor_port) &&
- tcpnal_env_param("TCPLND_PORT",
- &tcpnal_acceptor_port) &&
- tcpnal_env_param("TCPNAL_BUFFER_SIZE",
- &tcpnal_buffer_size) &&
- tcpnal_env_param("TCPLND_BUFFER_SIZE",
- &tcpnal_buffer_size) &&
- tcpnal_env_param("TCPNAL_NAGLE",
- &tcpnal_nagle) &&
- tcpnal_env_param("TCPLND_NAGLE",
- &tcpnal_nagle);
-}
-
-/* Function: compare_connection
- * Arguments: connection c: a connection in the hash table
- * lnet_process_id_t: an id to verify against
- * Returns: 1 if the connection is the one requested, 0 otherwise
- *
- * compare_connection() tests for collisions in the hash table
- */
-static int compare_connection(void *arg1, void *arg2)
-{
- connection c = arg1;
- lnet_nid_t *nid = arg2;
-
- return (c->peer_nid == *nid);
-}
-
-/* Function: connection_key
- * Arguments: lnet_process_id_t id: an id to hash
- * Returns: a not-particularly-well-distributed hash
- * of the id
- */
-static unsigned int connection_key(void *arg)
-{
- lnet_nid_t *nid = arg;
-
- return (unsigned int)(*nid);
-}
-
-void
-close_connection(void *arg)
-{
- connection c = arg;
-
- close(c->fd);
- free(c);
-}
-
-/* Function: remove_connection
- * Arguments: c: the connection to remove
- */
-void remove_connection(void *arg)
-{
- connection c = arg;
-
- hash_table_remove(c->m->connections,&c->peer_nid);
- close_connection(c);
-}
-
-
-/* Function: read_connection:
- * Arguments: c: the connection to read from
- * dest: the buffer to read into
- * len: the number of bytes to read
- * Returns: success as 1, or failure as 0
- *
- * read_connection() reads data from the connection, continuing
- * to read partial results until the request is satisfied or
- * it errors. TODO: this read should be covered by signal protection.
- */
-int read_connection(connection c,
- unsigned char *dest,
- int len)
-{
- int offset = 0,rc;
-
- if (len) {
- do {
-#ifndef __CYGWIN__
- rc = syscall(SYS_read, c->fd, dest+offset, len-offset);
-#else
- rc = recv(c->fd, dest+offset, len-offset, 0);
-#endif
- if (rc <= 0) {
- if (errno == EINTR) {
- rc = 0;
- } else {
- remove_connection(c);
- return (0);
- }
- }
- offset += rc;
- } while (offset < len);
- }
- return (1);
-}
-
-static int connection_input(void *d)
-{
- connection c = d;
- return((*c->m->handler)(c->m->handler_arg,c));
-}
-
-
-static connection
-allocate_connection(manager m,
- lnet_nid_t nid,
- int fd)
-{
- connection c=malloc(sizeof(struct connection));
-
- c->m=m;
- c->fd=fd;
- c->peer_nid = nid;
-
- register_io_handler(fd,READ_HANDLER,connection_input,c);
- hash_table_insert(m->connections,c,&nid);
- return(c);
-}
-
-int
-tcpnal_write(lnet_nid_t nid, int sockfd, void *buffer, int nob)
-{
- int rc = syscall(SYS_write, sockfd, buffer, nob);
-
- /* NB called on an 'empty' socket with huge buffering! */
- if (rc == nob)
- return 0;
-
- if (rc < 0) {
- CERROR("Failed to send to %s: %s\n",
- libcfs_nid2str(nid), strerror(errno));
- return -1;
- }
-
- CERROR("Short send to %s: %d/%d\n",
- libcfs_nid2str(nid), rc, nob);
- return -1;
-}
-
-int
-tcpnal_read(lnet_nid_t nid, int sockfd, void *buffer, int nob)
-{
- int rc;
-
- while (nob > 0) {
- rc = syscall(SYS_read, sockfd, buffer, nob);
-
- if (rc == 0) {
- CERROR("Unexpected EOF from %s\n",
- libcfs_nid2str(nid));
- return -1;
- }
-
- if (rc < 0) {
- CERROR("Failed to receive from %s: %s\n",
- libcfs_nid2str(nid), strerror(errno));
- return -1;
- }
-
- nob -= rc;
- }
- return 0;
-}
-
-int
-tcpnal_hello (int sockfd, lnet_nid_t nid)
-{
- struct timeval tv;
- __u64 incarnation;
- int rc;
- int nob;
- lnet_acceptor_connreq_t cr;
- lnet_hdr_t hdr;
- lnet_magicversion_t hmv;
-
- gettimeofday(&tv, NULL);
- incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
-
- memset(&cr, 0, sizeof(cr));
- cr.acr_magic = LNET_PROTO_ACCEPTOR_MAGIC;
- cr.acr_version = LNET_PROTO_ACCEPTOR_VERSION;
- cr.acr_nid = nid;
-
- /* hmv is initialised and copied separately into hdr; the compiler
- * would likely "optimize" this incorrectly, due to confusion about
- * pointer aliasing of hmv and hdr, if it were done in-place. */
- hmv.magic = cpu_to_le32(LNET_PROTO_TCP_MAGIC);
- hmv.version_major = cpu_to_le32(LNET_PROTO_TCP_VERSION_MAJOR);
- hmv.version_minor = cpu_to_le32(LNET_PROTO_TCP_VERSION_MINOR);
-
- memset (&hdr, 0, sizeof (hdr));
-
- CLASSERT (sizeof (hmv) == sizeof (hdr.dest_nid));
- memcpy(&hdr.dest_nid, &hmv, sizeof(hmv));
-
- /* hdr.src_nid/src_pid are ignored at dest */
-
- hdr.type = cpu_to_le32(LNET_MSG_HELLO);
- hdr.msg.hello.type = cpu_to_le32(SOCKLND_CONN_ANY);
- hdr.msg.hello.incarnation = cpu_to_le64(incarnation);
-
- /* I don't send any interface info */
-
- /* Assume sufficient socket buffering for these messages... */
- rc = tcpnal_write(nid, sockfd, &cr, sizeof(cr));
- if (rc != 0)
- return -1;
-
- rc = tcpnal_write(nid, sockfd, &hdr, sizeof(hdr));
- if (rc != 0)
- return -1;
-
- rc = tcpnal_read(nid, sockfd, &hmv, sizeof(hmv));
- if (rc != 0)
- return -1;
-
- if (hmv.magic != le32_to_cpu(LNET_PROTO_TCP_MAGIC)) {
- CERROR ("Bad magic %#08x (%#08x expected) from %s\n",
- cpu_to_le32(hmv.magic), LNET_PROTO_TCP_MAGIC,
- libcfs_nid2str(nid));
- return -1;
- }
-
- if (hmv.version_major != cpu_to_le16 (LNET_PROTO_TCP_VERSION_MAJOR) ||
- hmv.version_minor != cpu_to_le16 (LNET_PROTO_TCP_VERSION_MINOR)) {
- CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
- " from %s\n",
- le16_to_cpu (hmv.version_major),
- le16_to_cpu (hmv.version_minor),
- LNET_PROTO_TCP_VERSION_MAJOR,
- LNET_PROTO_TCP_VERSION_MINOR,
- libcfs_nid2str(nid));
- return -1;
- }
-
-#if (LNET_PROTO_TCP_VERSION_MAJOR != 1)
-# error "This code only understands protocol version 1.x"
-#endif
- /* version 1 sends magic/version as the dest_nid of a 'hello' header,
- * so read the rest of it in now... */
-
- rc = tcpnal_read(nid, sockfd, ((char *)&hdr) + sizeof (hmv),
- sizeof(hdr) - sizeof(hmv));
- if (rc != 0)
- return -1;
-
- /* ...and check we got what we expected */
- if (hdr.type != cpu_to_le32 (LNET_MSG_HELLO)) {
- CERROR ("Expecting a HELLO hdr "
- " but got type %d with %d payload from %s\n",
- le32_to_cpu (hdr.type),
- le32_to_cpu (hdr.payload_length), libcfs_nid2str(nid));
- return -1;
- }
-
- if (le64_to_cpu(hdr.src_nid) == LNET_NID_ANY) {
- CERROR("Expecting a HELLO hdr with a NID, but got LNET_NID_ANY\n");
- return -1;
- }
-
- if (nid != le64_to_cpu (hdr.src_nid)) {
- CERROR ("Connected to %s, but expecting %s\n",
- libcfs_nid2str(le64_to_cpu (hdr.src_nid)),
- libcfs_nid2str(nid));
- return -1;
- }
-
- /* Ignore any interface info in the payload */
- nob = le32_to_cpu(hdr.payload_length);
- if (nob != 0) {
- CERROR("Unexpected HELLO payload %d from %s\n",
- nob, libcfs_nid2str(nid));
- return -1;
- }
-
- return 0;
-}
-
-/* Function: force_tcp_connection
- * Arguments: t: tcpnal
- * dest: portals endpoint for the connection
- * Returns: an allocated connection structure, either
- * a pre-existing one, or a new connection
- */
-connection force_tcp_connection(manager m,
- lnet_nid_t nid,
- procbridge pb)
-{
- unsigned int ip = LNET_NIDADDR(nid);
- connection conn;
- struct sockaddr_in addr;
- struct sockaddr_in locaddr;
- int fd;
- int option;
- int rc;
- int sz;
-
- pthread_mutex_lock(&m->conn_lock);
-
- conn = hash_table_find(m->connections, &nid);
- if (conn)
- goto out;
-
- memset(&addr, 0, sizeof(addr));
- addr.sin_family = AF_INET;
- addr.sin_addr.s_addr = htonl(ip);
- addr.sin_port = htons(tcpnal_acceptor_port);
-
- memset(&locaddr, 0, sizeof(locaddr));
- locaddr.sin_family = AF_INET;
- locaddr.sin_addr.s_addr = INADDR_ANY;
- locaddr.sin_port = htons(m->port);
-
-#if 1 /* tcpnal connects from a non-privileged port */
- fd = socket(AF_INET, SOCK_STREAM, 0);
- if (fd < 0) {
- perror("tcpnal socket failed");
- goto out;
- }
-
- option = 1;
- rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
- &option, sizeof(option));
- if (rc != 0) {
- perror ("Can't set SO_REUSEADDR for socket");
- close(fd);
- goto out;
- }
-
- if (m->port != 0) {
- /* Bind all subsequent connections to the same port */
- rc = bind(fd, (struct sockaddr *)&locaddr, sizeof(locaddr));
- if (rc != 0) {
- perror("Error binding port");
- close(fd);
- goto out;
- }
- }
-
- rc = connect(fd, (struct sockaddr *)&addr,
- sizeof(struct sockaddr_in));
- if (rc != 0) {
- perror("Error connecting to remote host");
- close(fd);
- goto out;
- }
-
- sz = sizeof(locaddr);
- rc = getsockname(fd, (struct sockaddr *)&locaddr, &sz);
- if (rc != 0) {
- perror ("Error on getsockname");
- close(fd);
- goto out;
- }
-
- if (m->port == 0)
- m->port = ntohs(locaddr.sin_port);
-
-#else
- for (rport = IPPORT_RESERVED - 1; rport > IPPORT_RESERVED / 2; --rport) {
- fd = socket(AF_INET, SOCK_STREAM, 0);
- if (fd < 0) {
- perror("tcpnal socket failed");
- goto out;
- }
-
- option = 1;
- rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
- &option, sizeof(option));
- if (rc != 0) {
- perror ("Can't set SO_REUSEADDR for socket");
- close(fd);
- goto out;
- }
-
- locaddr.sin_port = htons(rport);
- rc = bind(fd, (struct sockaddr *)&locaddr, sizeof(locaddr));
- if (rc == 0 || errno == EACCES) {
- rc = connect(fd, (struct sockaddr *)&addr,
- sizeof(struct sockaddr_in));
- if (rc == 0) {
- break;
- } else if (errno != EADDRINUSE && errno != EADDRNOTAVAIL) {
- perror("Error connecting to remote host");
- close(fd);
- goto out;
- }
- } else if (errno != EADDRINUSE) {
- perror("Error binding to privileged port");
- close(fd);
- goto out;
- }
- close(fd);
- }
-
- if (rport == IPPORT_RESERVED / 2) {
- fprintf(stderr, "Out of ports trying to bind to a reserved port\n");
- goto out;
- }
-#endif
-
- option = tcpnal_nagle ? 0 : 1;
- setsockopt(fd, SOL_TCP, TCP_NODELAY, &option, sizeof(option));
- option = tcpnal_buffer_size;
- if (option != 0) {
- setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &option, sizeof(option));
- option = tcpnal_buffer_size;
- setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &option, sizeof(option));
- }
-
- /* say hello */
- if (tcpnal_hello(fd, nid)) {
- close(fd);
- goto out;
- }
-
- conn = allocate_connection(m, nid, fd);
-
- /* let nal thread know this event right away */
- if (conn)
- procbridge_wakeup_nal(pb);
-
-out:
- pthread_mutex_unlock(&m->conn_lock);
- return (conn);
-}
-
-
-#if 0 /* we don't accept connections */
-/* Function: new_connection
- * Arguments: t: opaque argument holding the tcpname
- * Returns: 1 in order to reregister for new connection requests
- *
- * called when the bound service socket receives
- * a new connection request, it always accepts and
- * installs a new connection
- */
-static int new_connection(void *z)
-{
- manager m=z;
- struct sockaddr_in s;
- int len=sizeof(struct sockaddr_in);
- int fd=accept(m->bound,(struct sockaddr *)&s,&len);
- unsigned int nid=*((unsigned int *)&s.sin_addr);
- /* cfs specific hack */
- //unsigned short pid=s.sin_port;
- pthread_mutex_lock(&m->conn_lock);
- allocate_connection(m,htonl(nid),0/*pid*/,fd);
- pthread_mutex_unlock(&m->conn_lock);
- return(1);
-}
-
-/* Function: bind_socket
- * Arguments: t: the nal state for this interface
- * port: the port to attempt to bind to
- * Returns: 1 on success, or 0 on error
- *
- * bind_socket() attempts to allocate and bind a socket to the requested
- * port, or dynamically assign one from the kernel should the port be
- * zero. Sets the bound and bound_handler elements of m.
- *
- * TODO: The port should be an explicitly sized type.
- */
-static int bind_socket(manager m,unsigned short port)
-{
- struct sockaddr_in addr;
- int alen=sizeof(struct sockaddr_in);
-
- if ((m->bound = socket(AF_INET, SOCK_STREAM, 0)) < 0)
- return(0);
-
- bzero((char *) &addr, sizeof(addr));
- addr.sin_family = AF_INET;
- addr.sin_addr.s_addr = 0;
- addr.sin_port = htons(port);
-
- if (bind(m->bound,(struct sockaddr *)&addr,alen)<0){
- perror ("tcpnal bind");
- return(0);
- }
-
- getsockname(m->bound,(struct sockaddr *)&addr, &alen);
-
- m->bound_handler=register_io_handler(m->bound,READ_HANDLER,
- new_connection,m);
- listen(m->bound,5);
- m->port=addr.sin_port;
- return(1);
-}
-#endif
-
-
-/* Function: shutdown_connections
- * Arguments: m: the manager structure
- *
- * close all connections and reclaim resources
- */
-void shutdown_connections(manager m)
-{
-#if 0
- /* we don't accept connections */
- close(m->bound);
- remove_io_handler(m->bound_handler);
-#endif
- hash_destroy_table(m->connections,close_connection);
- free(m);
-}
-
-
-/* Function: init_connections
- * Arguments: t: the nal state for this interface
- * Returns: a newly allocated manager structure, or
- * zero if the fixed port could not be bound
- */
-manager init_connections(int (*input)(void *, void *), void *a)
-{
- manager m = (manager)malloc(sizeof(struct manager));
-
- m->connections = hash_create_table(compare_connection,connection_key);
- m->handler = input;
- m->handler_arg = a;
- m->port = 0; /* set on first connection */
- pthread_mutex_init(&m->conn_lock, 0);
-
- return m;
-#if 0
- if (bind_socket(m,pid))
- return(m);
-
- free(m);
- return(0);
-#endif
-}
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * Copyright (c) 2002 Cray Inc.
- *
- * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
- */
-
-#include <table.h>
-#include <procbridge.h>
-
-typedef struct manager {
- table connections;
- pthread_mutex_t conn_lock; /* protect connections table */
-#if 0 /* we don't accept connections */
- int bound;
- io_handler bound_handler;
-#endif
- int (*handler)(void *, void *);
- void *handler_arg;
- int port;
-} *manager;
-
-
-typedef struct connection {
- lnet_nid_t peer_nid;
- int fd;
- manager m;
-} *connection;
-
-connection force_tcp_connection(manager m, lnet_nid_t nid, procbridge pb);
-manager init_connections(int (*f)(void *, void *), void *);
-void remove_connection(void *arg);
-void shutdown_connections(manager m);
-int read_connection(connection c, unsigned char *dest, int len);
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * Copyright (c) 2002 Cray Inc.
- * Copyright (c) 2002 Eric Hoffman
- *
- * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
- */
-
-/* this file is only called dispatch.h to prevent it
- from colliding with /usr/include/sys/select.h */
-
-typedef struct io_handler *io_handler;
-
-struct io_handler{
- io_handler *last;
- io_handler next;
- int fd;
- int type;
- int (*function)(void *);
- void *argument;
- int disabled;
-};
-
-
-#define READ_HANDLER 1
-#define WRITE_HANDLER 2
-#define EXCEPTION_HANDLER 4
-#define ALL_HANDLER (READ_HANDLER | WRITE_HANDLER | EXCEPTION_HANDLER)
-
-io_handler register_io_handler(int fd,
- int type,
- int (*function)(void *),
- void *arg);
-
-void remove_io_handler (io_handler i);
-void init_unix_timer(void);
-void select_timer_block(when until);
-when now(void);
-
-/*
- * hacking for CFS internal MPI testing
- */
-#undef ENABLE_SELECT_DISPATCH
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
+ * Author: Maxim Patlasov <maxim@clusterfs.com>
+ *
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ */
+
+#include "usocklnd.h"
+#include <unistd.h>
+#include <syscall.h>
+
+int
+usocklnd_notifier_handler(int fd)
+{
+ int notification;
+ return syscall(SYS_read, fd, &notification, sizeof(notification));
+}
+
+void
+usocklnd_exception_handler(usock_conn_t *conn)
+{
+ pthread_mutex_lock(&conn->uc_lock);
+
+ if (conn->uc_state == UC_CONNECTING ||
+ conn->uc_state == UC_SENDING_HELLO)
+ usocklnd_conn_kill_locked(conn);
+
+ pthread_mutex_unlock(&conn->uc_lock);
+}
+
+int
+usocklnd_read_handler(usock_conn_t *conn)
+{
+ int rc;
+ int continue_reading;
+ int state;
+
+ read_again:
+ rc = 0;
+ pthread_mutex_lock(&conn->uc_lock);
+ state = conn->uc_state;
+
+ /* process special case: LNET calls lnd_recv() asynchronously */
+ if (state == UC_READY && conn->uc_rx_state == UC_RX_PARSE) {
+ /* usocklnd_recv() hasn't been called yet */
+ rc = usocklnd_add_pollrequest(conn, POLL_RX_SET_REQUEST, 0);
+ if (rc == 0)
+ conn->uc_rx_state = UC_RX_PARSE_WAIT;
+ else
+ usocklnd_conn_kill_locked(conn);
+
+ pthread_mutex_unlock(&conn->uc_lock);
+ return rc;
+ }
+
+ pthread_mutex_unlock(&conn->uc_lock);
+ /* From here on, the conn cannot be changed
+ * asynchronously, except:
+ * 1) usocklnd_send() can work with uc_tx_list and uc_zcack_list,
+ * 2) usocklnd_shutdown() can change uc_state to UC_DEAD */
+
+ switch (state) {
+
+ case UC_RECEIVING_HELLO:
+ case UC_READY:
+ if (conn->uc_rx_nob_wanted != 0) {
+ /* read from conn fd as much wanted data as possible */
+ rc = usocklnd_read_data(conn);
+ if (rc == 0) /* partial read */
+ break;
+ if (rc < 0) {/* error happened or EOF */
+ usocklnd_conn_kill(conn);
+ break;
+ }
+ }
+
+ /* process incoming data */
+ if (state == UC_READY)
+ rc = usocklnd_read_msg(conn, &continue_reading);
+ else /* state == UC_RECEIVING_HELLO */
+ rc = usocklnd_read_hello(conn, &continue_reading);
+
+ if (rc < 0) {
+ usocklnd_conn_kill(conn);
+ break;
+ }
+
+ if (continue_reading)
+ goto read_again;
+
+ break;
+
+ case UC_DEAD:
+ break;
+
+ default:
+ LBUG();
+ }
+
+ return rc;
+}
+
+/* Switch on rx_state.
+ * Return 0 on success, 1 if the whole packet is read, else return <0.
+ * Always set cont_flag: 1 if we're ready to continue reading, else 0.
+ * NB: if the whole packet is read, cont_flag is set to zero to take
+ * care of fairness
+ */
+int
+usocklnd_read_msg(usock_conn_t *conn, int *cont_flag)
+{
+ int rc = 0;
+ __u64 cookie;
+
+ *cont_flag = 0;
+
+ /* something new arrived in the RX part - let's process it */
+ switch (conn->uc_rx_state) {
+ case UC_RX_KSM_HEADER:
+ if (conn->uc_flip) {
+ __swab32s(&conn->uc_rx_msg.ksm_type);
+ __swab32s(&conn->uc_rx_msg.ksm_csum);
+ __swab64s(&conn->uc_rx_msg.ksm_zc_req_cookie);
+ __swab64s(&conn->uc_rx_msg.ksm_zc_ack_cookie);
+ }
+
+ /* we never send packets for which zc-acking is required */
+ if (conn->uc_rx_msg.ksm_type != KSOCK_MSG_LNET ||
+ conn->uc_rx_msg.ksm_zc_ack_cookie != 0) {
+ conn->uc_errored = 1;
+ return -EPROTO;
+ }
+
+ /* the zc_req will be processed later, once the
+ lnet payload has been received */
+
+ usocklnd_rx_lnethdr_state_transition(conn);
+ *cont_flag = 1;
+ break;
+
+ case UC_RX_LNET_HEADER:
+ if (the_lnet.ln_pid & LNET_PID_USERFLAG) {
+ /* replace dest_nid,pid (ksocknal sets its own) */
+ conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_nid =
+ cpu_to_le64(conn->uc_peer->up_ni->ni_nid);
+ conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_pid =
+ cpu_to_le32(the_lnet.ln_pid);
+
+ } else if (conn->uc_peer->up_peerid.pid & LNET_PID_USERFLAG) {
+ /* Userspace peer */
+ lnet_process_id_t *id = &conn->uc_peer->up_peerid;
+ lnet_hdr_t *lhdr = &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr;
+
+ /* Substitute process ID assigned at connection time */
+ lhdr->src_pid = cpu_to_le32(id->pid);
+ lhdr->src_nid = cpu_to_le64(id->nid);
+ }
+
+ conn->uc_rx_state = UC_RX_PARSE;
+ usocklnd_conn_addref(conn); /* ++ref while parsing */
+
+ rc = lnet_parse(conn->uc_peer->up_ni,
+ &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr,
+ conn->uc_peerid.nid, conn, 0);
+
+ if (rc < 0) {
+ /* I just received garbage: give up on this conn */
+ conn->uc_errored = 1;
+ usocklnd_conn_decref(conn);
+ return -EPROTO;
+ }
+
+ /* Race with usocklnd_recv() is possible */
+ pthread_mutex_lock(&conn->uc_lock);
+ LASSERT (conn->uc_rx_state == UC_RX_PARSE ||
+ conn->uc_rx_state == UC_RX_LNET_PAYLOAD);
+
+ /* check whether usocklnd_recv() got called */
+ if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD)
+ *cont_flag = 1;
+ pthread_mutex_unlock(&conn->uc_lock);
+ break;
+
+ case UC_RX_PARSE:
+ LBUG(); /* it's an error to be here, because this special
+ * case is handled by the caller */
+ break;
+
+ case UC_RX_PARSE_WAIT:
+ LBUG(); /* it's an error to be here, because the conn
+ * shouldn't wait for a POLLIN event in this
+ * state */
+ break;
+
+ case UC_RX_LNET_PAYLOAD:
+ /* payload all received */
+
+ lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, 0);
+
+ cookie = conn->uc_rx_msg.ksm_zc_req_cookie;
+ if (cookie != 0)
+ rc = usocklnd_handle_zc_req(conn->uc_peer, cookie);
+
+ if (rc != 0) {
+ /* change state not to finalize twice */
+ conn->uc_rx_state = UC_RX_KSM_HEADER;
+ return -EPROTO;
+ }
+
+ /* Fall through */
+
+ case UC_RX_SKIPPING:
+ if (conn->uc_rx_nob_left != 0) {
+ usocklnd_rx_skipping_state_transition(conn);
+ *cont_flag = 1;
+ } else {
+ usocklnd_rx_ksmhdr_state_transition(conn);
+ rc = 1; /* whole packet is read */
+ }
+
+ break;
+
+ default:
+ LBUG(); /* unknown state */
+ }
+
+ return rc;
+}
+
+/* Handle incoming ZC request from sender.
+ * NB: it's called only from the read handler, so we're sure that
+ * the conn cannot become a zombie in the middle of processing */
+int
+usocklnd_handle_zc_req(usock_peer_t *peer, __u64 cookie)
+{
+ usock_conn_t *conn;
+ usock_zc_ack_t *zc_ack;
+ int type;
+ int rc;
+ int dummy;
+
+ LIBCFS_ALLOC (zc_ack, sizeof(*zc_ack));
+ if (zc_ack == NULL)
+ return -ENOMEM;
+ zc_ack->zc_cookie = cookie;
+
+ /* Let's assume that CONTROL is the best type for zcack,
+ * but userspace clients don't use typed connections */
+ if (the_lnet.ln_pid & LNET_PID_USERFLAG)
+ type = SOCKLND_CONN_ANY;
+ else
+ type = SOCKLND_CONN_CONTROL;
+
+ rc = usocklnd_find_or_create_conn(peer, type, &conn, NULL, zc_ack,
+ &dummy);
+ if (rc != 0) {
+ LIBCFS_FREE (zc_ack, sizeof(*zc_ack));
+ return rc;
+ }
+ usocklnd_conn_decref(conn);
+
+ return 0;
+}
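+
+/* The zc_ack allocated here is consumed later by
+ * usocklnd_try_piggyback(): it rides on the ksm_zc_ack_cookie field
+ * of the next queued tx if one exists, or else a noop tx is created
+ * just to carry it back to the sender. */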
+
+/* Switch on rx_state.
+ * Return 0 on success, else return <0
+ * Always set cont_flag: 1 if we're ready to continue reading, else 0
+ */
+int
+usocklnd_read_hello(usock_conn_t *conn, int *cont_flag)
+{
+ int rc = 0;
+ ksock_hello_msg_t *hello = conn->uc_rx_hello;
+
+ *cont_flag = 0;
+
+ /* something new arrived in the hello - let's process it */
+ switch (conn->uc_rx_state) {
+ case UC_RX_HELLO_MAGIC:
+ if (hello->kshm_magic == LNET_PROTO_MAGIC)
+ conn->uc_flip = 0;
+ else if (hello->kshm_magic == __swab32(LNET_PROTO_MAGIC))
+ conn->uc_flip = 1;
+ else
+ return -EPROTO;
+
+ usocklnd_rx_helloversion_state_transition(conn);
+ *cont_flag = 1;
+ break;
+
+ case UC_RX_HELLO_VERSION:
+ if ((!conn->uc_flip &&
+ (hello->kshm_version != KSOCK_PROTO_V2)) ||
+ (conn->uc_flip &&
+ (hello->kshm_version != __swab32(KSOCK_PROTO_V2))))
+ return -EPROTO;
+
+ usocklnd_rx_hellobody_state_transition(conn);
+ *cont_flag = 1;
+ break;
+
+ case UC_RX_HELLO_BODY:
+ if (conn->uc_flip) {
+ ksock_hello_msg_t *hello = conn->uc_rx_hello;
+ __swab32s(&hello->kshm_src_pid);
+ __swab64s(&hello->kshm_src_nid);
+ __swab32s(&hello->kshm_dst_pid);
+ __swab64s(&hello->kshm_dst_nid);
+ __swab64s(&hello->kshm_src_incarnation);
+ __swab64s(&hello->kshm_dst_incarnation);
+ __swab32s(&hello->kshm_ctype);
+ __swab32s(&hello->kshm_nips);
+ }
+
+ if (conn->uc_rx_hello->kshm_nips > LNET_MAX_INTERFACES) {
+ CERROR("Bad nips %d from ip %u.%u.%u.%u port %d\n",
+ conn->uc_rx_hello->kshm_nips,
+ HIPQUAD(conn->uc_peer_ip), conn->uc_peer_port);
+ return -EPROTO;
+ }
+
+ if (conn->uc_rx_hello->kshm_nips) {
+ usocklnd_rx_helloIPs_state_transition(conn);
+ *cont_flag = 1;
+ break;
+ }
+ /* fall through */
+
+ case UC_RX_HELLO_IPS:
+ if (conn->uc_activeflag == 1) /* active conn */
+ rc = usocklnd_activeconn_hellorecv(conn);
+ else /* passive conn */
+ rc = usocklnd_passiveconn_hellorecv(conn);
+
+ break;
+
+ default:
+ LBUG(); /* unknown state */
+ }
+
+ return rc;
+}
+
+/* All actions that we need after receiving hello on an active conn:
+ * 1) Schedule removal if we're a zombie
+ * 2) Restart the active conn if we lost the race
+ * 3) Else: update the RX part to receive a KSM header
+ */
+int
+usocklnd_activeconn_hellorecv(usock_conn_t *conn)
+{
+ int rc = 0;
+ ksock_hello_msg_t *hello = conn->uc_rx_hello;
+ usock_peer_t *peer = conn->uc_peer;
+
+ /* An active conn with peer==NULL is a zombie.
+ * Don't try to link it to a peer because the conn
+ * already had its chance to proceed at the beginning */
+ if (peer == NULL) {
+ LASSERT(list_empty(&conn->uc_tx_list) &&
+ list_empty(&conn->uc_zcack_list));
+
+ usocklnd_conn_kill(conn);
+ return 0;
+ }
+
+ peer->up_last_alive = cfs_time_current();
+
+ /* peer says that we lost the race */
+ if (hello->kshm_ctype == SOCKLND_CONN_NONE) {
+ /* Start new active conn, relink txs and zc_acks from
+ * the conn to new conn, schedule removing the conn.
+ * Actually, we're expecting that a passive conn will
+ * make us a zombie soon and take care of our txs and
+ * zc_acks */
+
+ struct list_head tx_list, zcack_list;
+ usock_conn_t *conn2;
+ int idx = usocklnd_type2idx(conn->uc_type);
+
+ CFS_INIT_LIST_HEAD (&tx_list);
+ CFS_INIT_LIST_HEAD (&zcack_list);
+
+ /* Block usocklnd_send() to check peer->up_conns[idx]
+ * and to enqueue more txs */
+ pthread_mutex_lock(&peer->up_lock);
+ pthread_mutex_lock(&conn->uc_lock);
+
+ /* usocklnd_shutdown() could kill us */
+ if (conn->uc_state == UC_DEAD) {
+ pthread_mutex_unlock(&conn->uc_lock);
+ pthread_mutex_unlock(&peer->up_lock);
+ return 0;
+ }
+
+ LASSERT (peer == conn->uc_peer);
+ LASSERT (peer->up_conns[idx] == conn);
+
+ rc = usocklnd_create_active_conn(peer, conn->uc_type, &conn2);
+ if (rc) {
+ conn->uc_errored = 1;
+ pthread_mutex_unlock(&conn->uc_lock);
+ pthread_mutex_unlock(&peer->up_lock);
+ return rc;
+ }
+
+ usocklnd_link_conn_to_peer(conn2, peer, idx);
+ conn2->uc_peer = peer;
+
+ /* unlink txs and zcack from the conn */
+ list_add(&tx_list, &conn->uc_tx_list);
+ list_del_init(&conn->uc_tx_list);
+ list_add(&zcack_list, &conn->uc_zcack_list);
+ list_del_init(&conn->uc_zcack_list);
+
+ /* link them to conn2 */
+ list_add(&conn2->uc_tx_list, &tx_list);
+ list_del_init(&tx_list);
+ list_add(&conn2->uc_zcack_list, &zcack_list);
+ list_del_init(&zcack_list);
+
+ /* make conn zombie */
+ conn->uc_peer = NULL;
+ usocklnd_peer_decref(peer);
+
+ /* schedule conn2 for processing */
+ rc = usocklnd_add_pollrequest(conn2, POLL_ADD_REQUEST, POLLOUT);
+ if (rc) {
+ peer->up_conns[idx] = NULL;
+ usocklnd_conn_decref(conn2); /* should destroy conn */
+ } else {
+ usocklnd_conn_kill_locked(conn);
+ }
+
+ pthread_mutex_unlock(&conn->uc_lock);
+ pthread_mutex_unlock(&peer->up_lock);
+ usocklnd_conn_decref(conn);
+
+ } else { /* hello->kshm_ctype != SOCKLND_CONN_NONE */
+ if (conn->uc_type != usocklnd_invert_type(hello->kshm_ctype))
+ return -EPROTO;
+
+ pthread_mutex_lock(&peer->up_lock);
+ usocklnd_cleanup_stale_conns(peer, hello->kshm_src_incarnation,
+ conn);
+ pthread_mutex_unlock(&peer->up_lock);
+
+ /* safely transit to UC_READY state */
+ /* rc == 0 */
+ pthread_mutex_lock(&conn->uc_lock);
+ if (conn->uc_state != UC_DEAD) {
+ usocklnd_rx_ksmhdr_state_transition(conn);
+
+ /* POLLIN is already set because we just
+ * received hello, but maybe we have something
+ * to send? */
+ LASSERT (conn->uc_sending == 0);
+ if (!list_empty(&conn->uc_tx_list) ||
+ !list_empty(&conn->uc_zcack_list)) {
+
+ conn->uc_tx_deadline =
+ cfs_time_shift(usock_tuns.ut_timeout);
+ conn->uc_tx_flag = 1;
+ rc = usocklnd_add_pollrequest(conn,
+ POLL_SET_REQUEST,
+ POLLIN | POLLOUT);
+ }
+
+ if (rc == 0)
+ conn->uc_state = UC_READY;
+ }
+ pthread_mutex_unlock(&conn->uc_lock);
+ }
+
+ return rc;
+}
+
+/* All actions that we need after receiving hello on passive conn:
+ * 1) Stash peer's nid, pid, incarnation and conn type
+ * 2) Cope with easy case: conn[idx] is empty - just save conn there
+ * 3) Resolve race:
+ * a) if our nid is higher - reply with CONN_NONE and make us zombie
+ * b) if peer's nid is higher - postpone race resolution till
+ * READY state
+ * 4) Anyhow, send reply hello
+*/
+int
+usocklnd_passiveconn_hellorecv(usock_conn_t *conn)
+{
+ ksock_hello_msg_t *hello = conn->uc_rx_hello;
+ int type;
+ int idx;
+ int rc;
+ usock_peer_t *peer;
+ lnet_ni_t *ni = conn->uc_ni;
+ __u32 peer_ip = conn->uc_peer_ip;
+ __u16 peer_port = conn->uc_peer_port;
+
+ /* don't know parent peer yet and not zombie */
+ LASSERT (conn->uc_peer == NULL &&
+ ni != NULL);
+
+ /* don't know peer's nid and incarnation yet */
+ if (peer_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) {
+ /* do not trust liblustre clients */
+ conn->uc_peerid.pid = peer_port | LNET_PID_USERFLAG;
+ conn->uc_peerid.nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid),
+ peer_ip);
+ if (hello->kshm_ctype != SOCKLND_CONN_ANY) {
+ lnet_ni_decref(ni);
+ conn->uc_ni = NULL;
+ CERROR("Refusing to accept connection of type=%d from "
+ "userspace process %u.%u.%u.%u:%d\n", hello->kshm_ctype,
+ HIPQUAD(peer_ip), peer_port);
+ return -EINVAL;
+ }
+ } else {
+ conn->uc_peerid.pid = hello->kshm_src_pid;
+ conn->uc_peerid.nid = hello->kshm_src_nid;
+ }
+ conn->uc_type = type = usocklnd_invert_type(hello->kshm_ctype);
+
+ rc = usocklnd_find_or_create_peer(ni, conn->uc_peerid, &peer);
+ if (rc) {
+ lnet_ni_decref(ni);
+ conn->uc_ni = NULL;
+ return rc;
+ }
+
+ peer->up_last_alive = cfs_time_current();
+
+ idx = usocklnd_type2idx(conn->uc_type);
+
+ /* safely check whether we're first */
+ pthread_mutex_lock(&peer->up_lock);
+
+ usocklnd_cleanup_stale_conns(peer, hello->kshm_src_incarnation, NULL);
+
+ if (peer->up_conns[idx] == NULL) {
+ peer->up_last_alive = cfs_time_current();
+ conn->uc_peer = peer;
+ conn->uc_ni = NULL;
+ usocklnd_link_conn_to_peer(conn, peer, idx);
+ usocklnd_conn_addref(conn);
+ } else {
+ usocklnd_peer_decref(peer);
+
+ /* Resolve race in favour of higher NID */
+ if (conn->uc_peerid.nid < conn->uc_ni->ni_nid) {
+ /* make us zombie */
+ conn->uc_ni = NULL;
+ type = SOCKLND_CONN_NONE;
+ }
+
+ /* if conn->uc_peerid.nid > conn->uc_ni->ni_nid,
+ * postpone race resolution till READY state
+ * (hopefully that conn[idx] will die because of
+ * incoming hello of CONN_NONE type) */
+ }
+ pthread_mutex_unlock(&peer->up_lock);
+
+ /* allocate and initialize fake tx with hello */
+ conn->uc_tx_hello = usocklnd_create_hello_tx(ni, type,
+ conn->uc_peerid.nid);
+ if (conn->uc_ni == NULL)
+ lnet_ni_decref(ni);
+
+ if (conn->uc_tx_hello == NULL)
+ return -ENOMEM;
+
+ /* rc == 0 */
+ pthread_mutex_lock(&conn->uc_lock);
+ if (conn->uc_state == UC_DEAD)
+ goto passive_hellorecv_done;
+
+ conn->uc_state = UC_SENDING_HELLO;
+ conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
+ conn->uc_tx_flag = 1;
+ rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST, POLLOUT);
+
+ passive_hellorecv_done:
+ pthread_mutex_unlock(&conn->uc_lock);
+ return rc;
+}
+
+int
+usocklnd_write_handler(usock_conn_t *conn)
+{
+ usock_tx_t *tx;
+ int ret;
+ int rc = 0;
+ int state;
+ usock_peer_t *peer;
+ lnet_ni_t *ni;
+
+ pthread_mutex_lock(&conn->uc_lock); /* like membar */
+ state = conn->uc_state;
+ pthread_mutex_unlock(&conn->uc_lock);
+
+ switch (state) {
+ case UC_CONNECTING:
+ /* hello_tx has already been initialized
+ * in usocklnd_create_active_conn() */
+ usocklnd_conn_new_state(conn, UC_SENDING_HELLO);
+ /* fall through */
+
+ case UC_SENDING_HELLO:
+ rc = usocklnd_send_tx(conn, conn->uc_tx_hello);
+ if (rc <= 0) /* error or partial send or connection closed */
+ break;
+
+ /* tx with hello was sent successfully */
+ usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
+ conn->uc_tx_hello = NULL;
+
+ if (conn->uc_activeflag == 1) /* active conn */
+ rc = usocklnd_activeconn_hellosent(conn);
+ else /* passive conn */
+ rc = usocklnd_passiveconn_hellosent(conn);
+
+ break;
+
+ case UC_READY:
+ pthread_mutex_lock(&conn->uc_lock);
+
+ peer = conn->uc_peer;
+ LASSERT (peer != NULL);
+ ni = peer->up_ni;
+
+ if (list_empty(&conn->uc_tx_list) &&
+ list_empty(&conn->uc_zcack_list)) {
+ LASSERT(usock_tuns.ut_fair_limit > 1);
+ pthread_mutex_unlock(&conn->uc_lock);
+ return 0;
+ }
+
+ tx = usocklnd_try_piggyback(&conn->uc_tx_list,
+ &conn->uc_zcack_list);
+ if (tx != NULL)
+ conn->uc_sending = 1;
+ else
+ rc = -ENOMEM;
+
+ pthread_mutex_unlock(&conn->uc_lock);
+
+ if (rc)
+ break;
+
+ rc = usocklnd_send_tx(conn, tx);
+ if (rc == 0) { /* partial send or connection closed */
+ pthread_mutex_lock(&conn->uc_lock);
+ list_add(&tx->tx_list, &conn->uc_tx_list);
+ conn->uc_sending = 0;
+ pthread_mutex_unlock(&conn->uc_lock);
+ break;
+ }
+ if (rc < 0) { /* real error */
+ usocklnd_destroy_tx(ni, tx);
+ break;
+ }
+
+ /* rc == 1: tx was sent completely */
+ usocklnd_destroy_tx(ni, tx);
+
+ pthread_mutex_lock(&conn->uc_lock);
+ conn->uc_sending = 0;
+ if (conn->uc_state != UC_DEAD &&
+ list_empty(&conn->uc_tx_list) &&
+ list_empty(&conn->uc_zcack_list)) {
+ conn->uc_tx_flag = 0;
+ ret = usocklnd_add_pollrequest(conn,
+ POLL_TX_SET_REQUEST, 0);
+ if (ret)
+ rc = ret;
+ }
+ pthread_mutex_unlock(&conn->uc_lock);
+
+ break;
+
+ case UC_DEAD:
+ break;
+
+ default:
+ LBUG();
+ }
+
+ if (rc < 0)
+ usocklnd_conn_kill(conn);
+
+ return rc;
+}
+
+/* Return the first tx from tx_list, with a zc_ack from zcack_list
+ * piggybacked on it when possible. If tx_list is empty, return a
+ * brand-new noop tx for the zc_ack from zcack_list. Return NULL
+ * if an error happened */
+usock_tx_t *
+usocklnd_try_piggyback(struct list_head *tx_list_p,
+ struct list_head *zcack_list_p)
+{
+ usock_tx_t *tx;
+ usock_zc_ack_t *zc_ack;
+
+ /* assign tx and zc_ack */
+ if (list_empty(tx_list_p))
+ tx = NULL;
+ else {
+ tx = list_entry(tx_list_p->next, usock_tx_t, tx_list);
+ list_del(&tx->tx_list);
+
+ /* already piggybacked or partially sent */
+ if (tx->tx_msg.ksm_zc_ack_cookie ||
+ tx->tx_resid != tx->tx_nob)
+ return tx;
+ }
+
+ if (list_empty(zcack_list_p)) {
+ /* nothing to piggyback */
+ return tx;
+ } else {
+ zc_ack = list_entry(zcack_list_p->next,
+ usock_zc_ack_t, zc_list);
+ list_del(&zc_ack->zc_list);
+ }
+
+ if (tx != NULL)
+ /* piggyback the zc-ack cookie */
+ tx->tx_msg.ksm_zc_ack_cookie = zc_ack->zc_cookie;
+ else
+ /* cannot piggyback, need noop */
+ tx = usocklnd_create_noop_tx(zc_ack->zc_cookie);
+
+ LIBCFS_FREE (zc_ack, sizeof(*zc_ack));
+ return tx;
+}
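+
+/* So a conn with both a pending tx and a pending zc_ack sends one
+ * wire message instead of two; only a zc_ack with no tx to ride on
+ * costs a dedicated noop message. A tx that was already partially
+ * sent (or already carries a cookie) is returned untouched, since
+ * its header bytes may already be on the wire. */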
+
+/* All actions that we need after sending hello on active conn:
+ * 1) update RX iov to receive hello
+ * 2) state transition to UC_RECEIVING_HELLO
+ * 3) notify poll_thread that we're waiting for incoming hello */
+int
+usocklnd_activeconn_hellosent(usock_conn_t *conn)
+{
+ int rc = 0;
+
+ pthread_mutex_lock(&conn->uc_lock);
+
+ if (conn->uc_state != UC_DEAD) {
+ usocklnd_rx_hellomagic_state_transition(conn);
+ conn->uc_state = UC_RECEIVING_HELLO;
+ conn->uc_tx_flag = 0;
+ rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST, POLLIN);
+ }
+
+ pthread_mutex_unlock(&conn->uc_lock);
+
+ return rc;
+}
+
+/* All actions that we need after sending hello on passive conn:
+ * 1) Cope with 1st easy case: conn is already linked to a peer
+ * 2) Cope with 2nd easy case: remove zombie conn
+ * 3) Resolve race:
+ * a) find the peer
+ * b) link the conn to the peer if conn[idx] is empty
+ * c) if the conn[idx] isn't empty and is in READY state,
+ * remove the conn as duplicated
+ * d) if the conn[idx] isn't empty and isn't in READY state,
+ * override conn[idx] with the conn
+ */
+int
+usocklnd_passiveconn_hellosent(usock_conn_t *conn)
+{
+ usock_conn_t *conn2;
+ usock_peer_t *peer;
+ struct list_head tx_list;
+ struct list_head zcack_list;
+ int idx;
+ int rc = 0;
+
+ /* almost nothing to do if conn is already linked to peer hash table */
+ if (conn->uc_peer != NULL)
+ goto passive_hellosent_done;
+
+ /* conn->uc_peer == NULL, so the conn isn't accessible via
+ * peer hash list, so nobody can touch the conn but us */
+
+ if (conn->uc_ni == NULL) /* remove zombie conn */
+ goto passive_hellosent_connkill;
+
+ /* all code below is race resolution, because normally
+ * passive conn is linked to peer just after receiving hello */
+ CFS_INIT_LIST_HEAD (&tx_list);
+ CFS_INIT_LIST_HEAD (&zcack_list);
+
+ /* conn is passive and isn't linked to any peer,
+ so its tx and zc_ack lists have to be empty */
+ LASSERT (list_empty(&conn->uc_tx_list) &&
+ list_empty(&conn->uc_zcack_list) &&
+ conn->uc_sending == 0);
+
+ rc = usocklnd_find_or_create_peer(conn->uc_ni, conn->uc_peerid, &peer);
+ if (rc)
+ return rc;
+
+ idx = usocklnd_type2idx(conn->uc_type);
+
+ /* try to link conn to peer */
+ pthread_mutex_lock(&peer->up_lock);
+ if (peer->up_conns[idx] == NULL) {
+ usocklnd_link_conn_to_peer(conn, peer, idx);
+ usocklnd_conn_addref(conn);
+ conn->uc_peer = peer;
+ usocklnd_peer_addref(peer);
+ } else {
+ conn2 = peer->up_conns[idx];
+ pthread_mutex_lock(&conn2->uc_lock);
+
+ if (conn2->uc_state == UC_READY) {
+ /* conn2 is in READY state, so conn is "duplicated" */
+ pthread_mutex_unlock(&conn2->uc_lock);
+ pthread_mutex_unlock(&peer->up_lock);
+ usocklnd_peer_decref(peer);
+ goto passive_hellosent_connkill;
+ }
+
+ /* uc_state != UC_READY => switch conn and conn2 */
+ /* Relink txs and zc_acks from conn2 to conn.
+ * We're sure that nobody but us can access the conn,
+ * but we take the mutex anyway (if we're wrong, a
+ * deadlock is easier to spot than a corrupted list) */
+ list_add(&tx_list, &conn2->uc_tx_list);
+ list_del_init(&conn2->uc_tx_list);
+ list_add(&zcack_list, &conn2->uc_zcack_list);
+ list_del_init(&conn2->uc_zcack_list);
+
+ pthread_mutex_lock(&conn->uc_lock);
+ list_add_tail(&conn->uc_tx_list, &tx_list);
+ list_del_init(&tx_list);
+ list_add_tail(&conn->uc_zcack_list, &zcack_list);
+ list_del_init(&zcack_list);
+ conn->uc_peer = peer;
+ pthread_mutex_unlock(&conn->uc_lock);
+
+ conn2->uc_peer = NULL; /* make conn2 zombie */
+ pthread_mutex_unlock(&conn2->uc_lock);
+ usocklnd_conn_decref(conn2);
+
+ usocklnd_link_conn_to_peer(conn, peer, idx);
+ usocklnd_conn_addref(conn);
+ conn->uc_peer = peer;
+ }
+
+ lnet_ni_decref(conn->uc_ni);
+ conn->uc_ni = NULL;
+ pthread_mutex_unlock(&peer->up_lock);
+ usocklnd_peer_decref(peer);
+
+ passive_hellosent_done:
+ /* safely transit to UC_READY state */
+ /* rc == 0 */
+ pthread_mutex_lock(&conn->uc_lock);
+ if (conn->uc_state != UC_DEAD) {
+ usocklnd_rx_ksmhdr_state_transition(conn);
+
+ /* we're ready to receive incoming packets and maybe
+ already have something to transmit */
+ LASSERT (conn->uc_sending == 0);
+ if (list_empty(&conn->uc_tx_list) &&
+ list_empty(&conn->uc_zcack_list)) {
+ conn->uc_tx_flag = 0;
+ rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST,
+ POLLIN);
+ } else {
+ conn->uc_tx_deadline =
+ cfs_time_shift(usock_tuns.ut_timeout);
+ conn->uc_tx_flag = 1;
+ rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST,
+ POLLIN | POLLOUT);
+ }
+
+ if (rc == 0)
+ conn->uc_state = UC_READY;
+ }
+ pthread_mutex_unlock(&conn->uc_lock);
+ return rc;
+
+ passive_hellosent_connkill:
+ usocklnd_conn_kill(conn);
+ return 0;
+}
+
+/* Send as much tx data as possible.
+ * Returns 0 or 1 on success, <0 on fatal error.
+ * 0 means partial send or non-fatal error, 1 - complete.
+ * Rely on libcfs_sock_writev() for differentiating fatal and
+ * non-fatal errors. An error should be considered as non-fatal if:
+ * 1) it still makes sense to continue reading &&
+ * 2) anyway, poll() will set up POLLHUP|POLLERR flags */
+int
+usocklnd_send_tx(usock_conn_t *conn, usock_tx_t *tx)
+{
+ struct iovec *iov;
+ int nob;
+ int fd = conn->uc_fd;
+ cfs_time_t t;
+
+ LASSERT (tx->tx_resid != 0);
+
+ do {
+ usock_peer_t *peer = conn->uc_peer;
+
+ LASSERT (tx->tx_niov > 0);
+
+ nob = libcfs_sock_writev(fd, tx->tx_iov, tx->tx_niov);
+ if (nob < 0)
+ conn->uc_errored = 1;
+ if (nob <= 0) /* write queue is flow-controlled or error */
+ return nob;
+
+ LASSERT (nob <= tx->tx_resid);
+ tx->tx_resid -= nob;
+ t = cfs_time_current();
+ conn->uc_tx_deadline = cfs_time_add(t, cfs_time_seconds(usock_tuns.ut_timeout));
+
+ if (peer != NULL)
+ peer->up_last_alive = t;
+
+ /* "consume" iov */
+ iov = tx->tx_iov;
+ do {
+ LASSERT (tx->tx_niov > 0);
+
+ if (nob < iov->iov_len) {
+ iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
+ iov->iov_len -= nob;
+ break;
+ }
+
+ nob -= iov->iov_len;
+ tx->tx_iov = ++iov;
+ tx->tx_niov--;
+ } while (nob != 0);
+
+ } while (tx->tx_resid != 0);
+
+ return 1; /* send complete */
+}
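+
+/* The iov-consuming loop above handles partial writes: e.g. if
+ * writev() returns 5000 against iovecs of [4096, 4096], the first
+ * entry is dropped, the second is advanced by 904 bytes, and the
+ * outer loop retries with tx_niov == 1 until tx_resid reaches 0 or
+ * the socket flow-controls. */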
+
+/* Read from wire as much data as possible.
+ * Returns 0 or 1 on success, <0 on error or EOF.
+ * 0 means partial read, 1 - complete */
+int
+usocklnd_read_data(usock_conn_t *conn)
+{
+ struct iovec *iov;
+ int nob;
+ cfs_time_t t;
+
+ LASSERT (conn->uc_rx_nob_wanted != 0);
+
+ do {
+ usock_peer_t *peer = conn->uc_peer;
+
+ LASSERT (conn->uc_rx_niov > 0);
+
+ nob = libcfs_sock_readv(conn->uc_fd, conn->uc_rx_iov, conn->uc_rx_niov);
+ if (nob <= 0) {/* read nothing or error */
+ conn->uc_errored = 1;
+ return nob;
+ }
+
+ LASSERT (nob <= conn->uc_rx_nob_wanted);
+ conn->uc_rx_nob_wanted -= nob;
+ conn->uc_rx_nob_left -= nob;
+ t = cfs_time_current();
+ conn->uc_rx_deadline = cfs_time_add(t, cfs_time_seconds(usock_tuns.ut_timeout));
+
+ if (peer != NULL)
+ peer->up_last_alive = t;
+
+ /* "consume" iov */
+ iov = conn->uc_rx_iov;
+ do {
+ LASSERT (conn->uc_rx_niov > 0);
+
+ if (nob < iov->iov_len) {
+ iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
+ iov->iov_len -= nob;
+ break;
+ }
+
+ nob -= iov->iov_len;
+ conn->uc_rx_iov = ++iov;
+ conn->uc_rx_niov--;
+ } while (nob != 0);
+
+ } while (conn->uc_rx_nob_wanted != 0);
+
+ return 1; /* read complete */
+}
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
+ * Author: Maxim Patlasov <maxim@clusterfs.com>
+ *
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ */
+
+#include "usocklnd.h"
+#include <unistd.h>
+#include <syscall.h>
+
+void
+usocklnd_process_stale_list(usock_pollthread_t *pt_data)
+{
+ while (!list_empty(&pt_data->upt_stale_list)) {
+ usock_conn_t *conn;
+ conn = list_entry(pt_data->upt_stale_list.next,
+ usock_conn_t, uc_stale_list);
+
+ list_del(&conn->uc_stale_list);
+
+ usocklnd_tear_peer_conn(conn);
+ usocklnd_conn_decref(conn); /* -1 for idx2conn[idx] or pr */
+ }
+}
+
+int
+usocklnd_poll_thread(void *arg)
+{
+ int rc = 0;
+ usock_pollthread_t *pt_data = (usock_pollthread_t *)arg;
+ cfs_time_t current_time;
+ cfs_time_t planned_time;
+ int idx;
+ int idx_start;
+ int idx_finish;
+ int chunk;
+ int saved_nfds;
+ int extra;
+ int times;
+
+ /* mask signals to avoid SIGPIPE, etc */
+ sigset_t sigs;
+ sigfillset (&sigs);
+ pthread_sigmask (SIG_SETMASK, &sigs, 0);
+
+ LASSERT(pt_data != NULL);
+
+ planned_time = cfs_time_shift(usock_tuns.ut_poll_timeout);
+ chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);
+ saved_nfds = pt_data->upt_nfds;
+ idx_start = 1;
+
+ /* Main loop */
+ while (usock_data.ud_shutdown == 0) {
+ rc = 0;
+
+ /* Process all enqueued poll requests */
+ pthread_mutex_lock(&pt_data->upt_pollrequests_lock);
+ while (!list_empty(&pt_data->upt_pollrequests)) {
+ usock_pollrequest_t *pr;
+ pr = list_entry(pt_data->upt_pollrequests.next,
+ usock_pollrequest_t, upr_list);
+
+ list_del(&pr->upr_list);
+ rc = usocklnd_process_pollrequest(pr, pt_data);
+ if (rc)
+ break;
+ }
+ pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);
+
+ if (rc)
+ break;
+
+ /* Delete conns orphaned due to POLL_DEL_REQUESTs */
+ usocklnd_process_stale_list(pt_data);
+
+ /* Actual polling for events */
+ rc = poll(pt_data->upt_pollfd,
+ pt_data->upt_nfds,
+ usock_tuns.ut_poll_timeout * 1000);
+
+ if (rc < 0) {
+ CERROR("Cannot poll(2): errno=%d\n", errno);
+ break;
+ }
+
+ if (rc > 0)
+ usocklnd_execute_handlers(pt_data);
+
+ current_time = cfs_time_current();
+
+ if (pt_data->upt_nfds < 2 ||
+ cfs_time_before(current_time, planned_time))
+ continue;
+
+ /* catch up growing pollfd[] */
+ if (pt_data->upt_nfds > saved_nfds) {
+ extra = pt_data->upt_nfds - saved_nfds;
+ saved_nfds = pt_data->upt_nfds;
+ } else {
+ extra = 0;
+ }
+
+ times = cfs_duration_sec(cfs_time_sub(current_time, planned_time)) + 1;
+ idx_finish = MIN(idx_start + chunk*times + extra, pt_data->upt_nfds);
+
+ for (idx = idx_start; idx < idx_finish; idx++) {
+ usock_conn_t *conn = pt_data->upt_idx2conn[idx];
+ pthread_mutex_lock(&conn->uc_lock);
+ if (usocklnd_conn_timed_out(conn, current_time) &&
+ conn->uc_state != UC_DEAD) {
+ conn->uc_errored = 1;
+ usocklnd_conn_kill_locked(conn);
+ }
+ pthread_mutex_unlock(&conn->uc_lock);
+ }
+
+ if (idx_finish == pt_data->upt_nfds) {
+ chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);
+ saved_nfds = pt_data->upt_nfds;
+ idx_start = 1;
+ } else {
+ idx_start = idx_finish;
+ }
+
+ planned_time = cfs_time_add(current_time,
+ cfs_time_seconds(usock_tuns.ut_poll_timeout));
+ }
+
+ /* All conns should have been deleted by POLL_DEL_REQUESTs during shutdown */
+ LASSERT (rc != 0 || pt_data->upt_nfds == 1);
+
+ if (rc) {
+ pthread_mutex_lock(&pt_data->upt_pollrequests_lock);
+
+ /* Block new poll requests to be enqueued */
+ pt_data->upt_errno = rc;
+
+ while (!list_empty(&pt_data->upt_pollrequests)) {
+ usock_pollrequest_t *pr;
+ pr = list_entry(pt_data->upt_pollrequests.next,
+ usock_pollrequest_t, upr_list);
+
+ list_del(&pr->upr_list);
+
+ if (pr->upr_type == POLL_ADD_REQUEST) {
+ close(pr->upr_conn->uc_fd);
+ list_add_tail(&pr->upr_conn->uc_stale_list,
+ &pt_data->upt_stale_list);
+ } else {
+ usocklnd_conn_decref(pr->upr_conn);
+ }
+
+ LIBCFS_FREE (pr, sizeof(*pr));
+ }
+ pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);
+
+ usocklnd_process_stale_list(pt_data);
+
+ for (idx = 1; idx < pt_data->upt_nfds; idx++) {
+ usock_conn_t *conn = pt_data->upt_idx2conn[idx];
+ LASSERT(conn != NULL);
+ close(conn->uc_fd);
+ usocklnd_tear_peer_conn(conn);
+ usocklnd_conn_decref(conn);
+ }
+ }
+
+ /* unblock usocklnd_shutdown() */
+ cfs_complete(&pt_data->upt_completion);
+
+ return 0;
+}
+
+/* Returns 0 on success, <0 otherwise */
+int
+usocklnd_add_pollrequest(usock_conn_t *conn, int type, short value)
+{
+ int pt_idx = conn->uc_pt_idx;
+ usock_pollthread_t *pt = &usock_data.ud_pollthreads[pt_idx];
+ usock_pollrequest_t *pr;
+
+ LIBCFS_ALLOC(pr, sizeof(*pr));
+ if (pr == NULL) {
+ CERROR ("Cannot allocate poll request\n");
+ return -ENOMEM;
+ }
+
+ pr->upr_conn = conn;
+ pr->upr_type = type;
+ pr->upr_value = value;
+
+ usocklnd_conn_addref(conn); /* +1 for poll request */
+
+ pthread_mutex_lock(&pt->upt_pollrequests_lock);
+
+ if (pt->upt_errno) { /* very rare case: errored poll thread */
+ int rc = pt->upt_errno;
+ pthread_mutex_unlock(&pt->upt_pollrequests_lock);
+ usocklnd_conn_decref(conn);
+ LIBCFS_FREE(pr, sizeof(*pr));
+ return rc;
+ }
+
+ list_add_tail(&pr->upr_list, &pt->upt_pollrequests);
+ pthread_mutex_unlock(&pt->upt_pollrequests_lock);
+ return 0;
+}
+
+void
+usocklnd_add_killrequest(usock_conn_t *conn)
+{
+ int pt_idx = conn->uc_pt_idx;
+ usock_pollthread_t *pt = &usock_data.ud_pollthreads[pt_idx];
+ usock_pollrequest_t *pr = conn->uc_preq;
+
+ /* Use the preallocated poll request because there is no good
+ * workaround for an ENOMEM error while killing a connection */
+ if (pr) {
+ pr->upr_conn = conn;
+ pr->upr_type = POLL_DEL_REQUEST;
+ pr->upr_value = 0;
+
+ usocklnd_conn_addref(conn); /* +1 for poll request */
+
+ pthread_mutex_lock(&pt->upt_pollrequests_lock);
+
+ if (pt->upt_errno) { /* very rare case: errored poll thread */
+ pthread_mutex_unlock(&pt->upt_pollrequests_lock);
+ usocklnd_conn_decref(conn);
+ return; /* conn will be killed in poll thread anyway */
+ }
+
+ list_add_tail(&pr->upr_list, &pt->upt_pollrequests);
+ pthread_mutex_unlock(&pt->upt_pollrequests_lock);
+
+ conn->uc_preq = NULL;
+ }
+}
+
+/* Process a poll request and update the poll data.
+ * Returns 0 on success, <0 otherwise */
+int
+usocklnd_process_pollrequest(usock_pollrequest_t *pr,
+ usock_pollthread_t *pt_data)
+{
+ int type = pr->upr_type;
+ short value = pr->upr_value;
+ usock_conn_t *conn = pr->upr_conn;
+ int idx = 0;
+ struct pollfd *pollfd = pt_data->upt_pollfd;
+ int *fd2idx = pt_data->upt_fd2idx;
+ usock_conn_t **idx2conn = pt_data->upt_idx2conn;
+ int *skip = pt_data->upt_skip;
+
+ LASSERT(conn != NULL);
+ LASSERT(conn->uc_fd >= 0);
+ LASSERT(type == POLL_ADD_REQUEST ||
+ conn->uc_fd < pt_data->upt_nfd2idx);
+
+ if (type != POLL_ADD_REQUEST) {
+ idx = fd2idx[conn->uc_fd];
+ if (idx > 0 && idx < pt_data->upt_nfds) { /* hot path */
+ LASSERT(pollfd[idx].fd == conn->uc_fd);
+ } else { /* unlikely */
+ CWARN("Very unlikely event happend: trying to"
+ " handle poll request of type %d but idx=%d"
+ " is out of range [1 ... %d]. Is shutdown"
+ " in progress (%d)?\n",
+ type, idx, pt_data->upt_nfds - 1,
+ usock_data.ud_shutdown);
+
+ LIBCFS_FREE (pr, sizeof(*pr));
+ usocklnd_conn_decref(conn);
+ return 0;
+ }
+ }
+
+ LIBCFS_FREE (pr, sizeof(*pr));
+
+ switch (type) {
+ case POLL_ADD_REQUEST:
+ if (pt_data->upt_nfds >= pt_data->upt_npollfd) {
+ /* resize pollfd[], idx2conn[] and skip[] */
+ struct pollfd *new_pollfd;
+ int new_npollfd = pt_data->upt_npollfd * 2;
+ usock_conn_t **new_idx2conn;
+ int *new_skip;
+
+ new_pollfd = LIBCFS_REALLOC(pollfd, new_npollfd *
+ sizeof(struct pollfd));
+ if (new_pollfd == NULL)
+ goto process_pollrequest_enomem;
+ pt_data->upt_pollfd = pollfd = new_pollfd;
+
+ new_idx2conn = LIBCFS_REALLOC(idx2conn, new_npollfd *
+ sizeof(usock_conn_t *));
+ if (new_idx2conn == NULL)
+ goto process_pollrequest_enomem;
+ pt_data->upt_idx2conn = idx2conn = new_idx2conn;
+
+ new_skip = LIBCFS_REALLOC(skip, new_npollfd *
+ sizeof(int));
+ if (new_skip == NULL)
+ goto process_pollrequest_enomem;
+ pt_data->upt_skip = new_skip;
+
+ pt_data->upt_npollfd = new_npollfd;
+ }
+
+ if (conn->uc_fd >= pt_data->upt_nfd2idx) {
+ /* resize fd2idx[] */
+ int *new_fd2idx;
+ int new_nfd2idx = pt_data->upt_nfd2idx * 2;
+
+ while (new_nfd2idx <= conn->uc_fd)
+ new_nfd2idx *= 2;
+
+ new_fd2idx = LIBCFS_REALLOC(fd2idx, new_nfd2idx *
+ sizeof(int));
+ if (new_fd2idx == NULL)
+ goto process_pollrequest_enomem;
+
+ pt_data->upt_fd2idx = fd2idx = new_fd2idx;
+ memset(fd2idx + pt_data->upt_nfd2idx, 0,
+ (new_nfd2idx - pt_data->upt_nfd2idx)
+ * sizeof(int));
+ pt_data->upt_nfd2idx = new_nfd2idx;
+ }
+
+ LASSERT(fd2idx[conn->uc_fd] == 0);
+
+ idx = pt_data->upt_nfds++;
+ idx2conn[idx] = conn;
+ fd2idx[conn->uc_fd] = idx;
+
+ pollfd[idx].fd = conn->uc_fd;
+ pollfd[idx].events = value;
+ pollfd[idx].revents = 0;
+ break;
+ case POLL_DEL_REQUEST:
+ fd2idx[conn->uc_fd] = 0; /* invalidate this entry */
+
+ --pt_data->upt_nfds;
+ if (idx != pt_data->upt_nfds) {
+ /* shift last entry into released position */
+ memcpy(&pollfd[idx], &pollfd[pt_data->upt_nfds],
+ sizeof(struct pollfd));
+ idx2conn[idx] = idx2conn[pt_data->upt_nfds];
+ fd2idx[pollfd[idx].fd] = idx;
+ }
+
+ close(conn->uc_fd);
+ list_add_tail(&conn->uc_stale_list, &pt_data->upt_stale_list);
+ break;
+ case POLL_RX_SET_REQUEST:
+ pollfd[idx].events = (pollfd[idx].events & ~POLLIN) | value;
+ break;
+ case POLL_TX_SET_REQUEST:
+ pollfd[idx].events = (pollfd[idx].events & ~POLLOUT) | value;
+ break;
+ case POLL_SET_REQUEST:
+ pollfd[idx].events = value;
+ break;
+ default:
+ LBUG(); /* unknown type */
+ }
+
+ /* In the case of POLL_ADD_REQUEST, idx2conn[idx] takes the
+ * reference that poll request possesses */
+ if (type != POLL_ADD_REQUEST)
+ usocklnd_conn_decref(conn);
+
+ return 0;
+
+ process_pollrequest_enomem:
+ usocklnd_conn_decref(conn);
+ return -ENOMEM;
+}
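+
+/* Note (illustrative): the poll thread keeps three parallel maps:
+ * pollfd[idx] is the poll() slot, idx2conn[idx] the owning conn and
+ * fd2idx[fd] the reverse map, with idx 0 reserved for the notifier fd.
+ * POLL_DEL_REQUEST keeps pollfd[] dense by swapping the last slot into
+ * the freed one, in essence:
+ *
+ * int last = --pt_data->upt_nfds;
+ * pollfd[idx] = pollfd[last];
+ * idx2conn[idx] = idx2conn[last];
+ * fd2idx[pollfd[idx].fd] = idx;
+ */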
+
+/* Loop on poll data executing handlers repeatedly until
+ * fair_limit is reached or all entries are exhausted */
+void
+usocklnd_execute_handlers(usock_pollthread_t *pt_data)
+{
+ struct pollfd *pollfd = pt_data->upt_pollfd;
+ int nfds = pt_data->upt_nfds;
+ usock_conn_t **idx2conn = pt_data->upt_idx2conn;
+ int *skip = pt_data->upt_skip;
+ int j;
+
+ if (pollfd[0].revents & POLLIN)
+ while (usocklnd_notifier_handler(pollfd[0].fd) > 0)
+ ;
+
+ skip[0] = 1; /* always skip notifier fd */
+
+ for (j = 0; j < usock_tuns.ut_fair_limit; j++) {
+ int prev = 0;
+ int i = skip[0];
+
+ if (i >= nfds) /* nothing ready */
+ break;
+
+ do {
+ usock_conn_t *conn = idx2conn[i];
+ int next;
+
+ if (j == 0) /* first pass... */
+ next = skip[i] = i+1; /* set skip chain */
+ else /* later passes... */
+ next = skip[i]; /* skip unready pollfds */
+
+ /* kill connection if it's closed by peer and
+ * there is no data pending for reading */
+ if ((pollfd[i].revents & POLLERR) != 0 ||
+ (pollfd[i].revents & POLLHUP) != 0) {
+ if ((pollfd[i].events & POLLIN) != 0 &&
+ (pollfd[i].revents & POLLIN) == 0)
+ usocklnd_conn_kill(conn);
+ else
+ usocklnd_exception_handler(conn);
+ }
+
+ if ((pollfd[i].revents & POLLIN) != 0 &&
+ usocklnd_read_handler(conn) <= 0)
+ pollfd[i].revents &= ~POLLIN;
+
+ if ((pollfd[i].revents & POLLOUT) != 0 &&
+ usocklnd_write_handler(conn) <= 0)
+ pollfd[i].revents &= ~POLLOUT;
+
+ if ((pollfd[i].revents & (POLLIN | POLLOUT)) == 0)
+ skip[prev] = next; /* skip this entry next pass */
+ else
+ prev = i;
+
+ i = next;
+ } while (i < nfds);
+ }
+}
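+
+/* Note (illustrative): skip[] forms a singly-linked "skip chain" over
+ * pollfd indices. The first pass threads every entry onto the chain;
+ * whenever an entry has no POLLIN/POLLOUT work left it is unlinked
+ * via skip[prev] = next, so each of the remaining ut_fair_limit - 1
+ * passes only revisits fds that still had data or buffer space. */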
+
+int
+usocklnd_calculate_chunk_size(int num)
+{
+ const int n = 4;
+ const int p = usock_tuns.ut_poll_timeout;
+ int chunk = num;
+
+ /* chunk should be big enough to detect a timeout on any
+ * connection within (n+1)/n times the timeout interval,
+ * assuming we check 'chunk' conns every 'p' seconds */
+
+ if (usock_tuns.ut_timeout > n * p)
+ chunk = (chunk * n * p) / usock_tuns.ut_timeout;
+
+ if (chunk == 0)
+ chunk = 1;
+
+ return chunk;
+}
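+
+/* Worked example (illustrative, with the defaults ut_timeout = 50 and
+ * ut_poll_timeout = 1): for num = 1000 conns, chunk = 1000*4*1/50 = 80,
+ * so a full sweep takes (1000/80) * 1s = 12.5s = ut_timeout/n, and a
+ * timed-out conn is noticed within 50 + 12.5 = 62.5s, i.e. within the
+ * promised (n+1)/n times the timeout interval. */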
+
+void
+usocklnd_wakeup_pollthread(int i)
+{
+ usock_pollthread_t *pt = &usock_data.ud_pollthreads[i];
+ int notification = 0;
+ int rc;
+
+ rc = syscall(SYS_write, pt->upt_notifier_fd, &notification,
+ sizeof(notification));
+
+ if (rc != sizeof(notification))
+ CERROR("Very unlikely event happend: "
+ "cannot write to notifier fd (rc=%d; errno=%d)\n",
+ rc, errno);
+}
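+
+/* Note (illustrative): upt_notifier_fd is one end of the socketpair
+ * created in usocklnd_base_startup(); the other end is polled as
+ * pollfd[0] of thread i, so writing a single int here makes poll()
+ * return and lets usocklnd_notifier_handler() drain the bytes. */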
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * Copyright (c) 2002 Cray Inc.
- * Copyright (c) 2002 Eric Hoffman
- *
- * This file is part of Lustre, http://www.lustre.org.
- *
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
- *
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-/* timer.c:
- * this file implements a simple priority-queue based timer system. when
- * combined with a file which implements now() and block(), it can
- * be used to provide course-grained time-based callbacks.
- */
-
-#include <pqtimer.h>
-#include <stdlib.h>
-#include <string.h>
-
-struct timer {
- void (*function)(void *);
- void *arg;
- when w;
- int interval;
- int disable;
-};
-
-typedef struct thunk *thunk;
-struct thunk {
- void (*f)(void *);
- void *a;
- thunk next;
-};
-
-extern when now(void);
-
-static thunk thunks;
-static int internal;
-static void (*block_function)(when);
-static int number_of_timers;
-static int size_of_pqueue;
-static timer *timers;
-
-
-static void heal(int where)
-{
- int left=(where<<1);
- int right=(where<<1)+1;
- int min=where;
- timer temp;
-
- if (left <= number_of_timers)
- if (timers[left]->w < timers[min]->w) min=left;
- if (right <= number_of_timers)
- if (timers[right]->w < timers[min]->w) min=right;
- if (min != where){
- temp=timers[where];
- timers[where]=timers[min];
- timers[min]=temp;
- heal(min);
- }
-}
-
-static void add_pqueue(int i)
-{
- timer temp;
- int parent=(i>>1);
- if ((i>1) && (timers[i]->w< timers[parent]->w)){
- temp=timers[i];
- timers[i]=timers[parent];
- timers[parent]=temp;
- add_pqueue(parent);
- }
-}
-
-static void add_timer(timer t)
-{
- if (size_of_pqueue<(number_of_timers+2)){
- int oldsize=size_of_pqueue;
- timer *new=(void *)malloc(sizeof(struct timer)*(size_of_pqueue+=10));
- memcpy(new,timers,sizeof(timer)*oldsize);
- timers=new;
- }
- timers[++number_of_timers]=t;
- add_pqueue(number_of_timers);
-}
-
-/* Function: register_timer
- * Arguments: interval: the time interval from the current time when
- * the timer function should be called
- * function: the function to call when the time has expired
- * argument: the argument to call it with.
- * Returns: a pointer to a timer structure
- */
-timer register_timer(when interval,
- void (*function)(void *),
- void *argument)
-{
- timer t=(timer)malloc(sizeof(struct timer));
-
- t->arg=argument;
- t->function=function;
- t->interval=interval;
- t->disable=0;
- t->w=now()+interval;
- add_timer(t);
- if (!internal && (number_of_timers==1))
- block_function(t->w);
- return(t);
-}
-
-/* Function: remove_timer
- * Arguments: t:
- * Returns: nothing
- *
- * remove_timer removes a timer from the system, insuring
- * that it will never be called. It does not actually
- * free the timer due to reentrancy issues.
- */
-
-void remove_timer(timer t)
-{
- t->disable=1;
-}
-
-
-
-void timer_fire()
-{
- timer current;
-
- current=timers[1];
- timers[1]=timers[number_of_timers--];
- heal(1);
- if (!current->disable) {
- (*current->function)(current->arg);
- }
- free(current);
-}
-
-when next_timer(void)
-{
- when here=now();
-
- while (number_of_timers && (timers[1]->w <= here)) timer_fire();
- if (number_of_timers) return(timers[1]->w);
- return(0);
-}
-
-/* Function: timer_loop
- * Arguments: none
- * Returns: never
- *
- * timer_loop() is the blocking dispatch function for the timer.
- * Is calls the block() function registered with init_timer,
- * and handles associated with timers that have been registered.
- */
-void timer_loop()
-{
- when here;
-
- while (1){
- thunk z;
- here=now();
-
- for (z=thunks;z;z=z->next) (*z->f)(z->a);
-
- if (number_of_timers){
- if (timers[1]->w > here){
- (*block_function)(timers[1]->w);
- } else {
- timer_fire();
- }
- } else {
- thunk z;
- for (z=thunks;z;z=z->next) (*z->f)(z->a);
- (*block_function)(0);
- }
- }
-}
-
-
-/* Function: register_thunk
- * Arguments: f: the function to call
- * a: the single argument to call it with
- *
- * Thunk functions get called at irregular intervals, they
- * should not assume when, or take a particularily long
- * amount of time. Thunks are for background cleanup tasks.
- */
-void register_thunk(void (*f)(void *),void *a)
-{
- thunk t=(void *)malloc(sizeof(struct thunk));
- t->f=f;
- t->a=a;
- t->next=thunks;
- thunks=t;
-}
-
-/* Function: initialize_timer
- * Arguments: block: the function to call to block for the specified interval
- *
- * initialize_timer() must be called before any other timer function,
- * including timer_loop.
- */
-void initialize_timer(void (*block)(when))
-{
- block_function=block;
- number_of_timers=0;
- size_of_pqueue=10;
- timers=(timer *)malloc(sizeof(timer)*size_of_pqueue);
- thunks=0;
-}
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * Copyright (c) 2002 Cray Inc.
- * Copyright (c) 2002 Eric Hoffman
- *
- * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
- */
-
-typedef unsigned long long when;
-when now(void);
-typedef struct timer *timer;
-timer register_timer(when interval,
- void (*function)(void *),
- void *argument);
-timer register_timer_wait(void);
-void remove_timer(timer);
-void timer_loop(void);
-void initialize_timer(void (*block)(when));
-void timer_fire(void);
-
-
-#define HZ 0x100000000ull
-
-
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * Copyright (c) 2002 Cray Inc.
- * Copyright (c) 2003 Cluster File Systems, Inc.
- *
- * This file is part of Lustre, http://www.lustre.org.
- *
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
- *
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-/* api.c:
- * This file provides the 'api' side for the process-based nals.
- * it is responsible for creating the 'library' side thread,
- * and passing wrapped portals transactions to it.
- *
- * Along with initialization, shutdown, and transport to the library
- * side, this file contains some stubs to satisfy the nal definition.
- */
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <string.h>
-#ifndef __CYGWIN__
-# include <syscall.h>
-#endif
-#include <netdb.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <procbridge.h>
-#include <pqtimer.h>
-#include <dispatch.h>
-#include <errno.h>
-#ifdef HAVE_GETHOSTBYNAME
-# include <sys/utsname.h>
-#endif
-
-#if !HAVE_LIBPTHREAD
-# error "This LND requires a multi-threaded runtime"
-#endif
-
-/* XXX CFS workaround, to give a chance to let nal thread wake up
- * from waiting in select
- */
-static int procbridge_notifier_handler(void *arg)
-{
- static char buf[8];
- procbridge p = (procbridge) arg;
-
- syscall(SYS_read, p->notifier[1], buf, sizeof(buf));
- return 1;
-}
-
-void procbridge_wakeup_nal(procbridge p)
-{
- static char buf[8];
- syscall(SYS_write, p->notifier[0], buf, sizeof(buf));
-}
-
-lnd_t the_tcplnd = {
- .lnd_type = SOCKLND,
- .lnd_startup = procbridge_startup,
- .lnd_shutdown = procbridge_shutdown,
- .lnd_send = tcpnal_send,
- .lnd_recv = tcpnal_recv,
- .lnd_notify = tcpnal_notify,
-};
-int tcpnal_running;
-
-/* Function: shutdown
- * Arguments: ni: the instance of me
- *
- * cleanup nal state, reclaim the lower side thread and
- * its state using PTL_FINI codepoint
- */
-void
-procbridge_shutdown(lnet_ni_t *ni)
-{
- bridge b=(bridge)ni->ni_data;
- procbridge p=(procbridge)b->local;
-
- p->nal_flags |= NAL_FLAG_STOPPING;
- procbridge_wakeup_nal(p);
-
- do {
- pthread_mutex_lock(&p->mutex);
- if (p->nal_flags & NAL_FLAG_STOPPED) {
- pthread_mutex_unlock(&p->mutex);
- break;
- }
- pthread_cond_wait(&p->cond, &p->mutex);
- pthread_mutex_unlock(&p->mutex);
- } while (1);
-
- free(p);
- tcpnal_running = 0;
-}
-
-#ifdef ENABLE_SELECT_DISPATCH
-procbridge __global_procbridge = NULL;
-#endif
-
-/* Function: procbridge_startup
- *
- * Arguments: ni: the instance of me
- * interfaces: ignored
- *
- * Returns: portals rc
- *
- * initializes the tcp nal. we define unix_failure as an
- * error wrapper to cut down clutter.
- */
-int
-procbridge_startup (lnet_ni_t *ni)
-{
- procbridge p;
- bridge b;
- int rc;
-
- /* NB The local NID is not assigned. We only ever connect to the socknal,
- * which assigns the src nid/pid on incoming non-privileged connections
- * (i.e. us), and we don't accept connections. */
-
- LASSERT (ni->ni_lnd == &the_tcplnd);
- LASSERT (!tcpnal_running); /* only single instance supported */
- LASSERT (ni->ni_interfaces[0] == NULL); /* explicit interface(s) not supported */
-
- /* The credit settings here are pretty irrelevent. Userspace tcplnd has no
- * tx descriptor pool to exhaust and does a blocking send; that's the real
- * limit on send concurrency. */
- ni->ni_maxtxcredits = 1000;
- ni->ni_peertxcredits = 1000;
-
- init_unix_timer();
-
- b=(bridge)malloc(sizeof(struct bridge));
- p=(procbridge)malloc(sizeof(struct procbridge));
- b->local=p;
- b->b_ni = ni;
- ni->ni_data = b;
-
- /* init procbridge */
- pthread_mutex_init(&p->mutex,0);
- pthread_cond_init(&p->cond, 0);
- p->nal_flags = 0;
-
- /* initialize notifier */
- if (socketpair(AF_UNIX, SOCK_STREAM, 0, p->notifier)) {
- perror("socketpair failed");
- rc = -errno;
- return rc;
- }
-
- if (!register_io_handler(p->notifier[1], READ_HANDLER,
- procbridge_notifier_handler, p)) {
- perror("fail to register notifier handler");
- return -ENOMEM;
- }
-
-#ifdef ENABLE_SELECT_DISPATCH
- __global_procbridge = p;
-#endif
-
- /* create nal thread */
- rc = pthread_create(&p->t, NULL, nal_thread, b);
- if (rc != 0) {
- perror("nal_init: pthread_create");
- return -ESRCH;
- }
-
- do {
- pthread_mutex_lock(&p->mutex);
- if (p->nal_flags & (NAL_FLAG_RUNNING | NAL_FLAG_STOPPED)) {
- pthread_mutex_unlock(&p->mutex);
- break;
- }
- pthread_cond_wait(&p->cond, &p->mutex);
- pthread_mutex_unlock(&p->mutex);
- } while (1);
-
- if (p->nal_flags & NAL_FLAG_STOPPED)
- return -ENETDOWN;
-
- tcpnal_running = 1;
-
- return 0;
-}
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * Copyright (c) 2002 Cray Inc.
- * Copyright (c) 2003 Cluster File Systems, Inc.
- *
- * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
- */
-
-#ifndef _PROCBRIDGE_H_
-#define _PROCBRIDGE_H_
-
-#include <pthread.h>
-#include <bridge.h>
-
-
-#define NAL_FLAG_RUNNING 1
-#define NAL_FLAG_STOPPING 2
-#define NAL_FLAG_STOPPED 4
-
-typedef struct procbridge {
- /* sync between user threads and nal thread */
- pthread_t t;
- pthread_cond_t cond;
- pthread_mutex_t mutex;
-
- /* socket pair used to notify nal thread */
- int notifier[2];
-
- int nal_flags;
-
-} *procbridge;
-
-typedef struct nal_init_args {
- lnet_pid_t nia_requested_pid;
- bridge nia_bridge;
-} nal_init_args_t;
-
-extern void *nal_thread(void *);
-
-extern void procbridge_wakeup_nal(procbridge p);
-
-extern int procbridge_startup (lnet_ni_t *);
-extern void procbridge_shutdown (lnet_ni_t *);
-
-extern void tcpnal_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive);
-
-extern int tcpnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg);
-int tcpnal_recv(lnet_ni_t *ni, void *private, lnet_msg_t *cookie,
- int delayed, unsigned int niov,
- struct iovec *iov, lnet_kiov_t *kiov,
- unsigned int offset, unsigned int mlen, unsigned int rlen);
-extern int tcpnal_set_global_params();
-
-
-
-
-#endif
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * Copyright (c) 2002 Cray Inc.
- * Copyright (c) 2003 Cluster File Systems, Inc.
- *
- * This file is part of Lustre, http://www.lustre.org.
- *
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
- *
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-/* lib.c:
- * This file provides the 'library' side for the process-based nals.
- * it is responsible for communication with the 'api' side and
- * providing service to the generic portals 'library'
- * implementation. 'library' might be better termed 'communication'
- * or 'kernel'.
- */
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <stdarg.h>
-#include <unistd.h>
-#include <procbridge.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#include <errno.h>
-#include <timer.h>
-#include <dispatch.h>
-
-/* the following functions are stubs to satisfy the nal definition
- without doing anything particularily useful*/
-extern int tcpnal_init(bridge);
-extern void tcpnal_shutdown(bridge);
-
-static void check_stopping(void *z)
-{
- bridge b = z;
- procbridge p = b->local;
-
- if ((p->nal_flags & NAL_FLAG_STOPPING) == 0)
- return;
-
- tcpnal_shutdown(b);
-
- pthread_mutex_lock(&p->mutex);
- p->nal_flags |= NAL_FLAG_STOPPED;
- pthread_cond_broadcast(&p->cond);
- pthread_mutex_unlock(&p->mutex);
-
- pthread_exit(0);
-}
-
-
-/* Function: nal_thread
- * Arguments: z: an opaque reference to a nal control structure
- * allocated and partially populated by the api level code
- * Returns: nothing, and only on error or explicit shutdown
- *
- * This function is the entry point of the pthread initiated on
- * the api side of the interface. This thread is used to handle
- * asynchronous delivery to the application.
- *
- * We define a limit macro to place a ceiling on limits
- * for syntactic convenience
- */
-
-void *nal_thread(void *z)
-{
- bridge b = (bridge) z;
- procbridge p=b->local;
- int rc;
-
- rc = tcpnal_init(b);
-
- /*
- * Whatever the initialization returned is passed back to the
- * user level code for further interpretation. We just exit if
- * it is non-zero since something went wrong.
- */
-
- pthread_mutex_lock(&p->mutex);
- p->nal_flags |= (rc != 0) ? NAL_FLAG_STOPPED : NAL_FLAG_RUNNING;
- pthread_cond_broadcast(&p->cond);
- pthread_mutex_unlock(&p->mutex);
-
- if (rc == 0) {
- /* the thunk function is called each time the timer loop
- performs an operation and returns to blocking mode. we
- overload this function to inform the api side that
- it may be interested in looking at the event queue */
- register_thunk(check_stopping,b);
- timer_loop();
- }
- return(0);
-}
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * Copyright (c) 2002 Cray Inc.
- * Copyright (c) 2002 Eric Hoffman
- *
- * This file is part of Lustre, http://www.lustre.org.
- *
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
- *
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-/* select.c:
- * Provides a general mechanism for registering and dispatching
- * io events through the select system call.
- */
-
-#define DEBUG_SUBSYSTEM S_LND
-
-#ifdef sun
-#include <sys/filio.h>
-#else
-#include <sys/ioctl.h>
-#endif
-
-#include <sys/time.h>
-#include <sys/types.h>
-#include <stdlib.h>
-#include <syscall.h>
-#include <pthread.h>
-#include <errno.h>
-#include <pqtimer.h>
-#include <dispatch.h>
-#include <procbridge.h>
-
-
-static struct timeval beginning_of_epoch;
-static io_handler io_handlers;
-
-/* Function: now
- *
- * Return: the current time in canonical units: a 64 bit number
- * where the most significant 32 bits contains the number
- * of seconds, and the least signficant a count of (1/(2^32))ths
- * of a second.
- */
-when now()
-{
- struct timeval result;
-
- gettimeofday(&result,0);
- return((((unsigned long long)result.tv_sec)<<32)|
- (((unsigned long long)result.tv_usec)<<32)/1000000);
-}
-
-
-/* Function: register_io_handler
- * Arguments: fd: the file descriptor of interest
- * type: a mask of READ_HANDLER, WRITE_HANDLER, EXCEPTION_HANDLER
- * function: a function to call when io is available on fd
- * arg: an opaque correlator to return to the handler
- * Returns: a pointer to the io_handler structure
- */
-io_handler register_io_handler(int fd,
- int type,
- int (*function)(void *),
- void *arg)
-{
- io_handler i=(io_handler)malloc(sizeof(struct io_handler));
- if ((i->fd=fd)>=0){
- i->type=type;
- i->function=function;
- i->argument=arg;
- i->disabled=0;
- i->last=&io_handlers;
- if ((i->next=io_handlers)) i->next->last=&i->next;
- io_handlers=i;
- }
- return(i);
-}
-
-/* Function: remove_io_handler
- * Arguments: i: a pointer to the handler to stop servicing
- *
- * remove_io_handler() doesn't actually free the handler, due
- * to reentrancy problems. it just marks the handler for
- * later cleanup by the blocking function.
- */
-void remove_io_handler (io_handler i)
-{
- i->disabled=1;
-}
-
-static void set_flag(io_handler n,fd_set *r, fd_set *w, fd_set *e)
-{
- if (n->type & READ_HANDLER) FD_SET(n->fd, r);
- if (n->type & WRITE_HANDLER) FD_SET(n->fd, w);
- if (n->type & EXCEPTION_HANDLER) FD_SET(n->fd, e);
-}
-
-static int prepare_fd_sets(fd_set *r, fd_set *w, fd_set *e)
-{
- io_handler j;
- io_handler *k;
- int max = 0;
-
- FD_ZERO(r);
- FD_ZERO(w);
- FD_ZERO(e);
- for (k=&io_handlers;*k;){
- if ((*k)->disabled){
- j=*k;
- *k=(*k)->next;
- free(j);
- }
- if (*k) {
- set_flag(*k,r,w,e);
- if ((*k)->fd > max)
- max = (*k)->fd;
- k=&(*k)->next;
- }
- }
- return max + 1;
-}
-
-static int execute_callbacks(fd_set *r, fd_set *w, fd_set *e)
-{
- io_handler j;
- int n = 0, t;
-
- for (j = io_handlers; j; j = j->next) {
- if (j->disabled)
- continue;
-
- t = 0;
- if (FD_ISSET(j->fd, r) && (j->type & READ_HANDLER)) {
- FD_CLR(j->fd, r);
- t++;
- }
- if (FD_ISSET(j->fd, w) && (j->type & WRITE_HANDLER)) {
- FD_CLR(j->fd, w);
- t++;
- }
- if (FD_ISSET(j->fd, e) && (j->type & EXCEPTION_HANDLER)) {
- FD_CLR(j->fd, e);
- t++;
- }
- if (t == 0)
- continue;
-
- if (!(*j->function)(j->argument))
- j->disabled = 1;
-
- n += t;
- }
-
- return n;
-}
-
-#ifdef ENABLE_SELECT_DISPATCH
-
-static struct {
- pthread_mutex_t mutex;
- pthread_cond_t cond;
- int submitted;
- int nready;
- int maxfd;
- fd_set *rset;
- fd_set *wset;
- fd_set *eset;
- struct timeval *timeout;
- struct timeval submit_time;
-} fd_extra = {
- PTHREAD_MUTEX_INITIALIZER,
- PTHREAD_COND_INITIALIZER,
- 0, 0, 0,
- NULL, NULL, NULL, NULL,
-};
-
-extern int liblustre_wait_event(int timeout);
-extern procbridge __global_procbridge;
-
-/*
- * this will intercept syscall select() of user apps
- * such as MPI libs.
- */
-int select(int n, fd_set *rset, fd_set *wset, fd_set *eset,
- struct timeval *timeout)
-{
- LASSERT(fd_extra.submitted == 0);
-
- fd_extra.nready = 0;
- fd_extra.maxfd = n;
- fd_extra.rset = rset;
- fd_extra.wset = wset;
- fd_extra.eset = eset;
- fd_extra.timeout = timeout;
-
- liblustre_wait_event(0);
- pthread_mutex_lock(&fd_extra.mutex);
- gettimeofday(&fd_extra.submit_time, NULL);
- fd_extra.submitted = 1;
- LASSERT(__global_procbridge);
- procbridge_wakeup_nal(__global_procbridge);
-
-again:
- if (fd_extra.submitted)
- pthread_cond_wait(&fd_extra.cond, &fd_extra.mutex);
- pthread_mutex_unlock(&fd_extra.mutex);
-
- liblustre_wait_event(0);
-
- pthread_mutex_lock(&fd_extra.mutex);
- if (fd_extra.submitted)
- goto again;
- pthread_mutex_unlock(&fd_extra.mutex);
-
- LASSERT(fd_extra.nready >= 0);
- LASSERT(fd_extra.submitted == 0);
- return fd_extra.nready;
-}
-
-static int merge_fds(int max, fd_set *rset, fd_set *wset, fd_set *eset)
-{
- int i;
-
- LASSERT(rset);
- LASSERT(wset);
- LASSERT(eset);
-
- for (i = 0; i < __FD_SETSIZE/__NFDBITS; i++) {
- LASSERT(!fd_extra.rset ||
- !(__FDS_BITS(rset)[i] & __FDS_BITS(fd_extra.rset)[i]));
- LASSERT(!fd_extra.wset ||
- !(__FDS_BITS(wset)[i] & __FDS_BITS(fd_extra.wset)[i]));
- LASSERT(!fd_extra.eset ||
- !(__FDS_BITS(eset)[i] & __FDS_BITS(fd_extra.eset)[i]));
-
- if (fd_extra.rset && __FDS_BITS(fd_extra.rset)[i])
- __FDS_BITS(rset)[i] |= __FDS_BITS(fd_extra.rset)[i];
- if (fd_extra.wset && __FDS_BITS(fd_extra.wset)[i])
- __FDS_BITS(wset)[i] |= __FDS_BITS(fd_extra.wset)[i];
- if (fd_extra.eset && __FDS_BITS(fd_extra.eset)[i])
- __FDS_BITS(eset)[i] |= __FDS_BITS(fd_extra.eset)[i];
- }
-
- return (fd_extra.maxfd > max ? fd_extra.maxfd : max);
-}
-
-static inline
-int timeval_ge(struct timeval *tv1, struct timeval *tv2)
-{
- LASSERT(tv1 && tv2);
- return ((tv1->tv_sec - tv2->tv_sec) * 1000000 +
- (tv1->tv_usec - tv2->tv_usec) >= 0);
-}
-
-/*
- * choose the most recent timeout value
- */
-static struct timeval *choose_timeout(struct timeval *tv1,
- struct timeval *tv2)
-{
- if (!tv1)
- return tv2;
- else if (!tv2)
- return tv1;
-
- if (timeval_ge(tv1, tv2))
- return tv2;
- else
- return tv1;
-}
-
-/* Function: select_timer_block
- * Arguments: until: an absolute time when the select should return
- *
- * This function dispatches the various file descriptors' handler
- * functions, if the kernel indicates there is io available.
- */
-void select_timer_block(when until)
-{
- fd_set fds[3];
- struct timeval timeout;
- struct timeval *timeout_pointer, *select_timeout;
- int max, nready, nexec;
- int fd_handling;
-
-again:
- if (until) {
- when interval;
-
- interval = until - now();
- timeout.tv_sec = (interval >> 32);
- timeout.tv_usec = ((interval << 32) / 1000000) >> 32;
- timeout_pointer = &timeout;
- } else
- timeout_pointer = NULL;
-
- fd_handling = 0;
- max = prepare_fd_sets(&fds[0], &fds[1], &fds[2]);
- select_timeout = timeout_pointer;
-
- pthread_mutex_lock(&fd_extra.mutex);
- fd_handling = fd_extra.submitted;
- pthread_mutex_unlock(&fd_extra.mutex);
- if (fd_handling) {
- max = merge_fds(max, &fds[0], &fds[1], &fds[2]);
- select_timeout = choose_timeout(timeout_pointer, fd_extra.timeout);
- }
-
- /* XXX only compile for linux */
-#if (__WORDSIZE == 64) && !defined(__mips64__)
- nready = syscall(SYS_select, max, &fds[0], &fds[1], &fds[2],
- select_timeout);
-#else
- nready = syscall(SYS__newselect, max, &fds[0], &fds[1], &fds[2],
- select_timeout);
-#endif
- if (nready < 0) {
- CERROR("select return err %d, errno %d\n", nready, errno);
- return;
- }
-
- if (nready) {
- nexec = execute_callbacks(&fds[0], &fds[1], &fds[2]);
- nready -= nexec;
- } else
- nexec = 0;
-
- /* even both nready & nexec are 0, we still need try to wakeup
- * upper thread since it may have timed out
- */
- if (fd_handling) {
- LASSERT(nready >= 0);
-
- pthread_mutex_lock(&fd_extra.mutex);
- if (nready) {
- if (fd_extra.rset)
- *fd_extra.rset = fds[0];
- if (fd_extra.wset)
- *fd_extra.wset = fds[1];
- if (fd_extra.eset)
- *fd_extra.eset = fds[2];
- fd_extra.nready = nready;
- fd_extra.submitted = 0;
- } else {
- struct timeval t;
-
- fd_extra.nready = 0;
- if (fd_extra.timeout) {
- gettimeofday(&t, NULL);
- if (timeval_ge(&t, &fd_extra.submit_time))
- fd_extra.submitted = 0;
- }
- }
-
- pthread_cond_signal(&fd_extra.cond);
- pthread_mutex_unlock(&fd_extra.mutex);
- }
-
- /* haven't found portals event, go back to loop if time
- * is not expired */
- if (!nexec) {
- if (timeout_pointer == NULL || now() >= until)
- goto again;
- }
-}
-
-#else /* !ENABLE_SELECT_DISPATCH */
-
-/* Function: select_timer_block
- * Arguments: until: an absolute time when the select should return
- *
- * This function dispatches the various file descriptors' handler
- * functions, if the kernel indicates there is io available.
- */
-void select_timer_block(when until)
-{
- fd_set fds[3];
- struct timeval timeout;
- struct timeval *timeout_pointer;
- int max, nready;
-
- if (until) {
- when interval;
- interval = until - now();
- timeout.tv_sec = (interval >> 32);
- timeout.tv_usec = ((interval << 32) / 1000000) >> 32;
- timeout_pointer = &timeout;
- } else
- timeout_pointer = NULL;
-
- max = prepare_fd_sets(&fds[0], &fds[1], &fds[2]);
-
- nready = select(max, &fds[0], &fds[1], &fds[2], timeout_pointer);
- if (nready > 0)
- execute_callbacks(&fds[0], &fds[1], &fds[2]);
-}
-#endif /* ENABLE_SELECT_DISPATCH */
-
-/* Function: init_unix_timer()
- * is called to initialize the library
- */
-void init_unix_timer()
-{
- io_handlers=0;
- gettimeofday(&beginning_of_epoch, 0);
- initialize_timer(select_timer_block);
-}
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * Copyright (c) 2002 Cray Inc.
- * Copyright (c) 2002 Eric Hoffman
- *
- * This file is part of Lustre, http://www.lustre.org.
- *
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
- *
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-#include <table.h>
-#include <stdlib.h>
-#include <string.h>
-
-
-/* table.c:
- * a very simple hash table implementation with paramerterizable
- * comparison and key generation functions. it does resize
- * in order to accomidate more entries, but never collapses
- * the table
- */
-
-static table_entry *table_lookup (table t,void *comparator,
- unsigned int k,
- int (*compare_function)(void *, void *),
- int *success)
-{
- unsigned int key=k%t->size;
- table_entry *i;
-
- for (i=&(t->entries[key]);*i;i=&((*i)->next)){
- if (compare_function && ((*i)->key==k))
- if ((*t->compare_function)((*i)->value,comparator)){
- *success=1;
- return(i);
- }
- }
- *success=0;
- return(&(t->entries[key]));
-}
-
-
-static void resize_table(table t, int size)
-{
- int old_size=t->size;
- table_entry *old_entries=t->entries;
- int i;
- table_entry j,n;
- table_entry *position;
- int success;
-
- t->size=size;
- t->entries=(table_entry *)malloc(sizeof(table_entry)*t->size);
- memset(t->entries,0,sizeof(table_entry)*t->size);
-
- for (i=0;i<old_size;i++)
- for (j=old_entries[i];j;j=n){
- n=j->next;
- position=table_lookup(t,0,j->key,0,&success);
- j->next= *position;
- *position=j;
- }
- free(old_entries);
-}
-
-
-/* Function: key_from_int
- * Arguments: int i: value to compute the key of
- * Returns: the key
- */
-unsigned int key_from_int(int i)
-{
- return(i);
-}
-
-
-/* Function: key_from_string
- * Arguments: char *s: the null terminated string
- * to compute the key of
- * Returns: the key
- */
-unsigned int key_from_string(char *s)
-{
- unsigned int result=0, i;
- unsigned char *n;
-
- if (!s)
- return(1);
- for (n = (unsigned char *)s, i = 0; *n; n++, i++)
- result ^= (*n * 57) ^ *n * i;
-
- return(result);
-}
-
-
-/* Function: hash_create_table
- * Arguments: compare_function: a function to compare
- * a table instance with a correlator
- * key_function: a function to generate a 32 bit
- * hash key from a correlator
- * Returns: a pointer to the new table
- */
-table hash_create_table (int (*compare_function)(void *, void *),
- unsigned int (*key_function)(void *))
-{
- table new=(table)malloc(sizeof(struct table));
- memset(new, 0, sizeof(struct table));
-
- new->compare_function=compare_function;
- new->key_function=key_function;
- new->number_of_entries=0;
- new->size=4;
- new->entries=(table_entry *)malloc(sizeof(table_entry)*new->size);
- memset(new->entries,0,sizeof(table_entry)*new->size);
- return(new);
-}
-
-
-/* Function: hash_table_find
- * Arguments: t: a table to look in
- * comparator: a value to access the table entry
- * Returns: the element references to by comparator, or null
- */
-void *hash_table_find (table t, void *comparator)
-{
- int success;
- table_entry* entry=table_lookup(t,comparator,
- (*t->key_function)(comparator),
- t->compare_function,
- &success);
- if (success) return((*entry)->value);
- return(0);
-}
-
-
-/* Function: hash_table_insert
- * Arguments: t: a table to insert the object
- * value: the object to put in the table
- * comparator: the value by which the object
- * will be addressed
- * Returns: nothing
- */
-void hash_table_insert (table t, void *value, void *comparator)
-{
- int success;
- unsigned int k=(*t->key_function)(comparator);
- table_entry *position=table_lookup(t,comparator,k,
- t->compare_function,&success);
- table_entry entry;
-
- if (success) {
- entry = *position;
- } else {
- entry = (table_entry)malloc(sizeof(struct table_entry));
- memset(entry, 0, sizeof(struct table_entry));
- entry->next= *position;
- *position=entry;
- t->number_of_entries++;
- }
- entry->value=value;
- entry->key=k;
- if (t->number_of_entries > t->size) resize_table(t,t->size*2);
-}
-
-/* Function: hash_table_remove
- * Arguments: t: the table to remove the object from
- * comparator: the index value of the object to remove
- * Returns:
- */
-void hash_table_remove (table t, void *comparator)
-{
- int success;
- table_entry temp;
- table_entry *position=table_lookup(t,comparator,
- (*t->key_function)(comparator),
- t->compare_function,&success);
- if(success) {
- temp=*position;
- *position=(*position)->next;
- free(temp); /* the value? */
- t->number_of_entries--;
- }
-}
-
-/* Function: hash_iterate_table_entries
- * Arguments: t: the table to iterate over
- * handler: a function to call with each element
- * of the table, along with arg
- * arg: the opaque object to pass to handler
- * Returns: nothing
- */
-void hash_iterate_table_entries(table t,
- void (*handler)(void *,void *),
- void *arg)
-{
- int i;
- table_entry *j,*next;
-
- for (i=0;i<t->size;i++)
- for (j=t->entries+i;*j;j=next){
- next=&((*j)->next);
- (*handler)(arg,(*j)->value);
- }
-}
-
-/* Function: hash_filter_table_entries
- * Arguments: t: the table to iterate over
- * handler: a function to call with each element
- * of the table, along with arg
- * arg: the opaque object to pass to handler
- * Returns: nothing
- * Notes: operations on the table inside handler are not safe
- *
- * filter_table_entires() calls the handler function for each
- * item in the table, passing it and arg. The handler function
- * returns 1 if it is to be retained in the table, and 0
- * if it is to be removed.
- */
-void hash_filter_table_entries(table t, int (*handler)(void *, void *), void *arg)
-{
- int i;
- table_entry *j,*next,v;
-
- for (i=0;i<t->size;i++)
- for (j=t->entries+i;*j;j=next){
- next=&((*j)->next);
- if (!(*handler)(arg,(*j)->value)){
- next=j;
- v=*j;
- *j=(*j)->next;
- free(v);
- t->number_of_entries--;
- }
- }
-}
-
-/* Function: destroy_table
- * Arguments: t: the table to free
- * thunk: a function to call with each element,
- * most likely free()
- * Returns: nothing
- */
-void hash_destroy_table(table t,void (*thunk)(void *))
-{
- table_entry j,next;
- int i;
- for (i=0;i<t->size;i++)
- for (j=t->entries[i];j;j=next){
- next=j->next;
- if (thunk) (*thunk)(j->value);
- free(j);
- }
- free(t->entries);
- free(t);
-}
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * Copyright (c) 2002 Cray Inc.
- * Copyright (c) 2002 Eric Hoffman
- *
- * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
- */
-
-#ifndef E_TABLE
-#define E_TABLE
-
-typedef struct table_entry {
- unsigned int key;
- void *value;
- struct table_entry *next;
-} *table_entry;
-
-
-typedef struct table {
- unsigned int size;
- int number_of_entries;
- table_entry *entries;
- int (*compare_function)(void *, void *);
- unsigned int (*key_function)(void *);
-} *table;
-
-/* table.c */
-unsigned int key_from_int(int i);
-unsigned int key_from_string(char *s);
-table hash_create_table(int (*compare_function)(void *, void *),
- unsigned int (*key_function)(void *));
-void *hash_table_find(table t, void *comparator);
-void hash_table_insert(table t, void *value, void *comparator);
-void hash_table_remove(table t, void *comparator);
-void hash_iterate_table_entries(table t, void (*handler)(void *, void *), void *arg);
-void hash_filter_table_entries(table t, int (*handler)(void *, void *), void *arg);
-void hash_destroy_table(table t, void (*thunk)(void *));
-
-#endif
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * Copyright (c) 2002 Cray Inc.
- * Copyright (c) 2003 Cluster File Systems, Inc.
- *
- * This file is part of Lustre, http://www.lustre.org.
- *
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
- *
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- */
-
-/* tcpnal.c:
- This file implements the TCP-based nal by providing glue
- between the connection service and the generic NAL implementation */
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <stdarg.h>
-#include <unistd.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <pqtimer.h>
-#include <dispatch.h>
-#include <procbridge.h>
-#include <connection.h>
-#include <errno.h>
-
-#ifndef __CYGWIN__
-#include <syscall.h>
-#endif
-
-void
-tcpnal_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive)
-{
- bridge b = (bridge)ni->ni_data;
- connection c;
-
- if (!alive) {
- LBUG();
- }
-
- c = force_tcp_connection((manager)b->lower, nid, b->local);
- if (c == NULL)
- CERROR("Can't create connection to %s\n",
- libcfs_nid2str(nid));
-}
-
-/*
- * sends a packet to the peer, after insuring that a connection exists
- */
-int tcpnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
-{
- lnet_hdr_t *hdr = &lntmsg->msg_hdr;
- lnet_process_id_t target = lntmsg->msg_target;
- unsigned int niov = lntmsg->msg_niov;
- struct iovec *iov = lntmsg->msg_iov;
- unsigned int offset = lntmsg->msg_offset;
- unsigned int len = lntmsg->msg_len;
-
- connection c;
- bridge b = (bridge)ni->ni_data;
- struct iovec tiov[257];
- static pthread_mutex_t send_lock = PTHREAD_MUTEX_INITIALIZER;
- int rc = 0;
- int sysrc;
- int total;
- int ntiov;
- int i;
-
- if (!(c = force_tcp_connection((manager)b->lower, target.nid,
- b->local)))
- return(-EIO);
-
- /* TODO: these results should be checked. furthermore, provision
- must be made for the SIGPIPE which is delivered when
- writing on a tcp socket which has closed underneath
- the application. there is a linux flag in the sendmsg
- call which turns off the signally behaviour, but its
- nonstandard */
-
- LASSERT (niov <= 256);
- LASSERT (len == 0 || iov != NULL); /* I don't understand kiovs */
-
- tiov[0].iov_base = hdr;
- tiov[0].iov_len = sizeof(lnet_hdr_t);
- ntiov = 1 + lnet_extract_iov(256, &tiov[1], niov, iov, offset, len);
-
- pthread_mutex_lock(&send_lock);
-#if 1
- for (i = total = 0; i < ntiov; i++)
- total += tiov[i].iov_len;
-
- sysrc = syscall(SYS_writev, c->fd, tiov, ntiov);
- if (sysrc != total) {
- fprintf (stderr, "BAD SEND rc %d != %d, errno %d\n",
- rc, total, errno);
- rc = -errno;
- }
-#else
- for (i = total = 0; i <= ntiov; i++) {
- rc = send(c->fd, tiov[i].iov_base, tiov[i].iov_len, 0);
-
- if (rc != tiov[i].iov_len) {
- fprintf (stderr, "BAD SEND rc %d != %d, errno %d\n",
- rc, tiov[i].iov_len, errno);
- rc = -errno;
- break;
- }
- total += rc;
- }
-#endif
-#if 0
- fprintf (stderr, "sent %s total %d in %d frags\n",
- hdr->type == LNET_MSG_ACK ? "ACK" :
- hdr->type == LNET_MSG_PUT ? "PUT" :
- hdr->type == LNET_MSG_GET ? "GET" :
- hdr->type == LNET_MSG_REPLY ? "REPLY" :
- hdr->type == LNET_MSG_HELLO ? "HELLO" : "UNKNOWN",
- total, niov + 1);
-#endif
- pthread_mutex_unlock(&send_lock);
-
- if (rc == 0) {
- /* NB the NAL only calls lnet_finalize() if it returns 0
- * from cb_send() */
- lnet_finalize(ni, lntmsg, 0);
- }
-
- return(rc);
-}
-
-
-int tcpnal_recv(lnet_ni_t *ni,
- void *private,
- lnet_msg_t *cookie,
- int delayed,
- unsigned int niov,
- struct iovec *iov,
- lnet_kiov_t *kiov,
- unsigned int offset,
- unsigned int mlen,
- unsigned int rlen)
-{
- struct iovec tiov[256];
- int ntiov;
- int i;
-
- if (mlen == 0)
- goto finalize;
-
- LASSERT(iov != NULL); /* I don't understand kiovs */
-
- ntiov = lnet_extract_iov(256, tiov, niov, iov, offset, mlen);
-
- /* FIXME
- * 1. Is this effecient enough? change to use readv() directly?
- * - MeiJia
- */
- for (i = 0; i < ntiov; i++)
- if (!read_connection(private, tiov[i].iov_base, tiov[i].iov_len))
- return -EIO;
-
-
-finalize:
- LASSERT(rlen >= mlen);
-
- if (mlen != rlen){
- int rc;
- char *trash=malloc(rlen - mlen);
-
- if (!trash)
- return -ENOMEM;
-
- rc = read_connection(private, trash, rlen - mlen);
- free(trash);
- if (!rc)
- return -EIO;
- }
-
- lnet_finalize(ni, cookie, 0);
- return(0);
-}
-
-
-/* Function: from_connection:
- * Arguments: c: the connection to read from
- * Returns: whether or not to continue reading from this connection,
- * expressed as a 1 to continue, and a 0 to not
- *
- * from_connection() is called from the select loop when i/o is
- * available. It attempts to read the portals header and
- * pass it to the generic library for processing.
- */
-static int from_connection(void *a, void *d)
-{
- connection c = d;
- bridge b = a;
- lnet_hdr_t hdr;
- int rc;
-
- if (read_connection(c, (unsigned char *)&hdr, sizeof(hdr))) {
- /* replace dest_nid,pid (socknal sets its own) */
- hdr.dest_nid = cpu_to_le64(b->b_ni->ni_nid);
- hdr.dest_pid = cpu_to_le32(the_lnet.ln_pid);
-
- rc = lnet_parse(b->b_ni, &hdr, c->peer_nid, c, 0);
- if (rc < 0) {
- CERROR("Error %d from lnet_parse\n", rc);
- return 0;
- }
-
- return(1);
- }
- return(0);
-}
-
-
-void tcpnal_shutdown(bridge b)
-{
- shutdown_connections(b->lower);
-}
-
-/* Function: PTL_IFACE_TCP
- * Arguments: pid_request: desired port number to bind to
- * desired: passed NAL limits structure
- * actual: returned NAL limits structure
- * Returns: a nal structure on success, or null on failure
- */
-int tcpnal_init(bridge b)
-{
- manager m;
-
- tcpnal_set_global_params();
-
- if (!(m = init_connections(from_connection, b))) {
- /* TODO: this needs to shut down the newly created junk */
- return(-ENXIO);
- }
- b->lower = m;
- return(0);
-}
+++ /dev/null
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * Copyright (c) 2002 Cray Inc.
- * Copyright (c) 2002 Eric Hoffman
- *
- * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
- */
-
-/* TODO: make this an explicit type when they become available */
-typedef unsigned long long when;
-
-typedef struct timer {
- void (*function)(void *);
- void *arg;
- when w;
- int interval;
- int disable;
-} *timer;
-
-timer register_timer(when, void (*f)(void *), void *a);
-void remove_timer(timer t);
-void timer_loop(void);
-void initialize_timer(void);
-void register_thunk(void (*f)(void *),void *a);
-
-
-#define HZ 0x100000000ull
-
-
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2007 Cluster File Systems, Inc.
+ * Author: Maxim Patlasov <maxim@clusterfs.com>
+ *
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ */
+
+#include "usocklnd.h"
+#include <sys/time.h>
+
+lnd_t the_tcplnd = {
+ .lnd_type = SOCKLND,
+ .lnd_startup = usocklnd_startup,
+ .lnd_shutdown = usocklnd_shutdown,
+ .lnd_send = usocklnd_send,
+ .lnd_recv = usocklnd_recv,
+ .lnd_accept = usocklnd_accept,
+};
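+
+/* Note: unlike the procbridge tcplnd removed above, this lnd_t also
+ * provides .lnd_accept, so usocklnd handles incoming connections
+ * itself instead of leaving src nid/pid assignment to a kernel
+ * socknal. */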
+
+usock_data_t usock_data;
+usock_tunables_t usock_tuns = {
+ .ut_timeout = 50,
+ .ut_poll_timeout = 1,
+ .ut_fair_limit = 1,
+ .ut_npollthreads = 0,
+ .ut_min_bulk = 1<<10,
+ .ut_txcredits = 256,
+ .ut_peertxcredits = 8,
+ .ut_socknagle = 0,
+ .ut_sockbufsiz = 0,
+};
+
+#define MAX_REASONABLE_TIMEOUT 36000 /* 10 hours */
+#define MAX_REASONABLE_NPT 1000
+
+int
+usocklnd_validate_tunables()
+{
+ if (usock_tuns.ut_timeout <= 0 ||
+ usock_tuns.ut_timeout > MAX_REASONABLE_TIMEOUT) {
+ CERROR("USOCK_TIMEOUT: %d is out of reasonable limits\n",
+ usock_tuns.ut_timeout);
+ return -1;
+ }
+
+ if (usock_tuns.ut_poll_timeout <= 0 ||
+ usock_tuns.ut_poll_timeout > MAX_REASONABLE_TIMEOUT) {
+ CERROR("USOCK_POLL_TIMEOUT: %d is out of reasonable limits\n",
+ usock_tuns.ut_poll_timeout);
+ return -1;
+ }
+
+ if (usock_tuns.ut_fair_limit <= 0) {
+ CERROR("Invalid USOCK_FAIR_LIMIT: %d (should be >0)\n",
+ usock_tuns.ut_fair_limit);
+ return -1;
+ }
+
+ if (usock_tuns.ut_npollthreads < 0 ||
+ usock_tuns.ut_npollthreads > MAX_REASONABLE_NPT) {
+ CERROR("USOCK_NPOLLTHREADS: %d is out of reasonable limits\n",
+ usock_tuns.ut_npollthreads);
+ return -1;
+ }
+
+ if (usock_tuns.ut_txcredits <= 0) {
+ CERROR("USOCK_TXCREDITS: %d should be positive\n",
+ usock_tuns.ut_txcredits);
+ return -1;
+ }
+
+ if (usock_tuns.ut_peertxcredits <= 0) {
+ CERROR("USOCK_PEERTXCREDITS: %d should be positive\n",
+ usock_tuns.ut_peertxcredits);
+ return -1;
+ }
+
+ if (usock_tuns.ut_peertxcredits > usock_tuns.ut_txcredits) {
+ CERROR("USOCK_PEERTXCREDITS: %d should not be greater"
+ " than USOCK_TXCREDITS: %d\n",
+ usock_tuns.ut_peertxcredits, usock_tuns.ut_txcredits);
+ return -1;
+ }
+
+ if (usock_tuns.ut_socknagle != 0 &&
+ usock_tuns.ut_socknagle != 1) {
+ CERROR("USOCK_SOCKNAGLE: %d should be 0 or 1\n",
+ usock_tuns.ut_socknagle);
+ return -1;
+ }
+
+ if (usock_tuns.ut_sockbufsiz < 0) {
+ CERROR("USOCK_SOCKBUFSIZ: %d should be 0 or positive\n",
+ usock_tuns.ut_sockbufsiz);
+ return -1;
+ }
+
+ return 0;
+}
+
+void
+usocklnd_release_poll_states(int n)
+{
+ int i;
+
+ for (i = 0; i < n; i++) {
+ usock_pollthread_t *pt = &usock_data.ud_pollthreads[i];
+
+ close(pt->upt_notifier_fd);
+ close(pt->upt_pollfd[0].fd);
+
+ pthread_mutex_destroy(&pt->upt_pollrequests_lock);
+ cfs_fini_completion(&pt->upt_completion);
+
+ LIBCFS_FREE (pt->upt_pollfd,
+ sizeof(struct pollfd) * pt->upt_npollfd);
+ LIBCFS_FREE (pt->upt_idx2conn,
+ sizeof(usock_conn_t *) * pt->upt_npollfd);
+ LIBCFS_FREE (pt->upt_fd2idx,
+ sizeof(int) * pt->upt_nfd2idx);
+ /* upt_skip is resized together with upt_pollfd */
+ LIBCFS_FREE (pt->upt_skip,
+ sizeof(int) * pt->upt_npollfd);
+ }
+}
+
+int
+usocklnd_update_tunables()
+{
+ int rc;
+
+ rc = cfs_parse_int_tunable(&usock_tuns.ut_timeout,
+ "USOCK_TIMEOUT");
+ if (rc)
+ return rc;
+
+ rc = cfs_parse_int_tunable(&usock_tuns.ut_poll_timeout,
+ "USOCK_POLL_TIMEOUT");
+ if (rc)
+ return rc;
+
+ rc = cfs_parse_int_tunable(&usock_tuns.ut_npollthreads,
+ "USOCK_NPOLLTHREADS");
+ if (rc)
+ return rc;
+
+ rc = cfs_parse_int_tunable(&usock_tuns.ut_fair_limit,
+ "USOCK_FAIR_LIMIT");
+ if (rc)
+ return rc;
+
+ rc = cfs_parse_int_tunable(&usock_tuns.ut_min_bulk,
+ "USOCK_MIN_BULK");
+ if (rc)
+ return rc;
+
+ rc = cfs_parse_int_tunable(&usock_tuns.ut_txcredits,
+ "USOCK_TXCREDITS");
+ if (rc)
+ return rc;
+
+ rc = cfs_parse_int_tunable(&usock_tuns.ut_peertxcredits,
+ "USOCK_PEERTXCREDITS");
+ if (rc)
+ return rc;
+
+ rc = cfs_parse_int_tunable(&usock_tuns.ut_socknagle,
+ "USOCK_SOCKNAGLE");
+ if (rc)
+ return rc;
+
+ rc = cfs_parse_int_tunable(&usock_tuns.ut_sockbufsiz,
+ "USOCK_SOCKBUFSIZ");
+ if (rc)
+ return rc;
+
+ if (usocklnd_validate_tunables())
+ return -EINVAL;
+
+ if (usock_tuns.ut_npollthreads == 0) {
+ usock_tuns.ut_npollthreads = cfs_online_cpus();
+
+ if (usock_tuns.ut_npollthreads <= 0) {
+ CERROR("Cannot find out the number of online CPUs\n");
+ return -EINVAL;
+ }
+ }
+
+ return 0;
+}
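+
+/* Usage sketch (assuming cfs_parse_int_tunable() reads the process
+ * environment): overriding the compiled-in defaults before starting a
+ * liblustre application might look like
+ *
+ * export USOCK_TIMEOUT=100
+ * export USOCK_NPOLLTHREADS=4
+ *
+ * after which usocklnd_validate_tunables() rejects out-of-range
+ * values. */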
+
+
+int
+usocklnd_base_startup()
+{
+ usock_pollthread_t *pt;
+ int i;
+ int rc;
+
+ rc = usocklnd_update_tunables();
+ if (rc)
+ return rc;
+
+ usock_data.ud_npollthreads = usock_tuns.ut_npollthreads;
+
+ LIBCFS_ALLOC (usock_data.ud_pollthreads,
+ usock_data.ud_npollthreads *
+ sizeof(usock_pollthread_t));
+ if (usock_data.ud_pollthreads == NULL)
+ return -ENOMEM;
+
+ /* Initialize poll thread state structures */
+ for (i = 0; i < usock_data.ud_npollthreads; i++) {
+ int notifier[2];
+
+ pt = &usock_data.ud_pollthreads[i];
+
+ rc = -ENOMEM;
+
+ LIBCFS_ALLOC (pt->upt_pollfd,
+ sizeof(struct pollfd) * UPT_START_SIZ);
+ if (pt->upt_pollfd == NULL)
+ goto base_startup_failed_0;
+
+ LIBCFS_ALLOC (pt->upt_idx2conn,
+ sizeof(usock_conn_t *) * UPT_START_SIZ);
+ if (pt->upt_idx2conn == NULL)
+ goto base_startup_failed_1;
+
+ LIBCFS_ALLOC (pt->upt_fd2idx,
+ sizeof(int) * UPT_START_SIZ);
+ if (pt->upt_fd2idx == NULL)
+ goto base_startup_failed_2;
+
+ memset(pt->upt_fd2idx, 0,
+ sizeof(int) * UPT_START_SIZ);
+
+ LIBCFS_ALLOC (pt->upt_skip,
+ sizeof(int) * UPT_START_SIZ);
+ if (pt->upt_skip == NULL)
+ goto base_startup_failed_3;
+
+ pt->upt_npollfd = pt->upt_nfd2idx = UPT_START_SIZ;
+
+ rc = libcfs_socketpair(notifier);
+ if (rc != 0)
+ goto base_startup_failed_4;
+
+ pt->upt_notifier_fd = notifier[0];
+
+ pt->upt_pollfd[0].fd = notifier[1];
+ pt->upt_pollfd[0].events = POLLIN;
+ pt->upt_pollfd[0].revents = 0;
+
+ pt->upt_nfds = 1;
+ pt->upt_idx2conn[0] = NULL;
+
+ pt->upt_errno = 0;
+ CFS_INIT_LIST_HEAD (&pt->upt_pollrequests);
+ CFS_INIT_LIST_HEAD (&pt->upt_stale_list);
+ pthread_mutex_init(&pt->upt_pollrequests_lock, NULL);
+ cfs_init_completion(&pt->upt_completion);
+ }
+
+ /* Initialize peer hash list */
+ for (i = 0; i < UD_PEER_HASH_SIZE; i++)
+ CFS_INIT_LIST_HEAD(&usock_data.ud_peers[i]);
+
+ pthread_rwlock_init(&usock_data.ud_peers_lock, NULL);
+
+ /* Spawn poll threads */
+ for (i = 0; i < usock_data.ud_npollthreads; i++) {
+ rc = cfs_create_thread(usocklnd_poll_thread,
+ &usock_data.ud_pollthreads[i]);
+ if (rc) {
+ usocklnd_base_shutdown(i);
+ return rc;
+ }
+ }
+
+ usock_data.ud_state = UD_STATE_INITIALIZED;
+
+ return 0;
+
+ base_startup_failed_4:
+ LIBCFS_FREE (pt->upt_skip, sizeof(int) * UPT_START_SIZ);
+ base_startup_failed_3:
+ LIBCFS_FREE (pt->upt_fd2idx, sizeof(int) * UPT_START_SIZ);
+ base_startup_failed_2:
+ LIBCFS_FREE (pt->upt_idx2conn, sizeof(usock_conn_t *) * UPT_START_SIZ);
+ base_startup_failed_1:
+ LIBCFS_FREE (pt->upt_pollfd, sizeof(struct pollfd) * UPT_START_SIZ);
+ base_startup_failed_0:
+ LASSERT(rc != 0);
+ usocklnd_release_poll_states(i);
+ LIBCFS_FREE (usock_data.ud_pollthreads,
+ usock_data.ud_npollthreads *
+ sizeof(usock_pollthread_t));
+ return rc;
+}
+
+void
+usocklnd_base_shutdown(int n)
+{
+ int i;
+
+ usock_data.ud_shutdown = 1;
+ for (i = 0; i < n; i++) {
+ usock_pollthread_t *pt = &usock_data.ud_pollthreads[i];
+ usocklnd_wakeup_pollthread(i);
+ cfs_wait_for_completion(&pt->upt_completion);
+ }
+
+ pthread_rwlock_destroy(&usock_data.ud_peers_lock);
+
+ usocklnd_release_poll_states(usock_data.ud_npollthreads);
+
+ LIBCFS_FREE (usock_data.ud_pollthreads,
+ usock_data.ud_npollthreads *
+ sizeof(usock_pollthread_t));
+
+ usock_data.ud_state = UD_STATE_INIT_NOTHING;
+}
+
+__u64
+usocklnd_new_incarnation()
+{
+ struct timeval tv;
+ int rc = gettimeofday(&tv, NULL);
+ LASSERT (rc == 0);
+ return (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
+}
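+
+/* Note (illustrative): the incarnation is simply microseconds since
+ * the epoch, so every startup obtains a larger value than the last;
+ * presumably a peer compares it with its cached up_incarnation to
+ * detect that this node has restarted. */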
+
+static int
+usocklnd_assign_ni_nid(lnet_ni_t *ni)
+{
+ int rc;
+ int up;
+ __u32 ipaddr;
+
+ /* Find the correct IP address and update ni_nid with it.
+ * Two cases are supported:
+ * 1) no explicit interfaces are defined: the NID is assigned to
+ * the first non-lo interface that is up;
+ * 2) exactly one explicit interface is defined, e.g.
+ * LNET_NETWORKS='tcp(eth0)' */
+
+ if (ni->ni_interfaces[0] == NULL) {
+ char **names;
+ int i, n;
+
+ n = libcfs_ipif_enumerate(&names);
+ if (n <= 0) {
+ CERROR("Can't enumerate interfaces: %d\n", n);
+ return -1;
+ }
+
+ for (i = 0; i < n; i++) {
+
+ if (!strcmp(names[i], "lo")) /* skip the loopback IF */
+ continue;
+
+ rc = libcfs_ipif_query(names[i], &up, &ipaddr);
+ if (rc != 0) {
+ CWARN("Can't get interface %s info: %d\n",
+ names[i], rc);
+ continue;
+ }
+
+ if (!up) {
+ CWARN("Ignoring interface %s (down)\n",
+ names[i]);
+ continue;
+ }
+
+ break; /* one address is quite enough */
+ }
+
+ libcfs_ipif_free_enumeration(names, n);
+
+ if (i >= n) {
+ CERROR("Can't find any usable interfaces\n");
+ return -1;
+ }
+
+ CDEBUG(D_NET, "No explicit interfaces defined. "
+ "%u.%u.%u.%u used\n", HIPQUAD(ipaddr));
+ } else {
+ if (ni->ni_interfaces[1] != NULL) {
+ CERROR("only one explicit interface is allowed\n");
+ return -1;
+ }
+
+ rc = libcfs_ipif_query(ni->ni_interfaces[0], &up, &ipaddr);
+ if (rc != 0) {
+ CERROR("Can't get interface %s info: %d\n",
+ ni->ni_interfaces[0], rc);
+ return -1;
+ }
+
+ if (!up) {
+ CERROR("Explicit interface defined: %s but is down\n",
+ ni->ni_interfaces[0]);
+ return -1;
+ }
+
+ CDEBUG(D_NET, "Explicit interface defined: %s. "
+ "%u.%u.%u.%u used\n",
+ ni->ni_interfaces[0], HIPQUAD(ipaddr));
+
+ }
+
+ ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ipaddr);
+
+ return 0;
+}
+
+int
+usocklnd_startup(lnet_ni_t *ni)
+{
+ int rc;
+ usock_net_t *net;
+
+ if (usock_data.ud_state == UD_STATE_INIT_NOTHING) {
+ rc = usocklnd_base_startup();
+ if (rc != 0)
+ return rc;
+ }
+
+ LIBCFS_ALLOC(net, sizeof(*net));
+ if (net == NULL)
+ goto startup_failed_0;
+
+ memset(net, 0, sizeof(*net));
+ net->un_incarnation = usocklnd_new_incarnation();
+ pthread_mutex_init(&net->un_lock, NULL);
+ pthread_cond_init(&net->un_cond, NULL);
+
+ ni->ni_data = net;
+
+ if (!(the_lnet.ln_pid & LNET_PID_USERFLAG)) {
+ rc = usocklnd_assign_ni_nid(ni);
+ if (rc != 0)
+ goto startup_failed_1;
+ }
+
+ LASSERT (ni->ni_lnd == &the_tcplnd);
+
+ ni->ni_maxtxcredits = usock_tuns.ut_txcredits;
+ ni->ni_peertxcredits = usock_tuns.ut_peertxcredits;
+
+ usock_data.ud_nets_count++;
+ return 0;
+
+ startup_failed_1:
+ pthread_mutex_destroy(&net->un_lock);
+ pthread_cond_destroy(&net->un_cond);
+ LIBCFS_FREE(net, sizeof(*net));
+ startup_failed_0:
+ if (usock_data.ud_nets_count == 0)
+ usocklnd_base_shutdown(usock_data.ud_npollthreads);
+
+ return -ENETDOWN;
+}
+
+void
+usocklnd_shutdown(lnet_ni_t *ni)
+{
+ usock_net_t *net = ni->ni_data;
+
+ net->un_shutdown = 1;
+
+ usocklnd_del_all_peers(ni);
+
+ /* Wait for all peer state to clean up */
+ pthread_mutex_lock(&net->un_lock);
+ while (net->un_peercount != 0)
+ pthread_cond_wait(&net->un_cond, &net->un_lock);
+ pthread_mutex_unlock(&net->un_lock);
+
+ /* Release usock_net_t structure */
+ pthread_mutex_destroy(&net->un_lock);
+ pthread_cond_destroy(&net->un_cond);
+ LIBCFS_FREE(net, sizeof(*net));
+
+ usock_data.ud_nets_count--;
+ if (usock_data.ud_nets_count == 0)
+ usocklnd_base_shutdown(usock_data.ud_npollthreads);
+}
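+
+/* Teardown ordering (summary): delete all peers, block on un_cond
+ * until the last peer reference drops un_peercount to zero, free the
+ * per-net state, and let the last net going down stop the poll
+ * threads via usocklnd_base_shutdown(). */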
+
+void
+usocklnd_del_all_peers(lnet_ni_t *ni)
+{
+ struct list_head *ptmp;
+ struct list_head *pnxt;
+ usock_peer_t *peer;
+ int i;
+
+ pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
+
+ for (i = 0; i < UD_PEER_HASH_SIZE; i++) {
+ list_for_each_safe (ptmp, pnxt, &usock_data.ud_peers[i]) {
+ peer = list_entry (ptmp, usock_peer_t, up_list);
+
+ if (peer->up_ni != ni)
+ continue;
+
+ usocklnd_del_peer_and_conns(peer);
+ }
+ }
+
+ pthread_rwlock_unlock(&usock_data.ud_peers_lock);
+
+ /* wakeup all threads */
+ for (i = 0; i < usock_data.ud_npollthreads; i++)
+ usocklnd_wakeup_pollthread(i);
+}
+
+void
+usocklnd_del_peer_and_conns(usock_peer_t *peer)
+{
+ /* peer cannot disappear because it's still in the hash list */
+
+ pthread_mutex_lock(&peer->up_lock);
+ /* content of conn[] array cannot change now */
+ usocklnd_del_conns_locked(peer);
+ pthread_mutex_unlock(&peer->up_lock);
+
+ /* peer hash list is still protected by the caller */
+ list_del(&peer->up_list);
+
+ usocklnd_peer_decref(peer); /* peer isn't in hash list anymore */
+}
+
+void
+usocklnd_del_conns_locked(usock_peer_t *peer)
+{
+ int i;
+
+ for (i=0; i < N_CONN_TYPES; i++) {
+ usock_conn_t *conn = peer->up_conns[i];
+ if (conn != NULL)
+ usocklnd_conn_kill(conn);
+ }
+}
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
+ * Author: Maxim Patlasov <maxim@clusterfs.com>
+ *
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ */
+#define _GNU_SOURCE
+#include <pthread.h>
+#include <poll.h>
+#include <lnet/lib-lnet.h>
+#include <lnet/socklnd.h>
+
+typedef struct {
+ struct list_head tx_list; /* necessary to form tx list */
+ lnet_msg_t *tx_lnetmsg; /* lnet message for lnet_finalize() */
+ ksock_msg_t tx_msg; /* buffer for wire header of ksock msg */
+ int tx_resid; /* # of residual bytes */
+ int tx_nob; /* # of packet bytes */
+ int tx_size; /* size of this descriptor */
+ struct iovec *tx_iov; /* points to tx_iova[i] */
+ int tx_niov; /* # of packet iovec frags */
+ struct iovec tx_iova[1]; /* iov for header */
+} usock_tx_t;
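+
+/* NB: tx_iova[1] is a C89-style trailing array: a tx descriptor is
+ * over-allocated so tx_iova[] can hold all the iovec frags, and
+ * tx_size remembers the allocation size for LIBCFS_FREE(). A minimal
+ * illustrative sketch (the real allocation lives in
+ * usocklnd_create_tx()):
+ *
+ *      int size = sizeof(usock_tx_t) + niov * sizeof(struct iovec);
+ *      usock_tx_t *tx;
+ *
+ *      LIBCFS_ALLOC(tx, size);
+ *      if (tx != NULL)
+ *              tx->tx_size = size;
+ */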
+
+struct usock_peer_s;
+
+typedef struct {
+ int uc_fd; /* socket */
+ int uc_type; /* conn type */
+ int uc_activeflag; /* active side of connection? */
+ int uc_flip; /* is peer other endian? */
+ int uc_state; /* connection state */
+ struct usock_peer_s *uc_peer; /* owning peer */
+ lnet_process_id_t uc_peerid; /* id of remote peer */
+ int uc_pt_idx; /* index in ud_pollthreads[] of
+ * owning poll thread */
+ lnet_ni_t *uc_ni; /* parent NI while accepting */
+ struct usock_preq_s *uc_preq; /* preallocated request */
+ __u32 uc_peer_ip; /* IP address of the peer */
+ __u16 uc_peer_port; /* port of the peer */
+ struct list_head uc_stale_list; /* orphaned connections */
+
+ /* Receive state */
+ int uc_rx_state; /* message or hello state */
+ ksock_hello_msg_t *uc_rx_hello; /* hello buffer */
+ struct iovec *uc_rx_iov; /* points to uc_rx_iova[i] */
+ struct iovec uc_rx_iova[LNET_MAX_IOV]; /* message frags */
+ int uc_rx_niov; /* # frags */
+ int uc_rx_nob_left; /* # bytes to next hdr/body */
+ int uc_rx_nob_wanted; /* # of bytes actually wanted */
+ void *uc_rx_lnetmsg; /* LNET message being received */
+ cfs_time_t uc_rx_deadline; /* when to time out */
+ int uc_rx_flag; /* deadline valid? */
+ ksock_msg_t uc_rx_msg; /* message buffer */
+
+ /* Send state */
+ struct list_head uc_tx_list; /* pending txs */
+ struct list_head uc_zcack_list; /* pending zc_acks */
+ cfs_time_t uc_tx_deadline; /* when to time out */
+ int uc_tx_flag; /* deadline valid? */
+ int uc_sending; /* send op is in progress */
+ usock_tx_t *uc_tx_hello; /* fake tx with hello */
+
+ cfs_atomic_t uc_refcount; /* # of users */
+ pthread_mutex_t uc_lock; /* serialize */
+ int uc_errored; /* a flag for lnet_notify() */
+} usock_conn_t;
+
+/* Allowable conn states are: */
+#define UC_CONNECTING 1
+#define UC_SENDING_HELLO 2
+#define UC_RECEIVING_HELLO 3
+#define UC_READY 4
+#define UC_DEAD 5
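+
+/* Roughly, an active conn walks UC_CONNECTING -> UC_SENDING_HELLO ->
+ * UC_RECEIVING_HELLO -> UC_READY, while a passive conn starts by
+ * receiving the hello instead; any conn killed by usocklnd_conn_kill()
+ * ends up UC_DEAD (illustrative summary). */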
+
+/* Allowable RX states are: */
+#define UC_RX_HELLO_MAGIC 1
+#define UC_RX_HELLO_VERSION 2
+#define UC_RX_HELLO_BODY 3
+#define UC_RX_HELLO_IPS 4
+#define UC_RX_KSM_HEADER 5
+#define UC_RX_LNET_HEADER 6
+#define UC_RX_PARSE 7
+#define UC_RX_PARSE_WAIT 8
+#define UC_RX_LNET_PAYLOAD 9
+#define UC_RX_SKIPPING 10
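+
+/* Illustrative RX flow: the hello exchange steps through
+ * UC_RX_HELLO_MAGIC -> UC_RX_HELLO_VERSION -> UC_RX_HELLO_BODY
+ * (-> UC_RX_HELLO_IPS), then normal traffic cycles
+ * UC_RX_KSM_HEADER -> UC_RX_LNET_HEADER -> UC_RX_PARSE[_WAIT] ->
+ * UC_RX_LNET_PAYLOAD, with UC_RX_SKIPPING used to drain unwanted
+ * bytes. */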
+
+#define N_CONN_TYPES 3 /* CONTROL, BULK_IN and BULK_OUT */
+
+typedef struct usock_peer_s {
+ struct list_head up_list; /* necessary to form peer list */
+ lnet_process_id_t up_peerid; /* id of remote peer */
+ usock_conn_t *up_conns[N_CONN_TYPES]; /* conns connecting us
+ * with the peer */
+ lnet_ni_t *up_ni; /* pointer to parent NI */
+ __u64 up_incarnation; /* peer's incarnation */
+ int up_incrn_is_set; /* 0 if peer's incarnation
+ * hasn't been set yet */
+ cfs_atomic_t up_refcount; /* # of users */
+ pthread_mutex_t up_lock; /* serialize */
+ int up_errored; /* a flag for lnet_notify() */
+ cfs_time_t up_last_alive; /* when the peer was last alive */
+} usock_peer_t;
+
+typedef struct {
+ int upt_notifier_fd; /* notifier fd for writing */
+ struct pollfd *upt_pollfd; /* poll fds */
+ int upt_nfds; /* active poll fds */
+ int upt_npollfd; /* allocated poll fds */
+ usock_conn_t **upt_idx2conn; /* conns corresponding to
+ * upt_pollfd[idx] */
+ int *upt_skip; /* skip chain */
+ int *upt_fd2idx; /* index into upt_pollfd[]
+ * by fd */
+ int upt_nfd2idx; /* # of allocated elements
+ * of upt_fd2idx[] */
+ struct list_head upt_stale_list; /* list of orphaned conns */
+ struct list_head upt_pollrequests; /* list of poll requests */
+ pthread_mutex_t upt_pollrequests_lock; /* serialize */
+ int upt_errno; /* non-zero if errored */
+ struct cfs_completion upt_completion; /* wait/signal facility for
+ * synchronizing shutdown */
+} usock_pollthread_t;
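+
+/* Mapping example (illustrative): if socket fd 7 occupies slot 3 of
+ * upt_pollfd[], then upt_fd2idx[7] == 3 and upt_idx2conn[3] points to
+ * the conn owning fd 7; poll handlers map slot -> conn while poll
+ * requests map fd -> slot. */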
+
+/* Number of elements in upt_pollfd[], upt_idx2conn[] and upt_fd2idx[]
+ * at initialization time. Will be resized on demand */
+#define UPT_START_SIZ 32
+
+/* # peer lists */
+#define UD_PEER_HASH_SIZE 101
+
+typedef struct {
+ int ud_state; /* initialization state */
+ int ud_npollthreads; /* # of poll threads */
+ usock_pollthread_t *ud_pollthreads; /* their state */
+ int ud_shutdown; /* shutdown flag */
+ int ud_nets_count; /* # of instances */
+ struct list_head ud_peers[UD_PEER_HASH_SIZE]; /* peer hash table */
+ pthread_rwlock_t ud_peers_lock; /* serialize */
+} usock_data_t;
+
+extern usock_data_t usock_data;
+
+/* ud_state allowed values */
+#define UD_STATE_INIT_NOTHING 0
+#define UD_STATE_INITIALIZED 1
+
+typedef struct {
+ int un_peercount; /* # of peers */
+ int un_shutdown; /* shutdown flag */
+ __u64 un_incarnation; /* my epoch */
+ pthread_cond_t un_cond; /* condvar to wait for notifications */
+ pthread_mutex_t un_lock; /* a lock to protect un_cond */
+} usock_net_t;
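+
+/* un_peercount/un_cond implement shutdown synchronization:
+ * usocklnd_shutdown() waits on un_cond until un_peercount reaches
+ * zero. The signalling side would look like (illustrative sketch):
+ *
+ *      pthread_mutex_lock(&net->un_lock);
+ *      net->un_peercount--;
+ *      pthread_cond_signal(&net->un_cond);
+ *      pthread_mutex_unlock(&net->un_lock);
+ */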
+
+typedef struct {
+ int ut_poll_timeout; /* the third arg for poll(2) (seconds) */
+ int ut_timeout; /* "stuck" socket timeout (seconds) */
+ int ut_npollthreads; /* number of poll threads to spawn */
+ int ut_fair_limit; /* how many packets can we receive or transmit
+ * without calling poll(2) */
+ int ut_min_bulk; /* smallest "large" message */
+ int ut_txcredits; /* # concurrent sends */
+ int ut_peertxcredits; /* # concurrent sends to 1 peer */
+ int ut_socknagle; /* is the Nagle algorithm enabled? */
+ int ut_sockbufsiz; /* size of socket buffers */
+} usock_tunables_t;
+
+extern usock_tunables_t usock_tuns;
+
+typedef struct usock_preq_s {
+ int upr_type; /* type of requested action */
+ short upr_value; /* bitmask of POLLIN and POLLOUT bits */
+ usock_conn_t * upr_conn; /* a conn for the sake of which
+ * action will be performed */
+ struct list_head upr_list; /* necessary to form list */
+} usock_pollrequest_t;
+
+/* Allowable poll request types are: */
+#define POLL_ADD_REQUEST 1
+#define POLL_DEL_REQUEST 2
+#define POLL_RX_SET_REQUEST 3
+#define POLL_TX_SET_REQUEST 4
+#define POLL_SET_REQUEST 5
+
+typedef struct {
+ struct list_head zc_list; /* necessary to form zc_ack list */
+ __u64 zc_cookie; /* zero-copy cookie */
+} usock_zc_ack_t;
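+
+/* zc_acks queued on uc_zcack_list are piggybacked onto outgoing
+ * traffic by usocklnd_try_piggyback(), or carried by a NOOP tx from
+ * usocklnd_create_noop_tx() when no payload tx is pending
+ * (illustrative summary). */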
+
+static inline void
+usocklnd_conn_addref(usock_conn_t *conn)
+{
+ LASSERT (cfs_atomic_read(&conn->uc_refcount) > 0);
+ cfs_atomic_inc(&conn->uc_refcount);
+}
+
+void usocklnd_destroy_conn(usock_conn_t *conn);
+
+static inline void
+usocklnd_conn_decref(usock_conn_t *conn)
+{
+ LASSERT (cfs_atomic_read(&conn->uc_refcount) > 0);
+ if (cfs_atomic_dec_and_test(&conn->uc_refcount))
+ usocklnd_destroy_conn(conn);
+}
+
+static inline void
+usocklnd_peer_addref(usock_peer_t *peer)
+{
+ LASSERT (cfs_atomic_read(&peer->up_refcount) > 0);
+ cfs_atomic_inc(&peer->up_refcount);
+}
+
+void usocklnd_destroy_peer(usock_peer_t *peer);
+
+static inline void
+usocklnd_peer_decref(usock_peer_t *peer)
+{
+ LASSERT (cfs_atomic_read(&peer->up_refcount) > 0);
+ if (cfs_atomic_dec_and_test(&peer->up_refcount))
+ usocklnd_destroy_peer(peer);
+}
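+
+/* Typical refcounting usage (illustrative), mirroring usocklnd_send():
+ * lookup/create helpers return with a reference held and the caller
+ * drops it when done:
+ *
+ *      usock_peer_t *peer;
+ *      int rc;
+ *
+ *      rc = usocklnd_find_or_create_peer(ni, id, &peer);
+ *      if (rc == 0) {
+ *              ... peer cannot disappear here ...
+ *              usocklnd_peer_decref(peer);
+ *      }
+ */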
+
+static inline int
+usocklnd_ip2pt_idx(__u32 ip)
+{
+ return ip % usock_data.ud_npollthreads;
+}
+
+static inline struct list_head *
+usocklnd_nid2peerlist(lnet_nid_t nid)
+{
+ unsigned int hash = ((unsigned int)nid) % UD_PEER_HASH_SIZE;
+
+ return &usock_data.ud_peers[hash];
+}
+
+int usocklnd_startup(lnet_ni_t *ni);
+void usocklnd_shutdown(lnet_ni_t *ni);
+int usocklnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg);
+int usocklnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
+ unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
+ unsigned int offset, unsigned int mlen, unsigned int rlen);
+int usocklnd_accept(lnet_ni_t *ni, int sock_fd);
+
+int usocklnd_poll_thread(void *arg);
+int usocklnd_add_pollrequest(usock_conn_t *conn, int type, short value);
+void usocklnd_add_killrequest(usock_conn_t *conn);
+int usocklnd_process_pollrequest(usock_pollrequest_t *pr,
+ usock_pollthread_t *pt_data);
+void usocklnd_execute_handlers(usock_pollthread_t *pt_data);
+int usocklnd_calculate_chunk_size(int num);
+void usocklnd_wakeup_pollthread(int i);
+
+int usocklnd_notifier_handler(int fd);
+void usocklnd_exception_handler(usock_conn_t *conn);
+int usocklnd_read_handler(usock_conn_t *conn);
+int usocklnd_read_msg(usock_conn_t *conn, int *cont_flag);
+int usocklnd_handle_zc_req(usock_peer_t *peer, __u64 cookie);
+int usocklnd_read_hello(usock_conn_t *conn, int *cont_flag);
+int usocklnd_activeconn_hellorecv(usock_conn_t *conn);
+int usocklnd_passiveconn_hellorecv(usock_conn_t *conn);
+int usocklnd_write_handler(usock_conn_t *conn);
+usock_tx_t *usocklnd_try_piggyback(struct list_head *tx_list_p,
+ struct list_head *zcack_list_p);
+int usocklnd_activeconn_hellosent(usock_conn_t *conn);
+int usocklnd_passiveconn_hellosent(usock_conn_t *conn);
+int usocklnd_send_tx(usock_conn_t *conn, usock_tx_t *tx);
+int usocklnd_read_data(usock_conn_t *conn);
+
+void usocklnd_release_poll_states(int n);
+int usocklnd_base_startup(void);
+void usocklnd_base_shutdown(int n);
+__u64 usocklnd_new_incarnation(void);
+void usocklnd_del_all_peers(lnet_ni_t *ni);
+void usocklnd_del_peer_and_conns(usock_peer_t *peer);
+void usocklnd_del_conns_locked(usock_peer_t *peer);
+
+int usocklnd_conn_timed_out(usock_conn_t *conn, cfs_time_t current_time);
+void usocklnd_conn_kill(usock_conn_t *conn);
+void usocklnd_conn_kill_locked(usock_conn_t *conn);
+usock_conn_t *usocklnd_conn_allocate(void);
+void usocklnd_conn_free(usock_conn_t *conn);
+void usocklnd_tear_peer_conn(usock_conn_t *conn);
+void usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id);
+int usocklnd_create_passive_conn(lnet_ni_t *ni, int fd, usock_conn_t **connp);
+int usocklnd_create_active_conn(usock_peer_t *peer, int type,
+ usock_conn_t **connp);
+int usocklnd_connect_srv_mode(int *fdp, __u32 dst_ip, __u16 dst_port);
+int usocklnd_connect_cli_mode(int *fdp, __u32 dst_ip, __u16 dst_port);
+int usocklnd_set_sock_options(int fd);
+void usocklnd_init_msg(ksock_msg_t *msg, int type);
+usock_tx_t *usocklnd_create_noop_tx(__u64 cookie);
+usock_tx_t *usocklnd_create_tx(lnet_msg_t *lntmsg);
+void usocklnd_init_hello_msg(ksock_hello_msg_t *hello,
+ lnet_ni_t *ni, int type, lnet_nid_t peer_nid);
+usock_tx_t *usocklnd_create_hello_tx(lnet_ni_t *ni,
+ int type, lnet_nid_t peer_nid);
+usock_tx_t *usocklnd_create_cr_hello_tx(lnet_ni_t *ni,
+ int type, lnet_nid_t peer_nid);
+void usocklnd_destroy_tx(lnet_ni_t *ni, usock_tx_t *tx);
+void usocklnd_destroy_txlist(lnet_ni_t *ni, struct list_head *txlist);
+void usocklnd_destroy_zcack_list(struct list_head *zcack_list);
+void usocklnd_destroy_peer(usock_peer_t *peer);
+int usocklnd_get_conn_type(lnet_msg_t *lntmsg);
+int usocklnd_type2idx(int type);
+usock_peer_t *usocklnd_find_peer_locked(lnet_ni_t *ni, lnet_process_id_t id);
+int usocklnd_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
+ usock_peer_t **peerp);
+int usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id,
+ usock_peer_t **peerp);
+int usocklnd_find_or_create_conn(usock_peer_t *peer, int type,
+ usock_conn_t **connp,
+ usock_tx_t *tx, usock_zc_ack_t *zc_ack,
+ int *send_immediately_flag);
+void usocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx);
+int usocklnd_invert_type(int type);
+void usocklnd_conn_new_state(usock_conn_t *conn, int new_state);
+void usocklnd_cleanup_stale_conns(usock_peer_t *peer, __u64 incrn,
+ usock_conn_t *skip_conn);
+
+void usocklnd_rx_hellomagic_state_transition(usock_conn_t *conn);
+void usocklnd_rx_helloversion_state_transition(usock_conn_t *conn);
+void usocklnd_rx_hellobody_state_transition(usock_conn_t *conn);
+void usocklnd_rx_helloIPs_state_transition(usock_conn_t *conn);
+void usocklnd_rx_lnethdr_state_transition(usock_conn_t *conn);
+void usocklnd_rx_ksmhdr_state_transition(usock_conn_t *conn);
+void usocklnd_rx_skipping_state_transition(usock_conn_t *conn);
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
+ * Author: Maxim Patlasov <maxim@clusterfs.com>
+ *
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ */
+
+#include "usocklnd.h"
+
+static int
+usocklnd_send_tx_immediately(usock_conn_t *conn, usock_tx_t *tx)
+{
+ int rc;
+ int rc2;
+ int partial_send = 0;
+ usock_peer_t *peer = conn->uc_peer;
+
+ LASSERT (peer != NULL);
+
+ /* usocklnd_enqueue_tx() turned it on for us */
+ LASSERT(conn->uc_sending);
+
+ //counter_imm_start++;
+ rc = usocklnd_send_tx(conn, tx);
+ if (rc == 0) { /* partial send or connection closed */
+ pthread_mutex_lock(&conn->uc_lock);
+ list_add(&tx->tx_list, &conn->uc_tx_list);
+ conn->uc_sending = 0;
+ pthread_mutex_unlock(&conn->uc_lock);
+ partial_send = 1;
+ } else {
+ usocklnd_destroy_tx(peer->up_ni, tx);
+ /* NB: lnetmsg was finalized, so we *must* return 0 */
+
+ if (rc < 0) { /* real error */
+ usocklnd_conn_kill(conn);
+ return 0;
+ }
+
+ /* rc == 1: tx was sent completely */
+ rc = 0; /* report success to the caller */
+ //counter_imm_complete++;
+ }
+
+ pthread_mutex_lock(&conn->uc_lock);
+ conn->uc_sending = 0;
+
+ /* schedule write handler */
+ if (partial_send ||
+ (conn->uc_state == UC_READY &&
+ (!list_empty(&conn->uc_tx_list) ||
+ !list_empty(&conn->uc_zcack_list)))) {
+ conn->uc_tx_deadline =
+ cfs_time_shift(usock_tuns.ut_timeout);
+ conn->uc_tx_flag = 1;
+ rc2 = usocklnd_add_pollrequest(conn, POLL_TX_SET_REQUEST, POLLOUT);
+ if (rc2 != 0)
+ usocklnd_conn_kill_locked(conn);
+ else
+ usocklnd_wakeup_pollthread(conn->uc_pt_idx);
+ }
+
+ pthread_mutex_unlock(&conn->uc_lock);
+
+ return rc;
+}
+
+int
+usocklnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
+{
+ usock_tx_t *tx;
+ lnet_process_id_t target = lntmsg->msg_target;
+ usock_peer_t *peer;
+ int type;
+ int rc;
+ usock_conn_t *conn;
+ int send_immediately;
+
+ tx = usocklnd_create_tx(lntmsg);
+ if (tx == NULL)
+ return -ENOMEM;
+
+ rc = usocklnd_find_or_create_peer(ni, target, &peer);
+ if (rc) {
+ LIBCFS_FREE (tx, tx->tx_size);
+ return rc;
+ }
+ /* peer cannot disappear now because its refcount was incremented */
+
+ type = usocklnd_get_conn_type(lntmsg);
+ rc = usocklnd_find_or_create_conn(peer, type, &conn, tx, NULL,
+ &send_immediately);
+ if (rc != 0) {
+ usocklnd_peer_decref(peer);
+ usocklnd_check_peer_stale(ni, target);
+ LIBCFS_FREE (tx, tx->tx_size);
+ return rc;
+ }
+ /* conn cannot disappear now because its refcount was incremented */
+
+ if (send_immediately)
+ rc = usocklnd_send_tx_immediately(conn, tx);
+
+ usocklnd_conn_decref(conn);
+ usocklnd_peer_decref(peer);
+ return rc;
+}
+
+int
+usocklnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
+ unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
+ unsigned int offset, unsigned int mlen, unsigned int rlen)
+{
+ int rc = 0;
+ usock_conn_t *conn = (usock_conn_t *)private;
+
+ /* Moving the lock() call below lnet_extract_iov() would not
+ * buy much concurrency, so take it up front */
+ pthread_mutex_lock(&conn->uc_lock);
+
+ conn->uc_rx_lnetmsg = msg;
+ conn->uc_rx_nob_wanted = mlen;
+ conn->uc_rx_nob_left = rlen;
+ conn->uc_rx_iov = conn->uc_rx_iova;
+ conn->uc_rx_niov =
+ lnet_extract_iov(LNET_MAX_IOV, conn->uc_rx_iov,
+ niov, iov, offset, mlen);
+
+ /* did a gap occur between lnet_parse() and usocklnd_recv()? */
+ if (conn->uc_rx_state == UC_RX_PARSE_WAIT) {
+ conn->uc_rx_flag = 1; /* waiting for incoming lnet payload */
+ conn->uc_rx_deadline =
+ cfs_time_shift(usock_tuns.ut_timeout);
+ rc = usocklnd_add_pollrequest(conn, POLL_RX_SET_REQUEST, POLLIN);
+ if (rc != 0) {
+ usocklnd_conn_kill_locked(conn);
+ goto recv_out;
+ }
+ usocklnd_wakeup_pollthread(conn->uc_pt_idx);
+ }
+
+ conn->uc_rx_state = UC_RX_LNET_PAYLOAD;
+ recv_out:
+ pthread_mutex_unlock(&conn->uc_lock);
+ usocklnd_conn_decref(conn);
+ return rc;
+}
+
+int
+usocklnd_accept(lnet_ni_t *ni, int sock_fd)
+{
+ int rc;
+ usock_conn_t *conn;
+
+ rc = usocklnd_create_passive_conn(ni, sock_fd, &conn);
+ if (rc)
+ return rc;
+ LASSERT(conn != NULL);
+
+ /* disable shutdown event temporarily */
+ lnet_ni_addref(ni);
+
+ rc = usocklnd_add_pollrequest(conn, POLL_ADD_REQUEST, POLLIN);
+ if (rc == 0)
+ usocklnd_wakeup_pollthread(conn->uc_pt_idx);
+
+ /* NB: conn reference counter was incremented while adding
+ * poll request if rc == 0 */
+
+ usocklnd_conn_decref(conn); /* should destroy conn if rc != 0 */
+ return rc;
+}