From b74555726d0e8ba1f11ebe959029c59ce9cdc842 Mon Sep 17 00:00:00 2001 From: maxim Date: Tue, 3 Feb 2009 13:43:21 +0000 Subject: [PATCH 1/1] b=14132 i=isaac acceptor.c cleanup: - removed code duplication in acceptor.c for the cases of kernel and user-space code - uniformed user-space libcfs tcpip primitives to have prototypes similar to kernel ones - removed parse_int_tunable() from libcfs to lnet because it's not generic enough to reside in libcfs - untabified and removed trailing spaces in files touched - minor cosmetic changes in usocklnd because now a socket is represented with cfs_socket_t structure and should be released properly (just close(fd) is not enough). --- libcfs/include/libcfs/libcfs_prim.h | 2 +- libcfs/include/libcfs/user-prim.h | 1 - libcfs/include/libcfs/user-tcpip.h | 40 ++- libcfs/libcfs/user-prim.c | 17 - libcfs/libcfs/user-tcpip.c | 378 ++++++++++++++--- lnet/include/lnet/lib-lnet.h | 85 +++-- lnet/include/lnet/lib-types.h | 54 ++-- lnet/lnet/acceptor.c | 600 +++++++++++------------------- lnet/ulnds/socklnd/conn.c | 256 +++++++-------- lnet/ulnds/socklnd/handlers.c | 229 +++++++------- lnet/ulnds/socklnd/poll.c | 113 +++---- lnet/ulnds/socklnd/usocklnd.c | 119 ++++--- lnet/ulnds/socklnd/usocklnd.h | 30 +- lnet/ulnds/socklnd/usocklnd_cb.c | 34 +- 14 files changed, 915 insertions(+), 1043 deletions(-) diff --git a/libcfs/include/libcfs/libcfs_prim.h b/libcfs/include/libcfs/libcfs_prim.h index 8dca0a9..869b12f 100644 --- a/libcfs/include/libcfs/libcfs_prim.h +++ b/libcfs/include/libcfs/libcfs_prim.h @@ -57,7 +57,7 @@ void cfs_cond_resched(void); void cfs_waitq_init(cfs_waitq_t *waitq); void cfs_waitlink_init(cfs_waitlink_t *link); void cfs_waitq_add(cfs_waitq_t *waitq, cfs_waitlink_t *link); -void cfs_waitq_add_exclusive(cfs_waitq_t *waitq, +void cfs_waitq_add_exclusive(cfs_waitq_t *waitq, cfs_waitlink_t *link); void cfs_waitq_del(cfs_waitq_t *waitq, cfs_waitlink_t *link); int cfs_waitq_active(cfs_waitq_t *waitq); diff --git
a/libcfs/include/libcfs/user-prim.h b/libcfs/include/libcfs/user-prim.h index 967cf07..7a38d7c 100644 --- a/libcfs/include/libcfs/user-prim.h +++ b/libcfs/include/libcfs/user-prim.h @@ -147,7 +147,6 @@ int cfs_create_thread(cfs_thread_t func, void *arg); #define cfs_create_thread(l,m) LBUG() #endif -int cfs_parse_int_tunable(int *value, char *name); uid_t cfs_curproc_uid(void); #define LIBCFS_REALLOC(ptr, size) realloc(ptr, size) diff --git a/libcfs/include/libcfs/user-tcpip.h b/libcfs/include/libcfs/user-tcpip.h index fe0464d..73cc586 100644 --- a/libcfs/include/libcfs/user-tcpip.h +++ b/libcfs/include/libcfs/user-tcpip.h @@ -58,26 +58,36 @@ int libcfs_ipif_enumerate (char ***namesp); * Network function used by user-land lnet acceptor */ -int libcfs_sock_listen (int *sockp, __u32 local_ip, int local_port, int backlog); -int libcfs_sock_accept (int *newsockp, int sock, __u32 *peer_ip, int *peer_port); -int libcfs_sock_read (int sock, void *buffer, int nob, int timeout); -void libcfs_sock_abort_accept(__u16 port); +typedef struct cfs_socket { + int s_fd; +} cfs_socket_t; + +#define LIBCFS_SOCK2FD(sock) ((sock)->s_fd) + +int libcfs_sock_listen(cfs_socket_t **sockp, __u32 ip, int port, int backlog); +int libcfs_sock_accept(cfs_socket_t **newsockp, cfs_socket_t *sock); +int libcfs_sock_read(cfs_socket_t *sock, void *buffer, int nob, int timeout); +int libcfs_sock_write(cfs_socket_t *sock, void *buffer, int nob, int timeout); +void libcfs_sock_abort_accept(cfs_socket_t *sock); +void libcfs_sock_release(cfs_socket_t *sock); +int libcfs_sock_getaddr(cfs_socket_t *sock, int remote, __u32 *ip, int *port); /* * Network functions of common use */ -int libcfs_getpeername(int sock_fd, __u32 *ipaddr_p, __u16 *port_p); -int libcfs_socketpair(int *fdp); -int libcfs_fcntl_nonblock(int fd); -int libcfs_sock_set_nagle(int fd, int nagle); -int libcfs_sock_set_bufsiz(int fd, int bufsiz); -int libcfs_sock_create(int *fdp); -void libcfs_sock_release(int fd); -int 
libcfs_sock_bind_to_port(int fd, __u16 port); -int libcfs_sock_connect(int fd, __u32 ip, __u16 port); -int libcfs_sock_writev(int fd, const struct iovec *vector, int count); -int libcfs_sock_readv(int fd, const struct iovec *vector, int count); +int libcfs_socketpair(cfs_socket_t **sockp); +int libcfs_fcntl_nonblock(cfs_socket_t *sock); +int libcfs_sock_set_nagle(cfs_socket_t *sock, int nagle); +int libcfs_sock_set_bufsiz(cfs_socket_t *sock, int bufsiz); +int libcfs_sock_connect(cfs_socket_t *sock, __u32 ip, __u16 port); +int libcfs_sock_writev(cfs_socket_t *sock, + const struct iovec *vector, int count); +int libcfs_sock_readv(cfs_socket_t *sock, + const struct iovec *vector, int count); +int libcfs_sock_create(cfs_socket_t **sockp, int *fatal, + __u32 local_ip, int local_port); + /* * Macros for easy printing IP-adresses diff --git a/libcfs/libcfs/user-prim.c b/libcfs/libcfs/user-prim.c index 4c52964..ee787e8 100644 --- a/libcfs/libcfs/user-prim.c +++ b/libcfs/libcfs/user-prim.c @@ -238,23 +238,6 @@ uid_t cfs_curproc_uid(void) return getuid(); } -int cfs_parse_int_tunable(int *value, char *name) -{ - char *env = getenv(name); - char *end; - - if (env == NULL) - return 0; - - *value = (int)strtoull(env, &end, 0); - if (*end == 0) - return 0; - - CERROR("Can't parse tunable %s=%s\n", name, env); - return -EINVAL; -} - - void cfs_enter_debugger(void) { /* diff --git a/libcfs/libcfs/user-tcpip.c b/libcfs/libcfs/user-tcpip.c index 9dcf4b5..b961714b 100644 --- a/libcfs/libcfs/user-tcpip.c +++ b/libcfs/libcfs/user-tcpip.c @@ -39,7 +39,7 @@ #include #include -#ifdef HAVE_NETINET_IN_H +#ifdef HAVE_NETINET_IN_H #include #endif #include @@ -238,93 +238,76 @@ libcfs_ipif_enumerate (char ***namesp) */ int -libcfs_sock_listen (int *sockp, __u32 local_ip, int local_port, int backlog) +libcfs_sock_listen (cfs_socket_t **sockp, + __u32 local_ip, int local_port, int backlog) { - int rc; - int option; - struct sockaddr_in locaddr; - - *sockp = socket(AF_INET, SOCK_STREAM, 0); - if 
(*sockp < 0) { - rc = -errno; - CERROR("socket() failed: errno==%d\n", errno); - return rc; - } - - option = 1; - if ( setsockopt(*sockp, SOL_SOCKET, SO_REUSEADDR, - (char *)&option, sizeof (option)) ) { - rc = -errno; - CERROR("setsockopt(SO_REUSEADDR) failed: errno==%d\n", errno); - goto failed; - } + int rc; + int fatal; - if (local_ip != 0 || local_port != 0) { - memset(&locaddr, 0, sizeof(locaddr)); - locaddr.sin_family = AF_INET; - locaddr.sin_port = htons(local_port); - locaddr.sin_addr.s_addr = (local_ip == 0) ? - INADDR_ANY : htonl(local_ip); - - if ( bind(*sockp, (struct sockaddr *)&locaddr, sizeof(locaddr)) ) { - rc = -errno; - if ( errno == -EADDRINUSE ) - CDEBUG(D_NET, "Port %d already in use\n", - local_port); - else - CERROR("bind() to port %d failed: errno==%d\n", - local_port, errno); - goto failed; - } - } + rc = libcfs_sock_create(sockp, &fatal, local_ip, local_port); + if (rc != 0) + return rc; - if ( listen(*sockp, backlog) ) { + if ( listen((*sockp)->s_fd, backlog) ) { rc = -errno; CERROR("listen() with backlog==%d failed: errno==%d\n", backlog, errno); goto failed; } - + return 0; failed: - close(*sockp); + libcfs_sock_release(*sockp); return rc; } +void +libcfs_sock_release (cfs_socket_t *sock) +{ + close(sock->s_fd); + LIBCFS_FREE(sock, sizeof(cfs_socket_t)); +} + int -libcfs_sock_accept (int *newsockp, int sock, __u32 *peer_ip, int *peer_port) +libcfs_sock_accept (cfs_socket_t **newsockp, cfs_socket_t *sock) { struct sockaddr_in accaddr; - socklen_t accaddr_len = sizeof(struct sockaddr_in); + socklen_t accaddr_len = sizeof(struct sockaddr_in); - *newsockp = accept(sock, (struct sockaddr *)&accaddr, &accaddr_len); + LIBCFS_ALLOC(*newsockp, sizeof(cfs_socket_t)); + if (*newsockp == NULL) { + CERROR ("Can't alloc memory for cfs_socket_t\n"); + return -ENOMEM; + } - if ( *newsockp < 0 ) { - CERROR("accept() failed: errno==%d\n", errno); - return -errno; + (*newsockp)->s_fd = accept(sock->s_fd, + (struct sockaddr *)&accaddr, &accaddr_len); + + 
if ( (*newsockp)->s_fd < 0 ) { + int rc = -errno; + CERROR("accept() failed: errno==%d\n", -rc); + LIBCFS_FREE(*newsockp, sizeof(cfs_socket_t)); + return rc; } - *peer_ip = ntohl(accaddr.sin_addr.s_addr); - *peer_port = ntohs(accaddr.sin_port); - return 0; } int -libcfs_sock_read (int sock, void *buffer, int nob, int timeout) +libcfs_sock_read (cfs_socket_t *sock, void *buffer, int nob, int timeout) { - int rc; + int rc; struct pollfd pfd; - cfs_time_t start_time = cfs_time_current(); + cfs_time_t start_time = cfs_time_current(); - pfd.fd = sock; + pfd.fd = sock->s_fd; pfd.events = POLLIN; pfd.revents = 0; /* poll(2) measures timeout in msec */ timeout *= 1000; - + while (nob != 0 && timeout > 0) { cfs_time_t current_time; @@ -335,21 +318,70 @@ libcfs_sock_read (int sock, void *buffer, int nob, int timeout) return -ETIMEDOUT; if ((pfd.revents & POLLIN) == 0) return -EIO; - - rc = read(sock, buffer, nob); + + rc = read(sock->s_fd, buffer, nob); + if (rc < 0) + return -errno; + if (rc == 0) + return -EIO; + + buffer = ((char *)buffer) + rc; + nob -= rc; + + current_time = cfs_time_current(); + timeout -= 1000 * + cfs_duration_sec(cfs_time_sub(current_time, + start_time)); + start_time = current_time; + } + + if (nob == 0) + return 0; + else + return -ETIMEDOUT; +} + +int +libcfs_sock_write (cfs_socket_t *sock, void *buffer, int nob, int timeout) +{ + int rc; + struct pollfd pfd; + cfs_time_t start_time = cfs_time_current(); + + pfd.fd = sock->s_fd; + pfd.events = POLLOUT; + pfd.revents = 0; + + /* poll(2) measures timeout in msec */ + timeout *= 1000; + + while (nob != 0 && timeout > 0) { + cfs_time_t current_time; + + rc = poll(&pfd, 1, timeout); + if (rc < 0) + return -errno; + if (rc == 0) + return -ETIMEDOUT; + if ((pfd.revents & POLLOUT) == 0) + return -EIO; + + rc = write(sock->s_fd, buffer, nob); if (rc < 0) return -errno; if (rc == 0) return -EIO; - + buffer = ((char *)buffer) + rc; nob -= rc; current_time = cfs_time_current(); - timeout -= 
cfs_duration_sec(cfs_time_sub(cfs_time_current(), - start_time)); + timeout -= 1000 * + cfs_duration_sec(cfs_time_sub(current_time, + start_time)); + start_time = current_time; } - + if (nob == 0) return 0; else @@ -359,112 +391,144 @@ libcfs_sock_read (int sock, void *buffer, int nob, int timeout) /* Just try to connect to localhost to wake up entity that are * sleeping in accept() */ void -libcfs_sock_abort_accept(__u16 port) +libcfs_sock_abort_accept(cfs_socket_t *sock) { int fd, rc; + struct sockaddr_in remaddr; struct sockaddr_in locaddr; + socklen_t alen = sizeof(struct sockaddr_in); + + rc = getsockname(sock->s_fd, (struct sockaddr *)&remaddr, &alen); + if ( rc != 0 ) { + CERROR("getsockname() failed: errno==%d\n", errno); + return; + } memset(&locaddr, 0, sizeof(locaddr)); locaddr.sin_family = AF_INET; - locaddr.sin_port = htons(port); + locaddr.sin_port = remaddr.sin_port; locaddr.sin_addr.s_addr = inet_addr("127.0.0.1"); fd = socket(AF_INET, SOCK_STREAM, 0); if ( fd < 0 ) { CERROR("socket() failed: errno==%d\n", errno); return; - } - + } + rc = connect(fd, (struct sockaddr *)&locaddr, sizeof(locaddr)); if ( rc != 0 ) { if ( errno != ECONNREFUSED ) CERROR("connect() failed: errno==%d\n", errno); else - CDEBUG(D_NET, "Nobody to wake up at %d\n", port); + CDEBUG(D_NET, "Nobody to wake up at %d\n", + ntohs(remaddr.sin_port)); } - + close(fd); } -/* - * Network functions of common use - */ - int -libcfs_getpeername(int sock_fd, __u32 *ipaddr_p, __u16 *port_p) +libcfs_sock_getaddr(cfs_socket_t *sock, int remote, __u32 *ip, int *port) { int rc; struct sockaddr_in peer_addr; socklen_t peer_addr_len = sizeof(peer_addr); - rc = getpeername(sock_fd, (struct sockaddr *)&peer_addr, &peer_addr_len); + LASSERT(remote == 1); + + rc = getpeername(sock->s_fd, + (struct sockaddr *)&peer_addr, &peer_addr_len); if (rc != 0) return -errno; - - if (ipaddr_p != NULL) - *ipaddr_p = ntohl(peer_addr.sin_addr.s_addr); - if (port_p != NULL) - *port_p = ntohs(peer_addr.sin_port); - 
return 0; + if (ip != NULL) + *ip = ntohl(peer_addr.sin_addr.s_addr); + if (port != NULL) + *port = ntohs(peer_addr.sin_port); + + return rc; } +/* + * Network functions of common use + */ + int -libcfs_socketpair(int *fdp) +libcfs_socketpair(cfs_socket_t **sockp) { - int rc, i; - + int rc, i, fdp[2]; + + LIBCFS_ALLOC(sockp[0], sizeof(cfs_socket_t)); + if (sockp[0] == NULL) { + CERROR ("Can't alloc memory for cfs_socket_t (1)\n"); + return -ENOMEM; + } + + LIBCFS_ALLOC(sockp[1], sizeof(cfs_socket_t)); + if (sockp[1] == NULL) { + CERROR ("Can't alloc memory for cfs_socket_t (2)\n"); + LIBCFS_FREE(sockp[0], sizeof(cfs_socket_t)); + return -ENOMEM; + } + rc = socketpair(AF_UNIX, SOCK_STREAM, 0, fdp); if (rc != 0) { rc = -errno; CERROR ("Cannot create socket pair\n"); + LIBCFS_FREE(sockp[0], sizeof(cfs_socket_t)); + LIBCFS_FREE(sockp[1], sizeof(cfs_socket_t)); return rc; } - + + sockp[0]->s_fd = fdp[0]; + sockp[1]->s_fd = fdp[1]; + for (i = 0; i < 2; i++) { - rc = libcfs_fcntl_nonblock(fdp[i]); + rc = libcfs_fcntl_nonblock(sockp[i]); if (rc) { - close(fdp[0]); - close(fdp[1]); + libcfs_sock_release(sockp[0]); + libcfs_sock_release(sockp[1]); return rc; } } - + return 0; } int -libcfs_fcntl_nonblock(int fd) +libcfs_fcntl_nonblock(cfs_socket_t *sock) { int rc, flags; - - flags = fcntl(fd, F_GETFL, 0); + + flags = fcntl(sock->s_fd, F_GETFL, 0); if (flags == -1) { rc = -errno; CERROR ("Cannot get socket flags\n"); return rc; } - - rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK); + + rc = fcntl(sock->s_fd, F_SETFL, flags | O_NONBLOCK); if (rc != 0) { rc = -errno; CERROR ("Cannot set socket flags\n"); return rc; } - + return 0; } int -libcfs_sock_set_nagle(int fd, int nagle) +libcfs_sock_set_nagle(cfs_socket_t *sock, int nagle) { int rc; int option = nagle ? 
0 : 1; #if defined(__sun__) || defined(__sun) - rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &option, sizeof(option)); + rc = setsockopt(sock->s_fd, + IPPROTO_TCP, TCP_NODELAY, &option, sizeof(option)); #else - rc = setsockopt(fd, SOL_TCP, TCP_NODELAY, &option, sizeof(option)); + rc = setsockopt(sock->s_fd, + SOL_TCP, TCP_NODELAY, &option, sizeof(option)); #endif if (rc != 0) { @@ -477,14 +541,15 @@ libcfs_sock_set_nagle(int fd, int nagle) } int -libcfs_sock_set_bufsiz(int fd, int bufsiz) +libcfs_sock_set_bufsiz(cfs_socket_t *sock, int bufsiz) { int rc, option; - + LASSERT (bufsiz != 0); option = bufsiz; - rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &option, sizeof(option)); + rc = setsockopt(sock->s_fd, + SOL_SOCKET, SO_SNDBUF, &option, sizeof(option)); if (rc != 0) { rc = -errno; CERROR ("Cannot set SNDBUF socket option\n"); @@ -492,7 +557,8 @@ libcfs_sock_set_bufsiz(int fd, int bufsiz) } option = bufsiz; - rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &option, sizeof(option)); + rc = setsockopt(sock->s_fd, + SOL_SOCKET, SO_RCVBUF, &option, sizeof(option)); if (rc != 0) { rc = -errno; CERROR ("Cannot set RCVBUF socket option\n"); @@ -503,59 +569,75 @@ libcfs_sock_set_bufsiz(int fd, int bufsiz) } int -libcfs_sock_create(int *fdp) +libcfs_sock_bind(cfs_socket_t *sock, __u32 ip, __u16 port) { - int rc, fd, option; + int rc; + struct sockaddr_in locaddr; - fd = socket(AF_INET, SOCK_STREAM, 0); - if (fd < 0) { - rc = -errno; - CERROR ("Cannot create socket\n"); - return rc; - } + if (ip == 0 && port == 0) + return 0; - option = 1; - rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, - &option, sizeof(option)); + memset(&locaddr, 0, sizeof(locaddr)); + locaddr.sin_family = AF_INET; + locaddr.sin_addr.s_addr = (ip == 0) ? 
INADDR_ANY : htonl(ip); + locaddr.sin_port = htons(port); + + rc = bind(sock->s_fd, (struct sockaddr *)&locaddr, sizeof(locaddr)); if (rc != 0) { rc = -errno; - CERROR ("Cannot set SO_REUSEADDR for socket\n"); - close(fd); + CERROR("Cannot bind to %d.%d.%d.%d %d: %d\n", + HIPQUAD(ip), port, rc); return rc; - } - - *fdp = fd; - return 0; -} + } -void libcfs_sock_release(int fd) -{ - close(fd); + return 0; } int -libcfs_sock_bind_to_port(int fd, __u16 port) +libcfs_sock_create(cfs_socket_t **sockp, int *fatal, + __u32 local_ip, int local_port) { - int rc; - struct sockaddr_in locaddr; + int rc, fd, option; - memset(&locaddr, 0, sizeof(locaddr)); - locaddr.sin_family = AF_INET; - locaddr.sin_addr.s_addr = INADDR_ANY; - locaddr.sin_port = htons(port); + *fatal = 1; - rc = bind(fd, (struct sockaddr *)&locaddr, sizeof(locaddr)); + LIBCFS_ALLOC(*sockp, sizeof(cfs_socket_t)); + if (*sockp == NULL) { + CERROR("Can't alloc memory for cfs_socket_t\n"); + return -ENOMEM; + } + + fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) { + rc = -errno; + CERROR("Cannot create socket: %d\n", rc); + LIBCFS_FREE(*sockp, sizeof(cfs_socket_t)); + return rc; + } + + (*sockp)->s_fd = fd; + + option = 1; + rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, + &option, sizeof(option)); if (rc != 0) { rc = -errno; - CERROR ("Cannot bind to port %d\n", port); + CERROR("Cannot set SO_REUSEADDR for socket: %d\n", rc); + libcfs_sock_release(*sockp); return rc; } - return 0; + rc = libcfs_sock_bind(*sockp, local_ip, local_port); + if (rc != 0) { + *fatal = 0; + libcfs_sock_release(*sockp); + } + + return rc; } int -libcfs_sock_connect(int fd, __u32 ip, __u16 port) +libcfs_sock_connect(cfs_socket_t *sock, __u32 ip, __u16 port) { int rc; struct sockaddr_in addr; @@ -564,8 +646,8 @@ libcfs_sock_connect(int fd, __u32 ip, __u16 port) addr.sin_family = AF_INET; addr.sin_addr.s_addr = htonl(ip); addr.sin_port = htons(port); - - rc = connect(fd, (struct sockaddr *)&addr, + + rc = connect(sock->s_fd, (struct 
sockaddr *)&addr, sizeof(struct sockaddr_in)); if(rc != 0 && errno != EINPROGRESS) { @@ -582,16 +664,17 @@ libcfs_sock_connect(int fd, __u32 ip, __u16 port) /* NB: EPIPE and ECONNRESET are considered as non-fatal * because: * 1) it still makes sense to continue reading && - * 2) anyway, poll() will set up POLLHUP|POLLERR flags */ -int libcfs_sock_writev(int fd, const struct iovec *vector, int count) + * 2) anyway, poll() will set up POLLHUP|POLLERR flags */ +int +libcfs_sock_writev(cfs_socket_t *sock, const struct iovec *vector, int count) { int rc; - - rc = syscall(SYS_writev, fd, vector, count); - - if (rc == 0) /* write nothing */ + + rc = syscall(SYS_writev, sock->s_fd, vector, count); + + if (rc == 0) /* write nothing */ return 0; - + if (rc < 0) { if (errno == EAGAIN || /* write nothing */ errno == EPIPE || /* non-fatal error */ @@ -604,15 +687,16 @@ int libcfs_sock_writev(int fd, const struct iovec *vector, int count) return rc; } -int libcfs_sock_readv(int fd, const struct iovec *vector, int count) +int +libcfs_sock_readv(cfs_socket_t *sock, const struct iovec *vector, int count) { int rc; - - rc = syscall(SYS_readv, fd, vector, count); - - if (rc == 0) /* EOF */ + + rc = syscall(SYS_readv, sock->s_fd, vector, count); + + if (rc == 0) /* EOF */ return -EIO; - + if (rc < 0) { if (errno == EAGAIN) /* read nothing */ return 0; diff --git a/lnet/include/lnet/lib-lnet.h b/lnet/include/lnet/lib-lnet.h index 8d836f3..cdbfb89 100644 --- a/lnet/include/lnet/lib-lnet.h +++ b/lnet/include/lnet/lib-lnet.h @@ -64,7 +64,7 @@ static inline int lnet_is_wire_handle_none (lnet_handle_wire_t *wh) wh->wh_object_cookie == LNET_WIRE_HANDLE_COOKIE_NONE); } -static inline int lnet_md_exhausted (lnet_libmd_t *md) +static inline int lnet_md_exhausted (lnet_libmd_t *md) { return (md->md_threshold == 0 || ((md->md_options & LNET_MD_MAX_SIZE) != 0 && @@ -89,8 +89,8 @@ static inline int lnet_md_unlinkable (lnet_libmd_t *md) } #ifdef __KERNEL__ -#define LNET_LOCK() 
spin_lock(&the_lnet.ln_lock) -#define LNET_UNLOCK() spin_unlock(&the_lnet.ln_lock) +#define LNET_LOCK() spin_lock(&the_lnet.ln_lock) +#define LNET_UNLOCK() spin_unlock(&the_lnet.ln_lock) #define LNET_MUTEX_DOWN(m) mutex_down(m) #define LNET_MUTEX_UP(m) mutex_up(m) #else @@ -136,7 +136,7 @@ lnet_freelist_alloc (lnet_freelist_t *fl) if (list_empty (&fl->fl_list)) return (NULL); - + o = list_entry (fl->fl_list.next, lnet_freeobj_t, fo_list); list_del (&o->fo_list); return ((void *)&o->fo_contents); @@ -147,7 +147,7 @@ lnet_freelist_free (lnet_freelist_t *fl, void *obj) { /* ALWAYS called with liblock held */ lnet_freeobj_t *o = list_entry (obj, lnet_freeobj_t, fo_contents); - + list_add (&o->fo_list, &fl->fl_list); } @@ -157,7 +157,7 @@ lnet_eq_alloc (void) { /* NEVER called with liblock held */ lnet_eq_t *eq; - + LNET_LOCK(); eq = (lnet_eq_t *)lnet_freelist_alloc(&the_lnet.ln_free_eqs); LNET_UNLOCK(); @@ -177,7 +177,7 @@ lnet_md_alloc (lnet_md_t *umd) { /* NEVER called with liblock held */ lnet_libmd_t *md; - + LNET_LOCK(); md = (lnet_libmd_t *)lnet_freelist_alloc(&the_lnet.ln_free_mds); LNET_UNLOCK(); @@ -200,11 +200,11 @@ lnet_me_alloc (void) { /* NEVER called with liblock held */ lnet_me_t *me; - + LNET_LOCK(); me = (lnet_me_t *)lnet_freelist_alloc(&the_lnet.ln_free_mes); LNET_UNLOCK(); - + return (me); } @@ -220,7 +220,7 @@ lnet_msg_alloc (void) { /* NEVER called with liblock held */ lnet_msg_t *msg; - + LNET_LOCK(); msg = (lnet_msg_t *)lnet_freelist_alloc(&the_lnet.ln_free_msgs); LNET_UNLOCK(); @@ -287,11 +287,11 @@ lnet_md_alloc (lnet_md_t *umd) md->md_niov = niov; CFS_INIT_LIST_HEAD(&md->md_list); } - + return (md); } -static inline void +static inline void lnet_md_free (lnet_libmd_t *md) { /* ALWAYS called with liblock held */ @@ -315,7 +315,7 @@ lnet_me_alloc (void) return (me); } -static inline void +static inline void lnet_me_free(lnet_me_t *me) { /* ALWAYS called with liblock held */ @@ -340,7 +340,7 @@ lnet_msg_alloc(void) return (msg); } -static inline 
void +static inline void lnet_msg_free(lnet_msg_t *msg) { /* ALWAYS called with liblock held */ @@ -368,7 +368,7 @@ static inline lnet_eq_t * lnet_handle2eq (lnet_handle_eq_t *handle) { /* ALWAYS called with liblock held */ - lnet_libhandle_t *lh = lnet_lookup_cookie(handle->cookie, + lnet_libhandle_t *lh = lnet_lookup_cookie(handle->cookie, LNET_COOKIE_TYPE_EQ); if (lh == NULL) return (NULL); @@ -399,10 +399,10 @@ lnet_wire_handle2md (lnet_handle_wire_t *wh) { /* ALWAYS called with liblock held */ lnet_libhandle_t *lh; - + if (wh->wh_interface_cookie != the_lnet.ln_interface_cookie) return (NULL); - + lh = lnet_lookup_cookie(wh->wh_object_cookie, LNET_COOKIE_TYPE_MD); if (lh == NULL) @@ -454,14 +454,14 @@ lnet_isrouter(lnet_peer_t *lp) } static inline void -lnet_ni_addref_locked(lnet_ni_t *ni) +lnet_ni_addref_locked(lnet_ni_t *ni) { LASSERT (ni->ni_refcount > 0); ni->ni_refcount++; } static inline void -lnet_ni_addref(lnet_ni_t *ni) +lnet_ni_addref(lnet_ni_t *ni) { LNET_LOCK(); lnet_ni_addref_locked(ni); @@ -488,7 +488,7 @@ lnet_ni_decref(lnet_ni_t *ni) static inline struct list_head * lnet_nid2peerhash (lnet_nid_t nid) { - unsigned int idx = LNET_NIDADDR(nid) % LNET_PEER_HASHSIZE; + unsigned int idx = LNET_NIDADDR(nid) % LNET_PEER_HASHSIZE; return &the_lnet.ln_peer_hash[idx]; } @@ -526,7 +526,7 @@ lnet_set_msg_uid(lnet_ni_t *ni, lnet_msg_t *msg, lnet_uid_t uid) extern lnet_ni_t *lnet_nid2ni_locked (lnet_nid_t nid); extern lnet_ni_t *lnet_net2ni_locked (__u32 net); static inline lnet_ni_t * -lnet_net2ni (__u32 net) +lnet_net2ni (__u32 net) { lnet_ni_t *ni; @@ -542,7 +542,7 @@ int lnet_add_route(__u32 net, unsigned int hops, lnet_nid_t gateway_nid); int lnet_check_routes(void); int lnet_del_route(__u32 net, lnet_nid_t gw_nid); void lnet_destroy_routes(void); -int lnet_get_route(int idx, __u32 *net, __u32 *hops, +int lnet_get_route(int idx, __u32 *net, __u32 *hops, lnet_nid_t *gateway, __u32 *alive); void lnet_proc_init(void); void lnet_proc_fini(void); @@ -579,25 
+579,25 @@ int lnet_extract_iov (int dst_niov, struct iovec *dst, unsigned int offset, unsigned int len); unsigned int lnet_kiov_nob (unsigned int niov, lnet_kiov_t *iov); -int lnet_extract_kiov (int dst_niov, lnet_kiov_t *dst, +int lnet_extract_kiov (int dst_niov, lnet_kiov_t *dst, int src_niov, lnet_kiov_t *src, unsigned int offset, unsigned int len); -void lnet_copy_iov2iov (unsigned int ndiov, struct iovec *diov, - unsigned int doffset, - unsigned int nsiov, struct iovec *siov, +void lnet_copy_iov2iov (unsigned int ndiov, struct iovec *diov, + unsigned int doffset, + unsigned int nsiov, struct iovec *siov, unsigned int soffset, unsigned int nob); -void lnet_copy_kiov2iov (unsigned int niov, struct iovec *iov, +void lnet_copy_kiov2iov (unsigned int niov, struct iovec *iov, unsigned int iovoffset, - unsigned int nkiov, lnet_kiov_t *kiov, + unsigned int nkiov, lnet_kiov_t *kiov, unsigned int kiovoffset, unsigned int nob); -void lnet_copy_iov2kiov (unsigned int nkiov, lnet_kiov_t *kiov, +void lnet_copy_iov2kiov (unsigned int nkiov, lnet_kiov_t *kiov, unsigned int kiovoffset, - unsigned int niov, struct iovec *iov, + unsigned int niov, struct iovec *iov, unsigned int iovoffset, unsigned int nob); -void lnet_copy_kiov2kiov (unsigned int ndkiov, lnet_kiov_t *dkiov, - unsigned int doffset, - unsigned int nskiov, lnet_kiov_t *skiov, +void lnet_copy_kiov2kiov (unsigned int ndkiov, lnet_kiov_t *dkiov, + unsigned int doffset, + unsigned int nskiov, lnet_kiov_t *skiov, unsigned int soffset, unsigned int nob); static inline void @@ -687,4 +687,23 @@ void lnet_destroy_peer_table(void); int lnet_create_peer_table(void); void lnet_debug_peer(lnet_nid_t nid); +#ifndef __KERNEL__ +static inline int +lnet_parse_int_tunable(int *value, char *name) +{ + char *env = getenv(name); + char *end; + + if (env == NULL) + return 0; + + *value = strtoull(env, &end, 0); + if (*end == 0) + return 0; + + CERROR("Can't parse tunable %s=%s\n", name, env); + return -EINVAL; +} +#endif + #endif 
diff --git a/lnet/include/lnet/lib-types.h b/lnet/include/lnet/lib-types.h index 3fa3d8d..f518278 100644 --- a/lnet/include/lnet/lib-types.h +++ b/lnet/include/lnet/lib-types.h @@ -56,7 +56,7 @@ #include #include -#define WIRE_ATTR __attribute__((packed)) +#define WIRE_ATTR __attribute__((packed)) /* The wire handle's interface cookie only matches one network interface in * one epoch (i.e. new cookie when the interface restarts or the node @@ -133,14 +133,14 @@ typedef struct { /* A HELLO message contains a magic number and protocol version * code in the header's dest_nid, the peer's NID in the src_nid, and * LNET_MSG_HELLO in the type field. All other common fields are zero - * (including payload_size; i.e. no payload). + * (including payload_size; i.e. no payload). * This is for use by byte-stream LNDs (e.g. TCP/IP) to check the peer is * running the same protocol and to find out its NID. These LNDs should * exchange HELLO messages when a connection is first established. Individual * LNDs can put whatever else they fancy in lnet_hdr_t::msg. 
*/ typedef struct { - __u32 magic; /* LNET_PROTO_TCP_MAGIC */ + __u32 magic; /* LNET_PROTO_TCP_MAGIC */ __u16 version_major; /* increment on incompatible change */ __u16 version_minor; /* increment on compatible change */ } WIRE_ATTR lnet_magicversion_t; @@ -224,7 +224,7 @@ typedef struct lnet_libhandle { } lnet_libhandle_t; #define lh_entry(ptr, type, member) \ - ((type *)((char *)(ptr)-(char *)(&((type *)0)->member))) + ((type *)((char *)(ptr)-(char *)(&((type *)0)->member))) typedef struct lnet_eq { struct list_head eq_list; @@ -276,7 +276,7 @@ typedef struct lnet_libmd { #ifdef LNET_USE_LIB_FREELIST typedef struct { - void *fl_objs; /* single contiguous array of objects */ + void *fl_objs; /* single contiguous array of objects */ int fl_nobjs; /* the number of them */ int fl_objsize; /* the size (including overhead) of each of them */ struct list_head fl_list; /* where they are enqueued */ @@ -313,36 +313,36 @@ typedef struct lnet_lnd /* fields initialised by the LND */ unsigned int lnd_type; - + int (*lnd_startup) (struct lnet_ni *ni); void (*lnd_shutdown) (struct lnet_ni *ni); int (*lnd_ctl)(struct lnet_ni *ni, unsigned int cmd, void *arg); /* In data movement APIs below, payload buffers are described as a set * of 'niov' fragments which are... - * EITHER + * EITHER * in virtual memory (struct iovec *iov != NULL) * OR * in pages (kernel only: plt_kiov_t *kiov != NULL). * The LND may NOT overwrite these fragment descriptors. * An 'offset' and may specify a byte offset within the set of - * fragments to start from + * fragments to start from */ /* Start sending a preformatted message. 'private' is NULL for PUT and - * GET messages; otherwise this is a response to an incoming message - * and 'private' is the 'private' passed to lnet_parse(). 
Return - * non-zero for immediate failure, otherwise complete later with - * lnet_finalize() */ - int (*lnd_send)(struct lnet_ni *ni, void *private, lnet_msg_t *msg); + * GET messages; otherwise this is a response to an incoming message + * and 'private' is the 'private' passed to lnet_parse(). Return + * non-zero for immediate failure, otherwise complete later with + * lnet_finalize() */ + int (*lnd_send)(struct lnet_ni *ni, void *private, lnet_msg_t *msg); /* Start receiving 'mlen' bytes of payload data, skipping the following * 'rlen' - 'mlen' bytes. 'private' is the 'private' passed to * lnet_parse(). Return non-zero for immedaite failure, otherwise * complete later with lnet_finalize(). This also gives back a receive * credit if the LND does flow control. */ - int (*lnd_recv)(struct lnet_ni *ni, void *private, lnet_msg_t *msg, - int delayed, unsigned int niov, + int (*lnd_recv)(struct lnet_ni *ni, void *private, lnet_msg_t *msg, + int delayed, unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov, unsigned int offset, unsigned int mlen, unsigned int rlen); @@ -358,19 +358,17 @@ typedef struct lnet_lnd /* notification of peer health */ void (*lnd_notify)(struct lnet_ni *ni, lnet_nid_t peer, int alive); -#ifdef __KERNEL__ +#if defined(__KERNEL__) || defined(HAVE_LIBPTHREAD) /* accept a new connection */ int (*lnd_accept)(struct lnet_ni *ni, cfs_socket_t *sock); -#else +#endif + +#ifndef __KERNEL__ /* wait for something to happen */ void (*lnd_wait)(struct lnet_ni *ni, int milliseconds); /* ensure non-RDMA messages can be received outside liblustre */ int (*lnd_setasync)(struct lnet_ni *ni, lnet_process_id_t id, int nasync); - -#ifdef HAVE_LIBPTHREAD - int (*lnd_accept)(struct lnet_ni *ni, int sock); -#endif #endif } lnd_t; @@ -416,7 +414,7 @@ typedef struct lnet_peer { } lnet_peer_t; typedef struct { - struct list_head lr_list; /* chain on net */ + struct list_head lr_list; /* chain on net */ lnet_peer_t *lr_gateway; /* router node */ } lnet_route_t; @@ 
-540,10 +538,10 @@ typedef struct struct list_head *ln_peer_hash; /* NID->peer hash */ int ln_npeers; /* # peers extant */ int ln_peertable_version; /* /proc validity stamp */ - + int ln_routing; /* am I a router? */ lnet_rtrbufpool_t ln_rtrpools[LNET_NRBPOOLS]; /* router buffer pools */ - + int ln_lh_hash_size; /* size of lib handle hash table */ struct list_head *ln_lh_hash_table; /* all extant lib handles, this interface */ __u64 ln_next_object_cookie; /* cookie generator */ @@ -568,11 +566,11 @@ typedef struct lnet_ping_info_t *ln_ping_info; #ifdef __KERNEL__ - int ln_rc_state; /* router checker startup/shutdown state */ - struct semaphore ln_rc_signal; /* serialise startup/shutdown */ + int ln_rc_state; /* router checker startup/shutdown state */ + struct semaphore ln_rc_signal; /* serialise startup/shutdown */ lnet_handle_eq_t ln_rc_eqh; /* router checker's event queue */ #endif - + #ifdef LNET_USE_LIB_FREELIST lnet_freelist_t ln_free_mes; lnet_freelist_t ln_free_msgs; @@ -592,7 +590,7 @@ typedef struct * call lnet_server_mode() */ int ln_server_mode_flag; -#endif +#endif } lnet_t; #endif diff --git a/lnet/lnet/acceptor.c b/lnet/lnet/acceptor.c index 198b057..c852f16 100644 --- a/lnet/lnet/acceptor.c +++ b/lnet/lnet/acceptor.c @@ -37,28 +37,55 @@ #define DEBUG_SUBSYSTEM S_LNET #include +#if defined(__KERNEL__) || defined(HAVE_LIBPTHREAD) + +static int accept_port = 988; +static int accept_backlog = 127; +static int accept_timeout = 5; + +struct { + int pta_shutdown; + cfs_socket_t *pta_sock; #ifdef __KERNEL__ + struct semaphore pta_signal; +#else + struct cfs_completion pta_signal; +#endif +} lnet_acceptor_state; + +int +lnet_acceptor_port(void) +{ + return accept_port; +} + +static inline int +lnet_accept_magic(__u32 magic, __u32 constant) +{ + return (magic == constant || + magic == __swab32(constant)); +} + +#ifdef __KERNEL__ + +#define cfs_init_completion(c) init_mutex_locked(c) +#define cfs_wait_for_completion(c) mutex_down(c) +#define cfs_complete(c) 
mutex_up(c) +#define cfs_fini_completion(c) do { } while (0) +#define cfs_create_thread(func, a) cfs_kernel_thread(func, a, 0) + +EXPORT_SYMBOL(lnet_acceptor_port); + static char *accept = "secure"; + CFS_MODULE_PARM(accept, "s", charp, 0444, "Accept connections (secure|all|none)"); - -static int accept_port = 988; CFS_MODULE_PARM(accept_port, "i", int, 0444, "Acceptor's port (same on all nodes)"); - -static int accept_backlog = 127; CFS_MODULE_PARM(accept_backlog, "i", int, 0444, "Acceptor's listen backlog"); - -static int accept_timeout = 5; CFS_MODULE_PARM(accept_timeout, "i", int, 0644, - "Acceptor's timeout (seconds)"); - -struct { - int pta_shutdown; - cfs_socket_t *pta_sock; - struct semaphore pta_signal; -} lnet_acceptor_state; + "Acceptor's timeout (seconds)"); int lnet_acceptor_timeout(void) @@ -67,15 +94,8 @@ lnet_acceptor_timeout(void) } EXPORT_SYMBOL(lnet_acceptor_timeout); -int -lnet_acceptor_port(void) -{ - return accept_port; -} -EXPORT_SYMBOL(lnet_acceptor_port); - void -lnet_connect_console_error (int rc, lnet_nid_t peer_nid, +lnet_connect_console_error (int rc, lnet_nid_t peer_nid, __u32 peer_ip, int peer_port) { switch (rc) { @@ -145,13 +165,13 @@ lnet_connect(cfs_socket_t **sockp, lnet_nid_t peer_nid, CLASSERT (sizeof(cr) <= 16); /* not too big to be on the stack */ - for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT; - port >= LNET_ACCEPTOR_MIN_RESERVED_PORT; + for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT; + port >= LNET_ACCEPTOR_MIN_RESERVED_PORT; --port) { /* Iterate through reserved ports. 
*/ - rc = libcfs_sock_connect(&sock, &fatal, - local_ip, port, + rc = libcfs_sock_connect(&sock, &fatal, + local_ip, port, peer_ip, peer_port); if (rc != 0) { if (fatal) @@ -183,14 +203,14 @@ lnet_connect(cfs_socket_t **sockp, lnet_nid_t peer_nid, accept_timeout); if (rc != 0) goto failed_sock; - + *sockp = sock; return 0; } rc = -EADDRINUSE; goto failed; - + failed_sock: libcfs_sock_release(sock); failed: @@ -199,13 +219,45 @@ lnet_connect(cfs_socket_t **sockp, lnet_nid_t peer_nid, } EXPORT_SYMBOL(lnet_connect); -static inline int -lnet_accept_magic(__u32 magic, __u32 constant) +#else /* below is multi-threaded user-space code */ + +static char *accept_type = "secure"; + +int +lnet_acceptor_get_tunables() { - return (magic == constant || - magic == __swab32(constant)); + int rc; + char *env = getenv("LNET_ACCEPT"); + + if (env != NULL) + accept_type = env; + + rc = lnet_parse_int_tunable(&accept_port, "LNET_ACCEPT_PORT"); + + if (rc != 0) + return rc; + + rc = lnet_parse_int_tunable(&accept_backlog, "LNET_ACCEPT_BACKLOG"); + + if (rc != 0) + return rc; + + rc = lnet_parse_int_tunable(&accept_timeout, "LNET_ACCEPT_TIMEOUT"); + + if (rc != 0) + return rc; + + CDEBUG(D_NET, "accept_type = %s\n", accept_type); + CDEBUG(D_NET, "accept_port = %d\n", accept_port); + CDEBUG(D_NET, "accept_backlog = %d\n", accept_backlog); + CDEBUG(D_NET, "accept_timeout = %d\n", accept_timeout); + return 0; } +#endif /* __KERNEL__ */ + +/* Below is the code common for both kernel and MT user-space */ + int lnet_accept(cfs_socket_t *sock, __u32 magic) { @@ -218,7 +270,7 @@ lnet_accept(cfs_socket_t *sock, __u32 magic) char *str; LASSERT (sizeof(cr) <= 16); /* not too big for the stack */ - + rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port); LASSERT (rc == 0); /* we succeeded before */ @@ -251,7 +303,7 @@ lnet_accept(cfs_socket_t *sock, __u32 magic) str = "'old' openibnal"; else str = "unrecognised"; - + LCONSOLE_ERROR_MSG(0x11f, "Refusing connection from %u.%u.%u.%u" " magic %08x: %s 
acceptor protocol\n", HIPQUAD(peer_ip), magic, str); @@ -260,7 +312,7 @@ lnet_accept(cfs_socket_t *sock, __u32 magic) flip = (magic != LNET_PROTO_ACCEPTOR_MAGIC); - rc = libcfs_sock_read(sock, &cr.acr_version, + rc = libcfs_sock_read(sock, &cr.acr_version, sizeof(cr.acr_version), accept_timeout); if (rc != 0) { @@ -271,7 +323,7 @@ lnet_accept(cfs_socket_t *sock, __u32 magic) if (flip) __swab32s(&cr.acr_version); - + if (cr.acr_version != LNET_PROTO_ACCEPTOR_VERSION) { /* future version compatibility! * An acceptor-specific protocol rev will first send a version @@ -330,320 +382,7 @@ lnet_accept(cfs_socket_t *sock, __u32 magic) libcfs_nid2str(cr.acr_nid), HIPQUAD(peer_ip)); rc = ni->ni_lnd->lnd_accept(ni, sock); - lnet_ni_decref(ni); - return rc; -} - -int -lnet_acceptor(void *arg) -{ - char name[16]; - cfs_socket_t *newsock; - int rc; - __u32 magic; - __u32 peer_ip; - int peer_port; - int secure = (int)((long_ptr_t)arg); - - LASSERT (lnet_acceptor_state.pta_sock == NULL); - - snprintf(name, sizeof(name), "acceptor_%03d", accept_port); - cfs_daemonize(name); - cfs_block_allsigs(); - - rc = libcfs_sock_listen(&lnet_acceptor_state.pta_sock, - 0, accept_port, accept_backlog); - if (rc != 0) { - if (rc == -EADDRINUSE) - LCONSOLE_ERROR_MSG(0x122, "Can't start acceptor on port" - " %d: port already in use\n", - accept_port); - else - LCONSOLE_ERROR_MSG(0x123, "Can't start acceptor on port " - "%d: unexpected error %d\n", - accept_port, rc); - - lnet_acceptor_state.pta_sock = NULL; - } else { - LCONSOLE(0, "Accept %s, port %d\n", accept, accept_port); - } - - /* set init status and unblock parent */ - lnet_acceptor_state.pta_shutdown = rc; - mutex_up(&lnet_acceptor_state.pta_signal); - - if (rc != 0) - return rc; - - while (!lnet_acceptor_state.pta_shutdown) { - - rc = libcfs_sock_accept(&newsock, lnet_acceptor_state.pta_sock); - if (rc != 0) { - if (rc != -EAGAIN) { - CWARN("Accept error %d: pausing...\n", rc); - cfs_pause(cfs_time_seconds(1)); - } - continue; - } - - rc 
= libcfs_sock_getaddr(newsock, 1, &peer_ip, &peer_port); - if (rc != 0) { - CERROR("Can't determine new connection's address\n"); - goto failed; - } - - if (secure && peer_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) { - CERROR("Refusing connection from %u.%u.%u.%u: " - "insecure port %d\n", - HIPQUAD(peer_ip), peer_port); - goto failed; - } - - rc = libcfs_sock_read(newsock, &magic, sizeof(magic), - accept_timeout); - if (rc != 0) { - CERROR("Error %d reading connection request from " - "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip)); - goto failed; - } - - rc = lnet_accept(newsock, magic); - if (rc != 0) - goto failed; - - continue; - - failed: - libcfs_sock_release(newsock); - } - - libcfs_sock_release(lnet_acceptor_state.pta_sock); - lnet_acceptor_state.pta_sock = NULL; - - LCONSOLE(0, "Acceptor stopping\n"); - - /* unblock lnet_acceptor_stop() */ - mutex_up(&lnet_acceptor_state.pta_signal); - return 0; -} - -int -lnet_acceptor_start(void) -{ - long pid; - long secure; - - LASSERT (lnet_acceptor_state.pta_sock == NULL); - init_mutex_locked(&lnet_acceptor_state.pta_signal); - - if (!strcmp(accept, "secure")) { - secure = 1; - } else if (!strcmp(accept, "all")) { - secure = 0; - } else if (!strcmp(accept, "none")) { - return 0; - } else { - LCONSOLE_ERROR_MSG(0x124, "Can't parse 'accept=\"%s\"'\n", - accept); - return -EINVAL; - } - - if (lnet_count_acceptor_nis() == 0) /* not required */ - return 0; - - pid = cfs_kernel_thread(lnet_acceptor, (void *)(ulong_ptr_t)secure, 0); - if (pid < 0) { - CERROR("Can't start acceptor thread: %ld\n", pid); - return -ESRCH; - } - - mutex_down(&lnet_acceptor_state.pta_signal); /* wait for acceptor to startup */ - - if (!lnet_acceptor_state.pta_shutdown) { - /* started OK */ - LASSERT (lnet_acceptor_state.pta_sock != NULL); - return 0; - } - - LASSERT (lnet_acceptor_state.pta_sock == NULL); - return -ENETDOWN; -} - -void -lnet_acceptor_stop(void) -{ - if (lnet_acceptor_state.pta_sock == NULL) /* not running */ - return; - - 
lnet_acceptor_state.pta_shutdown = 1; - libcfs_sock_abort_accept(lnet_acceptor_state.pta_sock); - - /* block until acceptor signals exit */ - mutex_down(&lnet_acceptor_state.pta_signal); -} - -#else /* __KERNEL__ */ -#ifdef HAVE_LIBPTHREAD - -static char *accept_type; -static int accept_port = 988; -static int accept_backlog; -static int accept_timeout; - -struct { - int pta_shutdown; - int pta_sock; - struct cfs_completion pta_completion; -} lnet_acceptor_state; - -int -lnet_acceptor_port(void) -{ - return accept_port; -} - -int -lnet_parse_int_tunable(int *value, char *name, int dflt) -{ - char *env = getenv(name); - char *end; - - if (env == NULL) { - *value = dflt; - return 0; - } - *value = strtoull(env, &end, 0); - if (*end == 0) - return 0; - - CERROR("Can't parse tunable %s=%s\n", name, env); - return -EINVAL; -} - -int -lnet_parse_string_tunable(char **value, char *name, char *dflt) -{ - char *env = getenv(name); - - if (env == NULL) - *value = dflt; - else - *value = env; - - return 0; -} - -int -lnet_acceptor_get_tunables() -{ - int rc; - rc = lnet_parse_string_tunable(&accept_type, "LNET_ACCEPT", "secure"); - - if (rc != 0) - return rc; - - rc = lnet_parse_int_tunable(&accept_port, "LNET_ACCEPT_PORT", 988); - - if (rc != 0) - return rc; - - rc = lnet_parse_int_tunable(&accept_backlog, "LNET_ACCEPT_BACKLOG", 127); - - if (rc != 0) - return rc; - - rc = lnet_parse_int_tunable(&accept_timeout, "LNET_ACCEPT_TIMEOUT", 5); - - if (rc != 0) - return rc; - - CDEBUG(D_NET, "accept_type = %s\n", accept_type); - CDEBUG(D_NET, "accept_port = %d\n", accept_port); - CDEBUG(D_NET, "accept_backlog = %d\n", accept_backlog); - CDEBUG(D_NET, "accept_timeout = %d\n", accept_timeout); - return 0; -} - -static inline int -lnet_accept_magic(__u32 magic, __u32 constant) -{ - return (magic == constant || - magic == __swab32(constant)); -} - -/* user-land lnet_accept() isn't used by any LND's directly. 
So, we don't - * do it visible outside acceptor.c and we can change its prototype - * freely */ -static int -lnet_accept(int sock, __u32 magic, __u32 peer_ip, int peer_port) -{ - int rc, flip; - lnet_acceptor_connreq_t cr; - lnet_ni_t *ni; - - if (!lnet_accept_magic(magic, LNET_PROTO_ACCEPTOR_MAGIC)) { - LCONSOLE_ERROR("Refusing connection from %u.%u.%u.%u magic %08x: " - "unsupported acceptor protocol\n", - HIPQUAD(peer_ip), magic); - return -EPROTO; - } - - flip = (magic != LNET_PROTO_ACCEPTOR_MAGIC); - - rc = libcfs_sock_read(sock, &cr.acr_version, - sizeof(cr.acr_version), - accept_timeout); - if (rc != 0) { - CERROR("Error %d reading connection request version from " - "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip)); - return -EIO; - } - - if (flip) - __swab32s(&cr.acr_version); - - if (cr.acr_version != LNET_PROTO_ACCEPTOR_VERSION) - return -EPROTO; - - rc = libcfs_sock_read(sock, &cr.acr_nid, - sizeof(cr) - - offsetof(lnet_acceptor_connreq_t, acr_nid), - accept_timeout); - if (rc != 0) { - CERROR("Error %d reading connection request from " - "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip)); - return -EIO; - } - - if (flip) - __swab64s(&cr.acr_nid); - - ni = lnet_net2ni(LNET_NIDNET(cr.acr_nid)); - - if (ni == NULL || /* no matching net */ - ni->ni_nid != cr.acr_nid) { /* right NET, wrong NID! 
*/ - if (ni != NULL) - lnet_ni_decref(ni); - LCONSOLE_ERROR("Refusing connection from %u.%u.%u.%u for %s: " - " No matching NI\n", - HIPQUAD(peer_ip), libcfs_nid2str(cr.acr_nid)); - return -EPERM; - } - - if (ni->ni_lnd->lnd_accept == NULL) { - lnet_ni_decref(ni); - LCONSOLE_ERROR("Refusing connection from %u.%u.%u.%u for %s: " - " NI doesn not accept IP connections\n", - HIPQUAD(peer_ip), libcfs_nid2str(cr.acr_nid)); - return -EPERM; - } - - CDEBUG(D_NET, "Accept %s from %u.%u.%u.%u\n", - libcfs_nid2str(cr.acr_nid), HIPQUAD(peer_ip)); - - rc = ni->ni_lnd->lnd_accept(ni, sock); - lnet_ni_decref(ni); return rc; } @@ -652,12 +391,14 @@ int lnet_acceptor(void *arg) { char name[16]; - int secure = (int)((unsigned long)arg); + cfs_socket_t *newsock; int rc; - int newsock; + __u32 magic; __u32 peer_ip; int peer_port; - __u32 magic; + int secure = (int)((long_ptr_t)arg); + + LASSERT (lnet_acceptor_state.pta_sock == NULL); snprintf(name, sizeof(name), "acceptor_%03d", accept_port); cfs_daemonize(name); @@ -667,31 +408,40 @@ lnet_acceptor(void *arg) 0, accept_port, accept_backlog); if (rc != 0) { if (rc == -EADDRINUSE) - LCONSOLE_ERROR("Can't start acceptor on port %d: " - "port already in use\n", - accept_port); + LCONSOLE_ERROR_MSG(0x122, "Can't start acceptor on port" + " %d: port already in use\n", + accept_port); else - LCONSOLE_ERROR("Can't start acceptor on port %d: " - "unexpected error %d\n", - accept_port, rc); + LCONSOLE_ERROR_MSG(0x123, "Can't start acceptor on port " + "%d: unexpected error %d\n", + accept_port, rc); + lnet_acceptor_state.pta_sock = NULL; } else { +#ifdef __KERNEL__ + LCONSOLE(0, "Accept %s, port %d\n", accept, accept_port); +#else LCONSOLE(0, "Accept %s, port %d\n", accept_type, accept_port); +#endif } - + /* set init status and unblock parent */ lnet_acceptor_state.pta_shutdown = rc; - cfs_complete(&lnet_acceptor_state.pta_completion); + cfs_complete(&lnet_acceptor_state.pta_signal); if (rc != 0) return rc; while 
(!lnet_acceptor_state.pta_shutdown) { - rc = libcfs_sock_accept(&newsock, lnet_acceptor_state.pta_sock, - &peer_ip, &peer_port); - if (rc != 0) + rc = libcfs_sock_accept(&newsock, lnet_acceptor_state.pta_sock); + if (rc != 0) { + if (rc != -EAGAIN) { + CWARN("Accept error %d: pausing...\n", rc); + cfs_pause(cfs_time_seconds(1)); + } continue; + } /* maybe we're waken up with libcfs_sock_abort_accept() */ if (lnet_acceptor_state.pta_shutdown) { @@ -699,6 +449,12 @@ lnet_acceptor(void *arg) break; } + rc = libcfs_sock_getaddr(newsock, 1, &peer_ip, &peer_port); + if (rc != 0) { + CERROR("Can't determine new connection's address\n"); + goto failed; + } + if (secure && peer_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) { CERROR("Refusing connection from %u.%u.%u.%u: " "insecure port %d\n", @@ -714,33 +470,55 @@ lnet_acceptor(void *arg) goto failed; } - rc = lnet_accept(newsock, magic, peer_ip, peer_port); + rc = lnet_accept(newsock, magic); if (rc != 0) goto failed; - + continue; - - failed: + + failed: libcfs_sock_release(newsock); } - + libcfs_sock_release(lnet_acceptor_state.pta_sock); - LCONSOLE(0,"Acceptor stopping\n"); + lnet_acceptor_state.pta_sock = NULL; - /* unblock lnet_acceptor_stop() */ - cfs_complete(&lnet_acceptor_state.pta_completion); + LCONSOLE(0, "Acceptor stopping\n"); + /* unblock lnet_acceptor_stop() */ + cfs_complete(&lnet_acceptor_state.pta_signal); return 0; } -static int skip_waiting_for_completion; +static inline int +accept2secure(const char *acc, long *sec) +{ + if (!strcmp(acc, "secure")) { + *sec = 1; + return 1; + } else if (!strcmp(acc, "all")) { + *sec = 0; + return 1; + } else if (!strcmp(acc, "none")) { + return 0; + } else { + LCONSOLE_ERROR_MSG(0x124, "Can't parse 'accept=\"%s\"'\n", + acc); + return -EINVAL; + } +} int lnet_acceptor_start(void) { - long secure; - int rc; + int rc; + long rc2; + long secure; + + LASSERT (lnet_acceptor_state.pta_sock == NULL); +#ifndef __KERNEL__ + /* kernel version uses CFS_MODULE_PARM */ rc = 
lnet_acceptor_get_tunables(); if (rc != 0) return rc; @@ -748,71 +526,67 @@ lnet_acceptor_start(void) /* Do nothing if we're liblustre clients */ if ((the_lnet.ln_pid & LNET_PID_USERFLAG) != 0) return 0; - - cfs_init_completion(&lnet_acceptor_state.pta_completion); - - if (!strcmp(accept_type, "secure")) { - secure = 1; - } else if (!strcmp(accept_type, "all")) { - secure = 0; - } else if (!strcmp(accept_type, "none")) { - skip_waiting_for_completion = 1; - return 0; - } else { - LCONSOLE_ERROR ("Can't parse 'accept_type=\"%s\"'\n", accept_type); - cfs_fini_completion(&lnet_acceptor_state.pta_completion); - return -EINVAL; +#endif + cfs_init_completion(&lnet_acceptor_state.pta_signal); + +#ifdef __KERNEL__ + rc = accept2secure(accept, &secure); +#else + rc = accept2secure(accept_type, &secure); +#endif + if (rc <= 0) { + cfs_fini_completion(&lnet_acceptor_state.pta_signal); + return rc; } - if (lnet_count_acceptor_nis() == 0) { /* not required */ - skip_waiting_for_completion = 1; + if (lnet_count_acceptor_nis() == 0) /* not required */ return 0; - } - rc = cfs_create_thread(lnet_acceptor, (void *)secure); - if (rc != 0) { + rc2 = cfs_create_thread(lnet_acceptor, (void *)(ulong_ptr_t)secure); + if (rc2 < 0) { CERROR("Can't start acceptor thread: %d\n", rc); - cfs_fini_completion(&lnet_acceptor_state.pta_completion); - return rc; + cfs_fini_completion(&lnet_acceptor_state.pta_signal); + return -ESRCH; } /* wait for acceptor to startup */ - cfs_wait_for_completion(&lnet_acceptor_state.pta_completion); + cfs_wait_for_completion(&lnet_acceptor_state.pta_signal); - if (!lnet_acceptor_state.pta_shutdown) + if (!lnet_acceptor_state.pta_shutdown) { + /* started OK */ + LASSERT (lnet_acceptor_state.pta_sock != NULL); return 0; - - cfs_fini_completion(&lnet_acceptor_state.pta_completion); + } + + LASSERT (lnet_acceptor_state.pta_sock == NULL); + cfs_fini_completion(&lnet_acceptor_state.pta_signal); return -ENETDOWN; } void lnet_acceptor_stop(void) { - /* Do nothing if we're 
liblustre clients */ - if ((the_lnet.ln_pid & LNET_PID_USERFLAG) != 0) + if (lnet_acceptor_state.pta_sock == NULL) /* not running */ return; - if (!skip_waiting_for_completion) { - lnet_acceptor_state.pta_shutdown = 1; - libcfs_sock_abort_accept(accept_port); - - /* block until acceptor signals exit */ - cfs_wait_for_completion(&lnet_acceptor_state.pta_completion); - } - - cfs_fini_completion(&lnet_acceptor_state.pta_completion); + lnet_acceptor_state.pta_shutdown = 1; + libcfs_sock_abort_accept(lnet_acceptor_state.pta_sock); + + /* block until acceptor signals exit */ + cfs_wait_for_completion(&lnet_acceptor_state.pta_signal); + + cfs_fini_completion(&lnet_acceptor_state.pta_signal); } -#else + +#else /* single-threaded user-space */ int lnet_acceptor_start(void) { - return 0; + return 0; } void lnet_acceptor_stop(void) { } -#endif /* !HAVE_LIBPTHREAD */ -#endif /* !__KERNEL__ */ +#endif /* defined(__KERNEL__) || defined(HAVE_LIBPTHREAD) */ diff --git a/lnet/ulnds/socklnd/conn.c b/lnet/ulnds/socklnd/conn.c index a386bb1..4d6e01d 100644 --- a/lnet/ulnds/socklnd/conn.c +++ b/lnet/ulnds/socklnd/conn.c @@ -51,7 +51,7 @@ usocklnd_conn_timed_out(usock_conn_t *conn, cfs_time_t current_time) if (conn->uc_rx_flag && /* receiving is in progress */ cfs_time_aftereq(current_time, conn->uc_rx_deadline)) return 1; - + return 0; } @@ -61,7 +61,7 @@ usocklnd_conn_kill(usock_conn_t *conn) pthread_mutex_lock(&conn->uc_lock); if (conn->uc_state != UC_DEAD) usocklnd_conn_kill_locked(conn); - pthread_mutex_unlock(&conn->uc_lock); + pthread_mutex_unlock(&conn->uc_lock); } /* Mark the conn as DEAD and schedule its deletion */ @@ -82,7 +82,7 @@ usocklnd_conn_allocate() LIBCFS_ALLOC (pr, sizeof(*pr)); if (pr == NULL) return NULL; - + LIBCFS_ALLOC (conn, sizeof(*conn)); if (conn == NULL) { LIBCFS_FREE (pr, sizeof(*pr)); @@ -115,7 +115,7 @@ usocklnd_conn_free(usock_conn_t *conn) LIBCFS_FREE (conn->uc_rx_hello, offsetof(ksock_hello_msg_t, kshm_ips[LNET_MAX_INTERFACES])); - + LIBCFS_FREE 
(conn, sizeof(*conn)); } @@ -128,12 +128,12 @@ usocklnd_tear_peer_conn(usock_conn_t *conn) lnet_process_id_t id; int decref_flag = 0; int killall_flag = 0; - + if (peer == NULL) /* nothing to tear */ return; - + pthread_mutex_lock(&peer->up_lock); - pthread_mutex_lock(&conn->uc_lock); + pthread_mutex_lock(&conn->uc_lock); ni = peer->up_ni; id = peer->up_peerid; @@ -142,9 +142,9 @@ usocklnd_tear_peer_conn(usock_conn_t *conn) if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) { /* change state not to finalize twice */ conn->uc_rx_state = UC_RX_KSM_HEADER; - lnet_finalize(peer->up_ni, conn->uc_rx_lnetmsg, -EIO); + lnet_finalize(peer->up_ni, conn->uc_rx_lnetmsg, -EIO); } - + usocklnd_destroy_txlist(peer->up_ni, &conn->uc_tx_list); @@ -155,14 +155,14 @@ usocklnd_tear_peer_conn(usock_conn_t *conn) if(conn->uc_errored && !peer->up_errored) peer->up_errored = killall_flag = 1; } - + pthread_mutex_unlock(&conn->uc_lock); if (killall_flag) usocklnd_del_conns_locked(peer); pthread_mutex_unlock(&peer->up_lock); - + if (!decref_flag) return; @@ -178,7 +178,7 @@ void usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id) { usock_peer_t *peer; - + pthread_rwlock_wrlock(&usock_data.ud_peers_lock); peer = usocklnd_find_peer_locked(ni, id); @@ -192,13 +192,13 @@ usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id) for (i = 0; i < N_CONN_TYPES; i++) LASSERT (peer->up_conns[i] == NULL); - list_del(&peer->up_list); - + list_del(&peer->up_list); + if (peer->up_errored && (peer->up_peerid.pid & LNET_PID_USERFLAG) == 0) lnet_notify (peer->up_ni, peer->up_peerid.nid, 0, cfs_time_seconds(peer->up_last_alive)); - + usocklnd_peer_decref(peer); } @@ -208,18 +208,21 @@ usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id) /* Returns 0 on success, <0 else */ int -usocklnd_create_passive_conn(lnet_ni_t *ni, int fd, usock_conn_t **connp) +usocklnd_create_passive_conn(lnet_ni_t *ni, + cfs_socket_t *sock, usock_conn_t **connp) { int rc; __u32 peer_ip; - __u16 peer_port; + int 
peer_port; usock_conn_t *conn; - rc = libcfs_getpeername(fd, &peer_ip, &peer_port); + rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port); if (rc) return rc; - rc = usocklnd_set_sock_options(fd); + LASSERT (peer_port >= 0); /* uc_peer_port is u16 */ + + rc = usocklnd_set_sock_options(sock); if (rc) return rc; @@ -228,8 +231,8 @@ usocklnd_create_passive_conn(lnet_ni_t *ni, int fd, usock_conn_t **connp) return -ENOMEM; usocklnd_rx_hellomagic_state_transition(conn); - - conn->uc_fd = fd; + + conn->uc_sock = sock; conn->uc_peer_ip = peer_ip; conn->uc_peer_port = peer_port; conn->uc_state = UC_RECEIVING_HELLO; @@ -250,11 +253,11 @@ usocklnd_create_active_conn(usock_peer_t *peer, int type, usock_conn_t **connp) { int rc; - int fd; + cfs_socket_t *sock; usock_conn_t *conn; __u32 dst_ip = LNET_NIDADDR(peer->up_peerid.nid); __u16 dst_port = lnet_acceptor_port(); - + conn = usocklnd_conn_allocate(); if (conn == NULL) return -ENOMEM; @@ -264,32 +267,33 @@ usocklnd_create_active_conn(usock_peer_t *peer, int type, if (conn->uc_tx_hello == NULL) { usocklnd_conn_free(conn); return -ENOMEM; - } - + } + if (the_lnet.ln_pid & LNET_PID_USERFLAG) - rc = usocklnd_connect_cli_mode(&fd, dst_ip, dst_port); + rc = usocklnd_connect_cli_mode(&sock, dst_ip, dst_port); else - rc = usocklnd_connect_srv_mode(&fd, dst_ip, dst_port); - + rc = usocklnd_connect_srv_mode(&sock, dst_ip, dst_port); + if (rc) { usocklnd_destroy_tx(NULL, conn->uc_tx_hello); usocklnd_conn_free(conn); return rc; } - + conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout); - conn->uc_tx_flag = 1; - - conn->uc_fd = fd; - conn->uc_peer_ip = dst_ip; - conn->uc_peer_port = dst_port; - conn->uc_type = type; + conn->uc_tx_flag = 1; + + conn->uc_sock = sock; + conn->uc_peer_ip = dst_ip; + conn->uc_peer_port = dst_port; + conn->uc_type = type; conn->uc_activeflag = 1; - conn->uc_state = UC_CONNECTING; - conn->uc_pt_idx = usocklnd_ip2pt_idx(dst_ip); - conn->uc_ni = NULL; - conn->uc_peerid = peer->up_peerid; - 
conn->uc_peer = peer; + conn->uc_state = UC_CONNECTING; + conn->uc_pt_idx = usocklnd_ip2pt_idx(dst_ip); + conn->uc_ni = NULL; + conn->uc_peerid = peer->up_peerid; + conn->uc_peer = peer; + usocklnd_peer_addref(peer); CFS_INIT_LIST_HEAD (&conn->uc_tx_list); CFS_INIT_LIST_HEAD (&conn->uc_zcack_list); @@ -302,45 +306,42 @@ usocklnd_create_active_conn(usock_peer_t *peer, int type, /* Returns 0 on success, <0 else */ int -usocklnd_connect_srv_mode(int *fdp, __u32 dst_ip, __u16 dst_port) +usocklnd_connect_srv_mode(cfs_socket_t **sockp, __u32 dst_ip, __u16 dst_port) { - __u16 port; - int fd; - int rc; + __u16 port; + cfs_socket_t *sock; + int rc; + int fatal; - for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT; - port >= LNET_ACCEPTOR_MIN_RESERVED_PORT; + for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT; + port >= LNET_ACCEPTOR_MIN_RESERVED_PORT; port--) { /* Iterate through reserved ports. */ - - rc = libcfs_sock_create(&fd); - if (rc) - return rc; - - rc = libcfs_sock_bind_to_port(fd, port); + rc = libcfs_sock_create(&sock, &fatal, 0, port); if (rc) { - close(fd); + if (fatal) + return rc; continue; } - rc = usocklnd_set_sock_options(fd); + rc = usocklnd_set_sock_options(sock); if (rc) { - close(fd); + libcfs_sock_release(sock); return rc; } - rc = libcfs_sock_connect(fd, dst_ip, dst_port); + rc = libcfs_sock_connect(sock, dst_ip, dst_port); if (rc == 0) { - *fdp = fd; + *sockp = sock; return 0; } - + if (rc != -EADDRINUSE && rc != -EADDRNOTAVAIL) { - close(fd); + libcfs_sock_release(sock); return rc; } - close(fd); + libcfs_sock_release(sock); } CERROR("Can't bind to any reserved port\n"); @@ -349,54 +350,55 @@ usocklnd_connect_srv_mode(int *fdp, __u32 dst_ip, __u16 dst_port) /* Returns 0 on success, <0 else */ int -usocklnd_connect_cli_mode(int *fdp, __u32 dst_ip, __u16 dst_port) +usocklnd_connect_cli_mode(cfs_socket_t **sockp, __u32 dst_ip, __u16 dst_port) { - int fd; - int rc; + cfs_socket_t *sock; + int rc; + int fatal; - rc = libcfs_sock_create(&fd); + rc = 
libcfs_sock_create(&sock, &fatal, 0, 0); if (rc) return rc; - - rc = usocklnd_set_sock_options(fd); + + rc = usocklnd_set_sock_options(sock); if (rc) { - close(fd); + libcfs_sock_release(sock); return rc; } - rc = libcfs_sock_connect(fd, dst_ip, dst_port); + rc = libcfs_sock_connect(sock, dst_ip, dst_port); if (rc) { - close(fd); + libcfs_sock_release(sock); return rc; } - *fdp = fd; + *sockp = sock; return 0; } int -usocklnd_set_sock_options(int fd) +usocklnd_set_sock_options(cfs_socket_t *sock) { int rc; - rc = libcfs_sock_set_nagle(fd, usock_tuns.ut_socknagle); + rc = libcfs_sock_set_nagle(sock, usock_tuns.ut_socknagle); if (rc) return rc; if (usock_tuns.ut_sockbufsiz) { - rc = libcfs_sock_set_bufsiz(fd, usock_tuns.ut_sockbufsiz); + rc = libcfs_sock_set_bufsiz(sock, usock_tuns.ut_sockbufsiz); if (rc) - return rc; + return rc; } - - return libcfs_fcntl_nonblock(fd); + + return libcfs_fcntl_nonblock(sock); } usock_tx_t * usocklnd_create_noop_tx(__u64 cookie) { usock_tx_t *tx; - + LIBCFS_ALLOC (tx, sizeof(usock_tx_t)); if (tx == NULL) return NULL; @@ -406,22 +408,22 @@ usocklnd_create_noop_tx(__u64 cookie) socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP); tx->tx_msg.ksm_zc_cookies[1] = cookie; - + tx->tx_iova[0].iov_base = (void *)&tx->tx_msg; tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr); tx->tx_iov = tx->tx_iova; tx->tx_niov = 1; - + return tx; } - + usock_tx_t * usocklnd_create_tx(lnet_msg_t *lntmsg) { usock_tx_t *tx; - unsigned int payload_niov = lntmsg->msg_niov; - struct iovec *payload_iov = lntmsg->msg_iov; + unsigned int payload_niov = lntmsg->msg_niov; + struct iovec *payload_iov = lntmsg->msg_iov; unsigned int payload_offset = lntmsg->msg_offset; unsigned int payload_nob = lntmsg->msg_len; int size = offsetof(usock_tx_t, @@ -435,14 +437,14 @@ usocklnd_create_tx(lnet_msg_t *lntmsg) tx->tx_lnetmsg = lntmsg; tx->tx_resid = tx->tx_nob = sizeof(ksock_msg_t) + payload_nob; - + socklnd_init_msg(&tx->tx_msg, 
KSOCK_MSG_LNET); tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = lntmsg->msg_hdr; tx->tx_iova[0].iov_base = (void *)&tx->tx_msg; tx->tx_iova[0].iov_len = sizeof(ksock_msg_t); tx->tx_iov = tx->tx_iova; - tx->tx_niov = 1 + + tx->tx_niov = 1 + lnet_extract_iov(payload_niov, &tx->tx_iov[1], payload_niov, payload_iov, payload_offset, payload_nob); @@ -460,7 +462,7 @@ usocklnd_init_hello_msg(ksock_hello_msg_t *hello, hello->kshm_version = KSOCK_PROTO_V2; hello->kshm_nips = 0; hello->kshm_ctype = type; - + hello->kshm_dst_incarnation = 0; /* not used */ hello->kshm_src_incarnation = net->un_incarnation; @@ -488,7 +490,7 @@ usocklnd_create_hello_tx(lnet_ni_t *ni, hello = (ksock_hello_msg_t *)&tx->tx_iova[1]; usocklnd_init_hello_msg(hello, ni, type, peer_nid); - + tx->tx_iova[0].iov_base = (void *)hello; tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob = offsetof(ksock_hello_msg_t, kshm_ips); @@ -522,10 +524,10 @@ usocklnd_create_cr_hello_tx(lnet_ni_t *ni, cr->acr_magic = LNET_PROTO_ACCEPTOR_MAGIC; cr->acr_version = LNET_PROTO_ACCEPTOR_VERSION; cr->acr_nid = peer_nid; - + hello = (ksock_hello_msg_t *)((char *)cr + sizeof(*cr)); usocklnd_init_hello_msg(hello, ni, type, peer_nid); - + tx->tx_iova[0].iov_base = (void *)cr; tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob = sizeof(lnet_acceptor_connreq_t) + @@ -545,7 +547,7 @@ usocklnd_destroy_tx(lnet_ni_t *ni, usock_tx_t *tx) LASSERT (ni != NULL || lnetmsg == NULL); LIBCFS_FREE (tx, tx->tx_size); - + if (lnetmsg != NULL) /* NOOP and hello go without lnetmsg */ lnet_finalize(ni, lnetmsg, rc); } @@ -558,7 +560,7 @@ usocklnd_destroy_txlist(lnet_ni_t *ni, struct list_head *txlist) while (!list_empty(txlist)) { tx = list_entry(txlist->next, usock_tx_t, tx_list); list_del(&tx->tx_list); - + usocklnd_destroy_tx(ni, tx); } } @@ -571,7 +573,7 @@ usocklnd_destroy_zcack_list(struct list_head *zcack_list) while (!list_empty(zcack_list)) { zcack = list_entry(zcack_list->next, usock_zc_ack_t, zc_list); list_del(&zcack->zc_list); - + LIBCFS_FREE 
(zcack, sizeof(*zcack)); } } @@ -588,7 +590,7 @@ usocklnd_destroy_peer(usock_peer_t *peer) LIBCFS_FREE (peer, sizeof (*peer)); pthread_mutex_lock(&net->un_lock); - if(--net->un_peercount == 0) + if(--net->un_peercount == 0) pthread_cond_signal(&net->un_cond); pthread_mutex_unlock(&net->un_lock); } @@ -604,12 +606,12 @@ usocklnd_destroy_conn(usock_conn_t *conn) } if (!list_empty(&conn->uc_tx_list)) { - LASSERT (conn->uc_peer != NULL); + LASSERT (conn->uc_peer != NULL); usocklnd_destroy_txlist(conn->uc_peer->up_ni, &conn->uc_tx_list); } usocklnd_destroy_zcack_list(&conn->uc_zcack_list); - + if (conn->uc_peer != NULL) usocklnd_peer_decref(conn->uc_peer); @@ -631,7 +633,7 @@ usocklnd_get_conn_type(lnet_msg_t *lntmsg) return SOCKLND_CONN_ANY; nob = sizeof(ksock_msg_t) + lntmsg->msg_len; - + if (nob >= usock_tuns.ut_min_bulk) return SOCKLND_CONN_BULK_OUT; else @@ -698,10 +700,10 @@ usocklnd_create_peer(lnet_ni_t *ni, lnet_process_id_t id, peer->up_errored = 0; peer->up_last_alive = 0; cfs_atomic_set (&peer->up_refcount, 1); /* 1 ref for caller */ - pthread_mutex_init(&peer->up_lock, NULL); + pthread_mutex_init(&peer->up_lock, NULL); pthread_mutex_lock(&net->un_lock); - net->un_peercount++; + net->un_peercount++; pthread_mutex_unlock(&net->un_lock); *peerp = peer; @@ -729,7 +731,7 @@ usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id, rc = usocklnd_create_peer(ni, id, &peer); if (rc) return rc; - + pthread_rwlock_wrlock(&usock_data.ud_peers_lock); peer2 = usocklnd_find_peer_locked(ni, id); if (peer2 == NULL) { @@ -739,7 +741,7 @@ usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id, CERROR("Can't create peer: network shutdown\n"); return -ESHUTDOWN; } - + /* peer table will take 1 of my refs on peer */ usocklnd_peer_addref(peer); list_add_tail (&peer->up_list, @@ -749,8 +751,8 @@ usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id, peer = peer2; } pthread_rwlock_unlock(&usock_data.ud_peers_lock); - - find_or_create_peer_done: + + 
find_or_create_peer_done: *peerp = peer; return 0; } @@ -758,7 +760,7 @@ usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id, /* NB: both peer and conn locks are held */ static int usocklnd_enqueue_zcack(usock_conn_t *conn, usock_zc_ack_t *zc_ack) -{ +{ if (conn->uc_state == UC_READY && list_empty(&conn->uc_tx_list) && list_empty(&conn->uc_zcack_list) && @@ -767,7 +769,7 @@ usocklnd_enqueue_zcack(usock_conn_t *conn, usock_zc_ack_t *zc_ack) POLLOUT); if (rc != 0) return rc; - } + } list_add_tail(&zc_ack->zc_list, &conn->uc_zcack_list); return 0; @@ -779,7 +781,7 @@ usocklnd_enqueue_zcack(usock_conn_t *conn, usock_zc_ack_t *zc_ack) static void usocklnd_enqueue_tx(usock_conn_t *conn, usock_tx_t *tx, int *send_immediately) -{ +{ if (conn->uc_state == UC_READY && list_empty(&conn->uc_tx_list) && list_empty(&conn->uc_zcack_list) && @@ -787,7 +789,7 @@ usocklnd_enqueue_tx(usock_conn_t *conn, usock_tx_t *tx, conn->uc_sending = 1; *send_immediately = 1; return; - } + } *send_immediately = 0; list_add_tail(&tx->tx_list, &conn->uc_tx_list); @@ -805,12 +807,12 @@ usocklnd_find_or_create_conn(usock_peer_t *peer, int type, int idx; int rc; lnet_pid_t userflag = peer->up_peerid.pid & LNET_PID_USERFLAG; - + if (userflag) type = SOCKLND_CONN_ANY; idx = usocklnd_type2idx(type); - + pthread_mutex_lock(&peer->up_lock); if (peer->up_conns[idx] != NULL) { conn = peer->up_conns[idx]; @@ -823,7 +825,7 @@ usocklnd_find_or_create_conn(usock_peer_t *peer, int type, rc = -EHOSTUNREACH; goto find_or_create_conn_failed; } - + rc = usocklnd_create_active_conn(peer, type, &conn); if (rc) { peer->up_errored = 1; @@ -833,7 +835,7 @@ usocklnd_find_or_create_conn(usock_peer_t *peer, int type, /* peer takes 1 of conn refcount */ usocklnd_link_conn_to_peer(conn, peer, idx); - + rc = usocklnd_add_pollrequest(conn, POLL_ADD_REQUEST, POLLOUT); if (rc) { peer->up_conns[idx] = NULL; @@ -842,7 +844,7 @@ usocklnd_find_or_create_conn(usock_peer_t *peer, int type, } 
usocklnd_wakeup_pollthread(conn->uc_pt_idx); } - + pthread_mutex_lock(&conn->uc_lock); LASSERT(conn->uc_peer == peer); @@ -850,14 +852,14 @@ usocklnd_find_or_create_conn(usock_peer_t *peer, int type, if (tx != NULL) { usocklnd_enqueue_tx(conn, tx, send_immediately); } else { - rc = usocklnd_enqueue_zcack(conn, zc_ack); + rc = usocklnd_enqueue_zcack(conn, zc_ack); if (rc != 0) { usocklnd_conn_kill_locked(conn); pthread_mutex_unlock(&conn->uc_lock); goto find_or_create_conn_failed; } } - pthread_mutex_unlock(&conn->uc_lock); + pthread_mutex_unlock(&conn->uc_lock); usocklnd_conn_addref(conn); pthread_mutex_unlock(&peer->up_lock); @@ -873,7 +875,7 @@ usocklnd_find_or_create_conn(usock_peer_t *peer, int type, void usocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx) { - peer->up_conns[idx] = conn; + peer->up_conns[idx] = conn; peer->up_errored = 0; /* this new fresh conn will try * revitalize even stale errored peer */ } @@ -910,7 +912,7 @@ usocklnd_cleanup_stale_conns(usock_peer_t *peer, __u64 incrn, usock_conn_t *skip_conn) { int i; - + if (!peer->up_incrn_is_set) { peer->up_incarnation = incrn; peer->up_incrn_is_set = 1; @@ -921,19 +923,19 @@ usocklnd_cleanup_stale_conns(usock_peer_t *peer, __u64 incrn, return; peer->up_incarnation = incrn; - + for (i = 0; i < N_CONN_TYPES; i++) { usock_conn_t *conn = peer->up_conns[i]; - + if (conn == NULL || conn == skip_conn) continue; - pthread_mutex_lock(&conn->uc_lock); + pthread_mutex_lock(&conn->uc_lock); LASSERT (conn->uc_peer == peer); conn->uc_peer = NULL; peer->up_conns[i] = NULL; if (conn->uc_state != UC_DEAD) - usocklnd_conn_kill_locked(conn); + usocklnd_conn_kill_locked(conn); pthread_mutex_unlock(&conn->uc_lock); usocklnd_conn_decref(conn); @@ -978,7 +980,7 @@ usocklnd_rx_helloversion_state_transition(usock_conn_t *conn) conn->uc_rx_nob_wanted = conn->uc_rx_nob_left = sizeof(conn->uc_rx_hello->kshm_version); - + conn->uc_rx_state = UC_RX_HELLO_VERSION; } @@ -998,7 +1000,7 @@ 
usocklnd_rx_hellobody_state_transition(usock_conn_t *conn) conn->uc_rx_nob_left = offsetof(ksock_hello_msg_t, kshm_ips) - offsetof(ksock_hello_msg_t, kshm_src_nid); - + conn->uc_rx_state = UC_RX_HELLO_BODY; } @@ -1018,7 +1020,7 @@ usocklnd_rx_helloIPs_state_transition(usock_conn_t *conn) conn->uc_rx_nob_left = conn->uc_rx_hello->kshm_nips * sizeof(conn->uc_rx_hello->kshm_ips[0]); - + conn->uc_rx_state = UC_RX_HELLO_IPS; } @@ -1030,12 +1032,12 @@ usocklnd_rx_lnethdr_state_transition(usock_conn_t *conn) { conn->uc_rx_niov = 1; conn->uc_rx_iov = conn->uc_rx_iova; - conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg.ksm_u.lnetmsg; + conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg.ksm_u.lnetmsg; conn->uc_rx_iov[0].iov_len = conn->uc_rx_nob_wanted = conn->uc_rx_nob_left = sizeof(ksock_lnet_msg_t); - + conn->uc_rx_state = UC_RX_LNET_HEADER; conn->uc_rx_flag = 1; } @@ -1048,12 +1050,12 @@ usocklnd_rx_ksmhdr_state_transition(usock_conn_t *conn) { conn->uc_rx_niov = 1; conn->uc_rx_iov = conn->uc_rx_iova; - conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg; + conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg; conn->uc_rx_iov[0].iov_len = conn->uc_rx_nob_wanted = - conn->uc_rx_nob_left = + conn->uc_rx_nob_left = offsetof(ksock_msg_t, ksm_u); - + conn->uc_rx_state = UC_RX_KSM_HEADER; conn->uc_rx_flag = 0; } @@ -1070,7 +1072,7 @@ usocklnd_rx_skipping_state_transition(usock_conn_t *conn) unsigned int niov = 0; int skipped = 0; int nob_to_skip = conn->uc_rx_nob_left; - + LASSERT(nob_to_skip != 0); conn->uc_rx_iov = conn->uc_rx_iova; diff --git a/lnet/ulnds/socklnd/handlers.c b/lnet/ulnds/socklnd/handlers.c index 32ef7d1..e9eda46 100644 --- a/lnet/ulnds/socklnd/handlers.c +++ b/lnet/ulnds/socklnd/handlers.c @@ -57,8 +57,8 @@ usocklnd_exception_handler(usock_conn_t *conn) if (conn->uc_state == UC_CONNECTING || conn->uc_state == UC_SENDING_HELLO) usocklnd_conn_kill_locked(conn); - - pthread_mutex_unlock(&conn->uc_lock); + + pthread_mutex_unlock(&conn->uc_lock); } int @@ -72,7 +72,7 @@ 
usocklnd_read_handler(usock_conn_t *conn) rc = 0; pthread_mutex_lock(&conn->uc_lock); state = conn->uc_state; - + /* process special case: LNET calls lnd_recv() asyncronously */ if (state == UC_READY && conn->uc_rx_state == UC_RX_PARSE) { /* still don't have usocklnd_recv() called */ @@ -93,7 +93,7 @@ usocklnd_read_handler(usock_conn_t *conn) * 2) usocklnd_shutdown() can change uc_state to UC_DEAD */ switch (state) { - + case UC_RECEIVING_HELLO: case UC_READY: if (conn->uc_rx_nob_wanted != 0) { @@ -117,7 +117,7 @@ usocklnd_read_handler(usock_conn_t *conn) usocklnd_conn_kill(conn); break; } - + if (continue_reading) goto read_again; @@ -129,7 +129,7 @@ usocklnd_read_handler(usock_conn_t *conn) default: LBUG(); } - + return rc; } @@ -144,7 +144,7 @@ usocklnd_read_msg(usock_conn_t *conn, int *cont_flag) { int rc = 0; __u64 cookie; - + *cont_flag = 0; /* smth. new emerged in RX part - let's process it */ @@ -155,7 +155,7 @@ usocklnd_read_msg(usock_conn_t *conn, int *cont_flag) __swab32s(&conn->uc_rx_msg.ksm_csum); __swab64s(&conn->uc_rx_msg.ksm_zc_cookies[0]); __swab64s(&conn->uc_rx_msg.ksm_zc_cookies[1]); - } + } /* we never send packets for wich zc-acking is required */ if (conn->uc_rx_msg.ksm_type != KSOCK_MSG_LNET || @@ -170,7 +170,7 @@ usocklnd_read_msg(usock_conn_t *conn, int *cont_flag) usocklnd_rx_lnethdr_state_transition(conn); *cont_flag = 1; break; - + case UC_RX_LNET_HEADER: if (the_lnet.ln_pid & LNET_PID_USERFLAG) { /* replace dest_nid,pid (ksocknal sets its own) */ @@ -178,24 +178,24 @@ usocklnd_read_msg(usock_conn_t *conn, int *cont_flag) cpu_to_le64(conn->uc_peer->up_ni->ni_nid); conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_pid = cpu_to_le32(the_lnet.ln_pid); - - } else if (conn->uc_peer->up_peerid.pid & LNET_PID_USERFLAG) { + + } else if (conn->uc_peer->up_peerid.pid & LNET_PID_USERFLAG) { /* Userspace peer */ lnet_process_id_t *id = &conn->uc_peer->up_peerid; lnet_hdr_t *lhdr = &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr; - + /* Substitute process ID 
assigned at connection time */ lhdr->src_pid = cpu_to_le32(id->pid); lhdr->src_nid = cpu_to_le64(id->nid); } - + conn->uc_rx_state = UC_RX_PARSE; usocklnd_conn_addref(conn); /* ++ref while parsing */ - - rc = lnet_parse(conn->uc_peer->up_ni, - &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr, + + rc = lnet_parse(conn->uc_peer->up_ni, + &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr, conn->uc_peerid.nid, conn, 0); - + if (rc < 0) { /* I just received garbage: give up on this conn */ conn->uc_errored = 1; @@ -207,24 +207,24 @@ usocklnd_read_msg(usock_conn_t *conn, int *cont_flag) pthread_mutex_lock(&conn->uc_lock); LASSERT (conn->uc_rx_state == UC_RX_PARSE || conn->uc_rx_state == UC_RX_LNET_PAYLOAD); - + /* check whether usocklnd_recv() got called */ if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) *cont_flag = 1; pthread_mutex_unlock(&conn->uc_lock); break; - + case UC_RX_PARSE: LBUG(); /* it's error to be here, because this special * case is handled by caller */ break; - + case UC_RX_PARSE_WAIT: LBUG(); /* it's error to be here, because the conn * shouldn't wait for POLLIN event in this * state */ break; - + case UC_RX_LNET_PAYLOAD: /* payload all received */ @@ -233,15 +233,15 @@ usocklnd_read_msg(usock_conn_t *conn, int *cont_flag) cookie = conn->uc_rx_msg.ksm_zc_cookies[0]; if (cookie != 0) rc = usocklnd_handle_zc_req(conn->uc_peer, cookie); - + if (rc != 0) { /* change state not to finalize twice */ conn->uc_rx_state = UC_RX_KSM_HEADER; return -EPROTO; } - + /* Fall through */ - + case UC_RX_SKIPPING: if (conn->uc_rx_nob_left != 0) { usocklnd_rx_skipping_state_transition(conn); @@ -276,13 +276,13 @@ usocklnd_handle_zc_req(usock_peer_t *peer, __u64 cookie) if (zc_ack == NULL) return -ENOMEM; zc_ack->zc_cookie = cookie; - + /* Let's assume that CONTROL is the best type for zcack, * but userspace clients don't use typed connections */ if (the_lnet.ln_pid & LNET_PID_USERFLAG) type = SOCKLND_CONN_ANY; else - type = SOCKLND_CONN_CONTROL; + type = SOCKLND_CONN_CONTROL; rc = 
usocklnd_find_or_create_conn(peer, type, &conn, NULL, zc_ack, &dummy); @@ -304,9 +304,9 @@ usocklnd_read_hello(usock_conn_t *conn, int *cont_flag) { int rc = 0; ksock_hello_msg_t *hello = conn->uc_rx_hello; - + *cont_flag = 0; - + /* smth. new emerged in hello - let's process it */ switch (conn->uc_rx_state) { case UC_RX_HELLO_MAGIC: @@ -319,7 +319,7 @@ usocklnd_read_hello(usock_conn_t *conn, int *cont_flag) usocklnd_rx_helloversion_state_transition(conn); *cont_flag = 1; - break; + break; case UC_RX_HELLO_VERSION: if ((!conn->uc_flip && @@ -331,7 +331,7 @@ usocklnd_read_hello(usock_conn_t *conn, int *cont_flag) usocklnd_rx_hellobody_state_transition(conn); *cont_flag = 1; break; - + case UC_RX_HELLO_BODY: if (conn->uc_flip) { ksock_hello_msg_t *hello = conn->uc_rx_hello; @@ -351,8 +351,8 @@ usocklnd_read_hello(usock_conn_t *conn, int *cont_flag) HIPQUAD(conn->uc_peer_ip), conn->uc_peer_port); return -EPROTO; } - - if (conn->uc_rx_hello->kshm_nips) { + + if (conn->uc_rx_hello->kshm_nips) { usocklnd_rx_helloIPs_state_transition(conn); *cont_flag = 1; break; @@ -364,8 +364,8 @@ usocklnd_read_hello(usock_conn_t *conn, int *cont_flag) rc = usocklnd_activeconn_hellorecv(conn); else /* passive conn */ rc = usocklnd_passiveconn_hellorecv(conn); - - break; + + break; default: LBUG(); /* unknown state */ @@ -398,7 +398,7 @@ usocklnd_activeconn_hellorecv(usock_conn_t *conn) } peer->up_last_alive = cfs_time_current(); - + /* peer says that we lost the race */ if (hello->kshm_ctype == SOCKLND_CONN_NONE) { /* Start new active conn, relink txs and zc_acks from @@ -406,7 +406,7 @@ usocklnd_activeconn_hellorecv(usock_conn_t *conn) * Actually, we're expecting that a passive conn will * make us zombie soon and take care of our txs and * zc_acks */ - + struct list_head tx_list, zcack_list; usock_conn_t *conn2; int idx = usocklnd_type2idx(conn->uc_type); @@ -425,7 +425,7 @@ usocklnd_activeconn_hellorecv(usock_conn_t *conn) pthread_mutex_unlock(&peer->up_lock); return 0; } - + LASSERT 
(peer == conn->uc_peer); LASSERT (peer->up_conns[idx] == conn); @@ -436,22 +436,22 @@ usocklnd_activeconn_hellorecv(usock_conn_t *conn) pthread_mutex_unlock(&peer->up_lock); return rc; } - + usocklnd_link_conn_to_peer(conn2, peer, idx); conn2->uc_peer = peer; - + /* unlink txs and zcack from the conn */ list_add(&tx_list, &conn->uc_tx_list); list_del_init(&conn->uc_tx_list); list_add(&zcack_list, &conn->uc_zcack_list); list_del_init(&conn->uc_zcack_list); - + /* link they to the conn2 */ list_add(&conn2->uc_tx_list, &tx_list); list_del_init(&tx_list); list_add(&conn2->uc_zcack_list, &zcack_list); list_del_init(&zcack_list); - + /* make conn zombie */ conn->uc_peer = NULL; usocklnd_peer_decref(peer); @@ -464,11 +464,11 @@ usocklnd_activeconn_hellorecv(usock_conn_t *conn) } else { usocklnd_conn_kill_locked(conn); } - - pthread_mutex_unlock(&conn->uc_lock); + + pthread_mutex_unlock(&conn->uc_lock); pthread_mutex_unlock(&peer->up_lock); usocklnd_conn_decref(conn); - + } else { /* hello->kshm_ctype != SOCKLND_CONN_NONE */ if (conn->uc_type != usocklnd_invert_type(hello->kshm_ctype)) return -EPROTO; @@ -477,7 +477,7 @@ usocklnd_activeconn_hellorecv(usock_conn_t *conn) usocklnd_cleanup_stale_conns(peer, hello->kshm_src_incarnation, conn); pthread_mutex_unlock(&peer->up_lock); - + /* safely transit to UC_READY state */ /* rc == 0 */ pthread_mutex_lock(&conn->uc_lock); @@ -490,7 +490,7 @@ usocklnd_activeconn_hellorecv(usock_conn_t *conn) LASSERT (conn->uc_sending == 0); if ( !list_empty(&conn->uc_tx_list) || !list_empty(&conn->uc_zcack_list) ) { - + conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout); conn->uc_tx_flag = 1; @@ -502,7 +502,7 @@ usocklnd_activeconn_hellorecv(usock_conn_t *conn) if (rc == 0) conn->uc_state = UC_READY; } - pthread_mutex_unlock(&conn->uc_lock); + pthread_mutex_unlock(&conn->uc_lock); } return rc; @@ -532,7 +532,7 @@ usocklnd_passiveconn_hellorecv(usock_conn_t *conn) /* don't know parent peer yet and not zombie */ LASSERT (conn->uc_peer 
== NULL && ni != NULL); - + /* don't know peer's nid and incarnation yet */ if (peer_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) { /* do not trust liblustre clients */ @@ -541,7 +541,7 @@ usocklnd_passiveconn_hellorecv(usock_conn_t *conn) peer_ip); if (hello->kshm_ctype != SOCKLND_CONN_ANY) { lnet_ni_decref(ni); - conn->uc_ni = NULL; + conn->uc_ni = NULL; CERROR("Refusing to accept connection of type=%d from " "userspace process %u.%u.%u.%u:%d\n", hello->kshm_ctype, HIPQUAD(peer_ip), peer_port); @@ -552,7 +552,7 @@ usocklnd_passiveconn_hellorecv(usock_conn_t *conn) conn->uc_peerid.nid = hello->kshm_src_nid; } conn->uc_type = type = usocklnd_invert_type(hello->kshm_ctype); - + rc = usocklnd_find_or_create_peer(ni, conn->uc_peerid, &peer); if (rc) { lnet_ni_decref(ni); @@ -561,14 +561,14 @@ usocklnd_passiveconn_hellorecv(usock_conn_t *conn) } peer->up_last_alive = cfs_time_current(); - + idx = usocklnd_type2idx(conn->uc_type); /* safely check whether we're first */ pthread_mutex_lock(&peer->up_lock); usocklnd_cleanup_stale_conns(peer, hello->kshm_src_incarnation, NULL); - + if (peer->up_conns[idx] == NULL) { peer->up_last_alive = cfs_time_current(); conn->uc_peer = peer; @@ -576,7 +576,7 @@ usocklnd_passiveconn_hellorecv(usock_conn_t *conn) usocklnd_link_conn_to_peer(conn, peer, idx); usocklnd_conn_addref(conn); } else { - usocklnd_peer_decref(peer); + usocklnd_peer_decref(peer); /* Resolve race in favour of higher NID */ if (conn->uc_peerid.nid < conn->uc_ni->ni_nid) { @@ -588,7 +588,7 @@ usocklnd_passiveconn_hellorecv(usock_conn_t *conn) /* if conn->uc_peerid.nid > conn->uc_ni->ni_nid, * postpone race resolution till READY state * (hopefully that conn[idx] will die because of - * incoming hello of CONN_NONE type) */ + * incoming hello of CONN_NONE type) */ } pthread_mutex_unlock(&peer->up_lock); @@ -610,10 +610,10 @@ usocklnd_passiveconn_hellorecv(usock_conn_t *conn) conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout); conn->uc_tx_flag = 1; rc = 
usocklnd_add_pollrequest(conn, POLL_SET_REQUEST, POLLOUT); - + passive_hellorecv_done: pthread_mutex_unlock(&conn->uc_lock); - return rc; + return rc; } int @@ -625,7 +625,7 @@ usocklnd_write_handler(usock_conn_t *conn) int state; usock_peer_t *peer; lnet_ni_t *ni; - + pthread_mutex_lock(&conn->uc_lock); /* like membar */ state = conn->uc_state; pthread_mutex_unlock(&conn->uc_lock); @@ -650,8 +650,8 @@ usocklnd_write_handler(usock_conn_t *conn) rc = usocklnd_activeconn_hellosent(conn); else /* passive conn */ rc = usocklnd_passiveconn_hellosent(conn); - - break; + + break; case UC_READY: pthread_mutex_lock(&conn->uc_lock); @@ -665,7 +665,7 @@ usocklnd_write_handler(usock_conn_t *conn) LASSERT(usock_tuns.ut_fair_limit > 1); pthread_mutex_unlock(&conn->uc_lock); return 0; - } + } tx = usocklnd_try_piggyback(&conn->uc_tx_list, &conn->uc_zcack_list); @@ -678,7 +678,7 @@ usocklnd_write_handler(usock_conn_t *conn) if (rc) break; - + rc = usocklnd_send_tx(conn, tx); if (rc == 0) { /* partial send or connection closed */ pthread_mutex_lock(&conn->uc_lock); @@ -707,7 +707,7 @@ usocklnd_write_handler(usock_conn_t *conn) rc = ret; } pthread_mutex_unlock(&conn->uc_lock); - + break; case UC_DEAD: @@ -719,7 +719,7 @@ usocklnd_write_handler(usock_conn_t *conn) if (rc < 0) usocklnd_conn_kill(conn); - + return rc; } @@ -746,7 +746,7 @@ usocklnd_try_piggyback(struct list_head *tx_list_p, tx->tx_resid != tx->tx_nob) return tx; } - + if (list_empty(zcack_list_p)) { /* nothing to piggyback */ return tx; @@ -754,15 +754,15 @@ usocklnd_try_piggyback(struct list_head *tx_list_p, zc_ack = list_entry(zcack_list_p->next, usock_zc_ack_t, zc_list); list_del(&zc_ack->zc_list); - } - + } + if (tx != NULL) /* piggyback the zc-ack cookie */ tx->tx_msg.ksm_zc_cookies[1] = zc_ack->zc_cookie; else /* cannot piggyback, need noop */ - tx = usocklnd_create_noop_tx(zc_ack->zc_cookie); - + tx = usocklnd_create_noop_tx(zc_ack->zc_cookie); + LIBCFS_FREE (zc_ack, sizeof(*zc_ack)); return tx; } @@ -775,7 
+775,7 @@ int usocklnd_activeconn_hellosent(usock_conn_t *conn) { int rc = 0; - + pthread_mutex_lock(&conn->uc_lock); if (conn->uc_state != UC_DEAD) { @@ -817,15 +817,15 @@ usocklnd_passiveconn_hellosent(usock_conn_t *conn) /* conn->uc_peer == NULL, so the conn isn't accessible via * peer hash list, so nobody can touch the conn but us */ - + if (conn->uc_ni == NULL) /* remove zombie conn */ goto passive_hellosent_connkill; - + /* all code below is race resolution, because normally * passive conn is linked to peer just after receiving hello */ CFS_INIT_LIST_HEAD (&tx_list); CFS_INIT_LIST_HEAD (&zcack_list); - + /* conn is passive and isn't linked to any peer, so its tx and zc_ack lists have to be empty */ LASSERT (list_empty(&conn->uc_tx_list) && @@ -836,10 +836,10 @@ usocklnd_passiveconn_hellosent(usock_conn_t *conn) if (rc) return rc; - idx = usocklnd_type2idx(conn->uc_type); + idx = usocklnd_type2idx(conn->uc_type); /* try to link conn to peer */ - pthread_mutex_lock(&peer->up_lock); + pthread_mutex_lock(&peer->up_lock); if (peer->up_conns[idx] == NULL) { usocklnd_link_conn_to_peer(conn, peer, idx); usocklnd_conn_addref(conn); @@ -866,7 +866,7 @@ usocklnd_passiveconn_hellosent(usock_conn_t *conn) list_del_init(&conn2->uc_tx_list); list_add(&zcack_list, &conn2->uc_zcack_list); list_del_init(&conn2->uc_zcack_list); - + pthread_mutex_lock(&conn->uc_lock); list_add_tail(&conn->uc_tx_list, &tx_list); list_del_init(&tx_list); @@ -874,7 +874,7 @@ usocklnd_passiveconn_hellosent(usock_conn_t *conn) list_del_init(&zcack_list); conn->uc_peer = peer; pthread_mutex_unlock(&conn->uc_lock); - + conn2->uc_peer = NULL; /* make conn2 zombie */ pthread_mutex_unlock(&conn2->uc_lock); usocklnd_conn_decref(conn2); @@ -889,7 +889,7 @@ usocklnd_passiveconn_hellosent(usock_conn_t *conn) pthread_mutex_unlock(&peer->up_lock); usocklnd_peer_decref(peer); - passive_hellosent_done: + passive_hellosent_done: /* safely transit to UC_READY state */ /* rc == 0 */ 
pthread_mutex_lock(&conn->uc_lock); @@ -935,23 +935,23 @@ usocklnd_send_tx(usock_conn_t *conn, usock_tx_t *tx) { struct iovec *iov; int nob; - int fd = conn->uc_fd; cfs_time_t t; - + LASSERT (tx->tx_resid != 0); do { usock_peer_t *peer = conn->uc_peer; LASSERT (tx->tx_niov > 0); - - nob = libcfs_sock_writev(fd, tx->tx_iov, tx->tx_niov); + + nob = libcfs_sock_writev(conn->uc_sock, + tx->tx_iov, tx->tx_niov); if (nob < 0) conn->uc_errored = 1; if (nob <= 0) /* write queue is flow-controlled or error */ return nob; - - LASSERT (nob <= tx->tx_resid); + + LASSERT (nob <= tx->tx_resid); tx->tx_resid -= nob; t = cfs_time_current(); conn->uc_tx_deadline = cfs_time_add(t, cfs_time_seconds(usock_tuns.ut_timeout)); @@ -959,22 +959,22 @@ usocklnd_send_tx(usock_conn_t *conn, usock_tx_t *tx) if(peer != NULL) peer->up_last_alive = t; - /* "consume" iov */ + /* "consume" iov */ iov = tx->tx_iov; - do { - LASSERT (tx->tx_niov > 0); - - if (nob < iov->iov_len) { + do { + LASSERT (tx->tx_niov > 0); + + if (nob < iov->iov_len) { iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob); - iov->iov_len -= nob; - break; - } + iov->iov_len -= nob; + break; + } - nob -= iov->iov_len; - tx->tx_iov = ++iov; - tx->tx_niov--; + nob -= iov->iov_len; + tx->tx_iov = ++iov; + tx->tx_niov--; } while (nob != 0); - + } while (tx->tx_resid != 0); return 1; /* send complete */ @@ -994,16 +994,17 @@ usocklnd_read_data(usock_conn_t *conn) do { usock_peer_t *peer = conn->uc_peer; - + LASSERT (conn->uc_rx_niov > 0); - - nob = libcfs_sock_readv(conn->uc_fd, conn->uc_rx_iov, conn->uc_rx_niov); + + nob = libcfs_sock_readv(conn->uc_sock, + conn->uc_rx_iov, conn->uc_rx_niov); if (nob <= 0) {/* read nothing or error */ conn->uc_errored = 1; return nob; } - - LASSERT (nob <= conn->uc_rx_nob_wanted); + + LASSERT (nob <= conn->uc_rx_nob_wanted); conn->uc_rx_nob_wanted -= nob; conn->uc_rx_nob_left -= nob; t = cfs_time_current(); @@ -1011,23 +1012,23 @@ usocklnd_read_data(usock_conn_t *conn) if(peer != NULL) 
peer->up_last_alive = t; - - /* "consume" iov */ + + /* "consume" iov */ iov = conn->uc_rx_iov; - do { - LASSERT (conn->uc_rx_niov > 0); - - if (nob < iov->iov_len) { - iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob); - iov->iov_len -= nob; - break; - } - - nob -= iov->iov_len; + do { + LASSERT (conn->uc_rx_niov > 0); + + if (nob < iov->iov_len) { + iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob); + iov->iov_len -= nob; + break; + } + + nob -= iov->iov_len; conn->uc_rx_iov = ++iov; - conn->uc_rx_niov--; + conn->uc_rx_niov--; } while (nob != 0); - + } while (conn->uc_rx_nob_wanted != 0); return 1; /* read complete */ diff --git a/lnet/ulnds/socklnd/poll.c b/lnet/ulnds/socklnd/poll.c index 3388d85..2f15f1b 100644 --- a/lnet/ulnds/socklnd/poll.c +++ b/lnet/ulnds/socklnd/poll.c @@ -46,12 +46,12 @@ void usocklnd_process_stale_list(usock_pollthread_t *pt_data) { while (!list_empty(&pt_data->upt_stale_list)) { - usock_conn_t *conn; + usock_conn_t *conn; conn = list_entry(pt_data->upt_stale_list.next, usock_conn_t, uc_stale_list); - + list_del(&conn->uc_stale_list); - + usocklnd_tear_peer_conn(conn); usocklnd_conn_decref(conn); /* -1 for idx2conn[idx] or pr */ } @@ -76,14 +76,14 @@ usocklnd_poll_thread(void *arg) sigset_t sigs; sigfillset (&sigs); pthread_sigmask (SIG_SETMASK, &sigs, 0); - + LASSERT(pt_data != NULL); - + planned_time = cfs_time_shift(usock_tuns.ut_poll_timeout); chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds); saved_nfds = pt_data->upt_nfds; idx_start = 1; - + /* Main loop */ while (usock_data.ud_shutdown == 0) { rc = 0; @@ -94,11 +94,11 @@ usocklnd_poll_thread(void *arg) usock_pollrequest_t *pr; pr = list_entry(pt_data->upt_pollrequests.next, usock_pollrequest_t, upr_list); - + list_del(&pr->upr_list); rc = usocklnd_process_pollrequest(pr, pt_data); if (rc) - break; + break; } pthread_mutex_unlock(&pt_data->upt_pollrequests_lock); @@ -107,7 +107,7 @@ usocklnd_poll_thread(void *arg) /* Delete conns orphaned due to 
POLL_DEL_REQUESTs */ usocklnd_process_stale_list(pt_data); - + /* Actual polling for events */ rc = poll(pt_data->upt_pollfd, pt_data->upt_nfds, @@ -135,7 +135,7 @@ usocklnd_poll_thread(void *arg) extra = 0; } - times = cfs_duration_sec(cfs_time_sub(current_time, planned_time)) + 1; + times = cfs_duration_sec(cfs_time_sub(current_time, planned_time)) + 1; idx_finish = MIN(idx_start + chunk*times + extra, pt_data->upt_nfds); for (idx = idx_start; idx < idx_finish; idx++) { @@ -149,7 +149,7 @@ usocklnd_poll_thread(void *arg) pthread_mutex_unlock(&conn->uc_lock); } - if (idx_finish == pt_data->upt_nfds) { + if (idx_finish == pt_data->upt_nfds) { chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds); saved_nfds = pt_data->upt_nfds; idx_start = 1; @@ -157,11 +157,11 @@ usocklnd_poll_thread(void *arg) else { idx_start = idx_finish; } - + planned_time = cfs_time_add(current_time, cfs_time_seconds(usock_tuns.ut_poll_timeout)); } - + /* All conns should be deleted by POLL_DEL_REQUESTs while shutdown */ LASSERT (rc != 0 || pt_data->upt_nfds == 1); @@ -170,37 +170,37 @@ usocklnd_poll_thread(void *arg) /* Block new poll requests to be enqueued */ pt_data->upt_errno = rc; - + while (!list_empty(&pt_data->upt_pollrequests)) { usock_pollrequest_t *pr; pr = list_entry(pt_data->upt_pollrequests.next, usock_pollrequest_t, upr_list); - + list_del(&pr->upr_list); if (pr->upr_type == POLL_ADD_REQUEST) { - close(pr->upr_conn->uc_fd); + libcfs_sock_release(pr->upr_conn->uc_sock); list_add_tail(&pr->upr_conn->uc_stale_list, &pt_data->upt_stale_list); } else { usocklnd_conn_decref(pr->upr_conn); } - + LIBCFS_FREE (pr, sizeof(*pr)); } pthread_mutex_unlock(&pt_data->upt_pollrequests_lock); usocklnd_process_stale_list(pt_data); - + for (idx = 1; idx < pt_data->upt_nfds; idx++) { usock_conn_t *conn = pt_data->upt_idx2conn[idx]; LASSERT(conn != NULL); - close(conn->uc_fd); + libcfs_sock_release(conn->uc_sock); usocklnd_tear_peer_conn(conn); usocklnd_conn_decref(conn); } } - + /* unblock 
usocklnd_shutdown() */ cfs_complete(&pt_data->upt_completion); @@ -226,7 +226,7 @@ usocklnd_add_pollrequest(usock_conn_t *conn, int type, short value) pr->upr_value = value; usocklnd_conn_addref(conn); /* +1 for poll request */ - + pthread_mutex_lock(&pt->upt_pollrequests_lock); if (pt->upt_errno) { /* very rare case: errored poll thread */ @@ -236,7 +236,7 @@ usocklnd_add_pollrequest(usock_conn_t *conn, int type, short value) LIBCFS_FREE(pr, sizeof(*pr)); return rc; } - + list_add_tail(&pr->upr_list, &pt->upt_pollrequests); pthread_mutex_unlock(&pt->upt_pollrequests_lock); return 0; @@ -248,7 +248,7 @@ usocklnd_add_killrequest(usock_conn_t *conn) int pt_idx = conn->uc_pt_idx; usock_pollthread_t *pt = &usock_data.ud_pollthreads[pt_idx]; usock_pollrequest_t *pr = conn->uc_preq; - + /* Use preallocated poll request because there is no good * workaround for ENOMEM error while killing connection */ if (pr) { @@ -257,9 +257,9 @@ usocklnd_add_killrequest(usock_conn_t *conn) pr->upr_value = 0; usocklnd_conn_addref(conn); /* +1 for poll request */ - + pthread_mutex_lock(&pt->upt_pollrequests_lock); - + if (pt->upt_errno) { /* very rare case: errored poll thread */ pthread_mutex_unlock(&pt->upt_pollrequests_lock); usocklnd_conn_decref(conn); @@ -287,16 +287,17 @@ usocklnd_process_pollrequest(usock_pollrequest_t *pr, int *fd2idx = pt_data->upt_fd2idx; usock_conn_t **idx2conn = pt_data->upt_idx2conn; int *skip = pt_data->upt_skip; - + LASSERT(conn != NULL); - LASSERT(conn->uc_fd >=0); + LASSERT(conn->uc_sock != NULL); LASSERT(type == POLL_ADD_REQUEST || - conn->uc_fd < pt_data->upt_nfd2idx); + LIBCFS_SOCK2FD(conn->uc_sock) < pt_data->upt_nfd2idx); if (type != POLL_ADD_REQUEST) { - idx = fd2idx[conn->uc_fd]; + idx = fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)]; if (idx > 0 && idx < pt_data->upt_nfds) { /* hot path */ - LASSERT(pollfd[idx].fd == conn->uc_fd); + LASSERT(pollfd[idx].fd == + LIBCFS_SOCK2FD(conn->uc_sock)); } else { /* unlikely */ CWARN("Very unlikely event happend: 
trying to" " handle poll request of type %d but idx=%d" @@ -312,7 +313,7 @@ usocklnd_process_pollrequest(usock_pollrequest_t *pr, } LIBCFS_FREE (pr, sizeof(*pr)); - + switch (type) { case POLL_ADD_REQUEST: if (pt_data->upt_nfds >= pt_data->upt_npollfd) { @@ -327,7 +328,7 @@ usocklnd_process_pollrequest(usock_pollrequest_t *pr, if (new_pollfd == NULL) goto process_pollrequest_enomem; pt_data->upt_pollfd = pollfd = new_pollfd; - + new_idx2conn = LIBCFS_REALLOC(idx2conn, new_npollfd * sizeof(usock_conn_t *)); if (new_idx2conn == NULL) @@ -339,16 +340,16 @@ usocklnd_process_pollrequest(usock_pollrequest_t *pr, if (new_skip == NULL) goto process_pollrequest_enomem; pt_data->upt_skip = new_skip; - + pt_data->upt_npollfd = new_npollfd; } - if (conn->uc_fd >= pt_data->upt_nfd2idx) { + if (LIBCFS_SOCK2FD(conn->uc_sock) >= pt_data->upt_nfd2idx) { /* resize fd2idx[] */ int *new_fd2idx; int new_nfd2idx = pt_data->upt_nfd2idx * 2; - while (new_nfd2idx <= conn->uc_fd) + while (new_nfd2idx <= LIBCFS_SOCK2FD(conn->uc_sock)) new_nfd2idx *= 2; new_fd2idx = LIBCFS_REALLOC(fd2idx, new_nfd2idx * @@ -363,29 +364,29 @@ usocklnd_process_pollrequest(usock_pollrequest_t *pr, pt_data->upt_nfd2idx = new_nfd2idx; } - LASSERT(fd2idx[conn->uc_fd] == 0); + LASSERT(fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)] == 0); idx = pt_data->upt_nfds++; idx2conn[idx] = conn; - fd2idx[conn->uc_fd] = idx; + fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)] = idx; - pollfd[idx].fd = conn->uc_fd; + pollfd[idx].fd = LIBCFS_SOCK2FD(conn->uc_sock); pollfd[idx].events = value; pollfd[idx].revents = 0; break; case POLL_DEL_REQUEST: - fd2idx[conn->uc_fd] = 0; /* invalidate this entry */ - + fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)] = 0; /* invalidate this + * entry */ --pt_data->upt_nfds; if (idx != pt_data->upt_nfds) { /* shift last entry into released position */ memcpy(&pollfd[idx], &pollfd[pt_data->upt_nfds], sizeof(struct pollfd)); idx2conn[idx] = idx2conn[pt_data->upt_nfds]; - fd2idx[pollfd[idx].fd] = idx; + fd2idx[pollfd[idx].fd] = 
idx; } - close(conn->uc_fd); + libcfs_sock_release(conn->uc_sock); list_add_tail(&conn->uc_stale_list, &pt_data->upt_stale_list); break; case POLL_RX_SET_REQUEST: @@ -398,14 +399,14 @@ usocklnd_process_pollrequest(usock_pollrequest_t *pr, pollfd[idx].events = value; break; default: - LBUG(); /* unknown type */ + LBUG(); /* unknown type */ } /* In the case of POLL_ADD_REQUEST, idx2conn[idx] takes the * reference that poll request possesses */ if (type != POLL_ADD_REQUEST) usocklnd_conn_decref(conn); - + return 0; process_pollrequest_enomem: @@ -433,14 +434,14 @@ usocklnd_execute_handlers(usock_pollthread_t *pt_data) for (j = 0; j < usock_tuns.ut_fair_limit; j++) { int prev = 0; int i = skip[0]; - + if (i >= nfds) /* nothing ready */ break; - + do { usock_conn_t *conn = idx2conn[i]; int next; - + if (j == 0) /* first pass... */ next = skip[i] = i+1; /* set skip chain */ else /* later passes... */ @@ -456,20 +457,20 @@ usocklnd_execute_handlers(usock_pollthread_t *pt_data) else usocklnd_exception_handler(conn); } - + if ((pollfd[i].revents & POLLIN) != 0 && usocklnd_read_handler(conn) <= 0) pollfd[i].revents &= ~POLLIN; - + if ((pollfd[i].revents & POLLOUT) != 0 && usocklnd_write_handler(conn) <= 0) pollfd[i].revents &= ~POLLOUT; - + if ((pollfd[i].revents & (POLLIN | POLLOUT)) == 0) skip[prev] = next; /* skip this entry next pass */ else prev = i; - + i = next; } while (i < nfds); } @@ -481,14 +482,14 @@ usocklnd_calculate_chunk_size(int num) const int n = 4; const int p = usock_tuns.ut_poll_timeout; int chunk = num; - + /* chunk should be big enough to detect a timeout on any * connection within (n+1)/n times the timeout interval * if we checks every 'p' seconds 'chunk' conns */ - + if (usock_tuns.ut_timeout > n * p) chunk = (chunk * n * p) / usock_tuns.ut_timeout; - + if (chunk == 0) chunk = 1; @@ -502,8 +503,8 @@ usocklnd_wakeup_pollthread(int i) int notification = 0; int rc; - rc = syscall(SYS_write, pt->upt_notifier_fd, ¬ification, - sizeof(notification)); + rc 
= syscall(SYS_write, LIBCFS_SOCK2FD(pt->upt_notifier[0]), + ¬ification, sizeof(notification)); if (rc != sizeof(notification)) CERROR("Very unlikely event happend: " diff --git a/lnet/ulnds/socklnd/usocklnd.c b/lnet/ulnds/socklnd/usocklnd.c index fdab0ab..a280d3b 100644 --- a/lnet/ulnds/socklnd/usocklnd.c +++ b/lnet/ulnds/socklnd/usocklnd.c @@ -75,7 +75,7 @@ usocklnd_validate_tunables() usock_tuns.ut_timeout); return -1; } - + if (usock_tuns.ut_poll_timeout <= 0 || usock_tuns.ut_poll_timeout > MAX_REASONABLE_TIMEOUT) { CERROR("USOCK_POLL_TIMEOUT: %d is out of reasonable limits\n", @@ -88,7 +88,7 @@ usocklnd_validate_tunables() usock_tuns.ut_fair_limit); return -1; } - + if (usock_tuns.ut_npollthreads < 0 || usock_tuns.ut_npollthreads > MAX_REASONABLE_NPT) { CERROR("USOCK_NPOLLTHREADS: %d is out of reasonable limits\n", @@ -101,7 +101,7 @@ usocklnd_validate_tunables() usock_tuns.ut_txcredits); return -1; } - + if (usock_tuns.ut_peertxcredits <= 0) { CERROR("USOCK_PEERTXCREDITS: %d should be positive\n", usock_tuns.ut_peertxcredits); @@ -121,7 +121,7 @@ usocklnd_validate_tunables() usock_tuns.ut_socknagle); return -1; } - + if (usock_tuns.ut_sockbufsiz < 0) { CERROR("USOCK_SOCKBUFSIZ: %d should be 0 or positive\n", usock_tuns.ut_sockbufsiz); @@ -135,22 +135,22 @@ void usocklnd_release_poll_states(int n) { int i; - + for (i = 0; i < n; i++) { usock_pollthread_t *pt = &usock_data.ud_pollthreads[i]; - - close(pt->upt_notifier_fd); - close(pt->upt_pollfd[0].fd); + + libcfs_sock_release(pt->upt_notifier[0]); + libcfs_sock_release(pt->upt_notifier[1]); pthread_mutex_destroy(&pt->upt_pollrequests_lock); cfs_fini_completion(&pt->upt_completion); - + LIBCFS_FREE (pt->upt_pollfd, sizeof(struct pollfd) * pt->upt_npollfd); LIBCFS_FREE (pt->upt_idx2conn, sizeof(usock_conn_t *) * pt->upt_npollfd); LIBCFS_FREE (pt->upt_fd2idx, - sizeof(int) * pt->upt_nfd2idx); + sizeof(int) * pt->upt_nfd2idx); } } @@ -158,55 +158,55 @@ int usocklnd_update_tunables() { int rc; - - rc = 
cfs_parse_int_tunable(&usock_tuns.ut_timeout, + + rc = lnet_parse_int_tunable(&usock_tuns.ut_timeout, "USOCK_TIMEOUT"); if (rc) return rc; - rc = cfs_parse_int_tunable(&usock_tuns.ut_poll_timeout, + rc = lnet_parse_int_tunable(&usock_tuns.ut_poll_timeout, "USOCK_POLL_TIMEOUT"); if (rc) return rc; - rc = cfs_parse_int_tunable(&usock_tuns.ut_npollthreads, + rc = lnet_parse_int_tunable(&usock_tuns.ut_npollthreads, "USOCK_NPOLLTHREADS"); if (rc) return rc; - rc = cfs_parse_int_tunable(&usock_tuns.ut_fair_limit, + rc = lnet_parse_int_tunable(&usock_tuns.ut_fair_limit, "USOCK_FAIR_LIMIT"); if (rc) return rc; - rc = cfs_parse_int_tunable(&usock_tuns.ut_min_bulk, + rc = lnet_parse_int_tunable(&usock_tuns.ut_min_bulk, "USOCK_MIN_BULK"); if (rc) return rc; - rc = cfs_parse_int_tunable(&usock_tuns.ut_txcredits, + rc = lnet_parse_int_tunable(&usock_tuns.ut_txcredits, "USOCK_TXCREDITS"); if (rc) return rc; - rc = cfs_parse_int_tunable(&usock_tuns.ut_peertxcredits, + rc = lnet_parse_int_tunable(&usock_tuns.ut_peertxcredits, "USOCK_PEERTXCREDITS"); if (rc) return rc; - rc = cfs_parse_int_tunable(&usock_tuns.ut_socknagle, + rc = lnet_parse_int_tunable(&usock_tuns.ut_socknagle, "USOCK_SOCKNAGLE"); if (rc) return rc; - rc = cfs_parse_int_tunable(&usock_tuns.ut_sockbufsiz, + rc = lnet_parse_int_tunable(&usock_tuns.ut_sockbufsiz, "USOCK_SOCKBUFSIZ"); if (rc) return rc; if (usocklnd_validate_tunables()) return -EINVAL; - + if (usock_tuns.ut_npollthreads == 0) { usock_tuns.ut_npollthreads = cfs_online_cpus(); @@ -215,7 +215,7 @@ usocklnd_update_tunables() return -EINVAL; } } - + return 0; } @@ -226,11 +226,11 @@ usocklnd_base_startup() usock_pollthread_t *pt; int i; int rc; - + rc = usocklnd_update_tunables(); if (rc) return rc; - + usock_data.ud_npollthreads = usock_tuns.ut_npollthreads; LIBCFS_ALLOC (usock_data.ud_pollthreads, @@ -241,17 +241,16 @@ usocklnd_base_startup() /* Initialize poll thread state structures */ for (i = 0; i < usock_data.ud_npollthreads; i++) { - int 
notifier[2]; pt = &usock_data.ud_pollthreads[i]; rc = -ENOMEM; - + LIBCFS_ALLOC (pt->upt_pollfd, sizeof(struct pollfd) * UPT_START_SIZ); if (pt->upt_pollfd == NULL) goto base_startup_failed_0; - + LIBCFS_ALLOC (pt->upt_idx2conn, sizeof(usock_conn_t *) * UPT_START_SIZ); if (pt->upt_idx2conn == NULL) @@ -260,11 +259,11 @@ usocklnd_base_startup() LIBCFS_ALLOC (pt->upt_fd2idx, sizeof(int) * UPT_START_SIZ); if (pt->upt_fd2idx == NULL) - goto base_startup_failed_2; - + goto base_startup_failed_2; + memset(pt->upt_fd2idx, 0, - sizeof(int) * UPT_START_SIZ); - + sizeof(int) * UPT_START_SIZ); + LIBCFS_ALLOC (pt->upt_skip, sizeof(int) * UPT_START_SIZ); if (pt->upt_skip == NULL) @@ -272,13 +271,11 @@ usocklnd_base_startup() pt->upt_npollfd = pt->upt_nfd2idx = UPT_START_SIZ; - rc = libcfs_socketpair(notifier); + rc = libcfs_socketpair(pt->upt_notifier); if (rc != 0) goto base_startup_failed_4; - pt->upt_notifier_fd = notifier[0]; - - pt->upt_pollfd[0].fd = notifier[1]; + pt->upt_pollfd[0].fd = LIBCFS_SOCK2FD(pt->upt_notifier[1]); pt->upt_pollfd[0].events = POLLIN; pt->upt_pollfd[0].revents = 0; @@ -292,10 +289,10 @@ usocklnd_base_startup() cfs_init_completion(&pt->upt_completion); } - /* Initialize peer hash list */ + /* Initialize peer hash list */ for (i = 0; i < UD_PEER_HASH_SIZE; i++) CFS_INIT_LIST_HEAD(&usock_data.ud_peers[i]); - + pthread_rwlock_init(&usock_data.ud_peers_lock, NULL); /* Spawn poll threads */ @@ -307,9 +304,9 @@ usocklnd_base_startup() return rc; } } - + usock_data.ud_state = UD_STATE_INITIALIZED; - + return 0; base_startup_failed_4: @@ -333,7 +330,7 @@ void usocklnd_base_shutdown(int n) { int i; - + usock_data.ud_shutdown = 1; for (i = 0; i < n; i++) { usock_pollthread_t *pt = &usock_data.ud_pollthreads[i]; @@ -348,7 +345,7 @@ usocklnd_base_shutdown(int n) LIBCFS_FREE (usock_data.ud_pollthreads, usock_data.ud_npollthreads * sizeof(usock_pollthread_t)); - + usock_data.ud_state = UD_STATE_INIT_NOTHING; } @@ -367,18 +364,18 @@ 
usocklnd_assign_ni_nid(lnet_ni_t *ni) int rc; int up; __u32 ipaddr; - + /* Find correct IP-address and update ni_nid with it. * Two cases are supported: * 1) no explicit interfaces are defined. NID will be assigned to * first non-lo interface that is up; * 2) exactly one explicit interface is defined. For example, - * LNET_NETWORKS='tcp(eth0)' */ + * LNET_NETWORKS='tcp(eth0)' */ if (ni->ni_interfaces[0] == NULL) { char **names; int i, n; - + n = libcfs_ipif_enumerate(&names); if (n <= 0) { CERROR("Can't enumerate interfaces: %d\n", n); @@ -386,26 +383,26 @@ usocklnd_assign_ni_nid(lnet_ni_t *ni) } for (i = 0; i < n; i++) { - + if (!strcmp(names[i], "lo")) /* skip the loopback IF */ continue; - + rc = libcfs_ipif_query(names[i], &up, &ipaddr); if (rc != 0) { CWARN("Can't get interface %s info: %d\n", names[i], rc); continue; } - + if (!up) { CWARN("Ignoring interface %s (down)\n", names[i]); continue; } - + break; /* one address is quite enough */ } - + libcfs_ipif_free_enumeration(names, n); if (i >= n) { @@ -433,13 +430,13 @@ usocklnd_assign_ni_nid(lnet_ni_t *ni) ni->ni_interfaces[0]); return -1; } - + CDEBUG(D_NET, "Explicit interface defined: %s. 
" "%u.%u.%u.%u used\n", ni->ni_interfaces[0], HIPQUAD(ipaddr)); - + } - + ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ipaddr); return 0; @@ -478,7 +475,7 @@ usocklnd_startup(lnet_ni_t *ni) ni->ni_maxtxcredits = usock_tuns.ut_txcredits; ni->ni_peertxcredits = usock_tuns.ut_peertxcredits; - + usock_data.ud_nets_count++; return 0; @@ -500,14 +497,14 @@ usocklnd_shutdown(lnet_ni_t *ni) net->un_shutdown = 1; - usocklnd_del_all_peers(ni); + usocklnd_del_all_peers(ni); /* Wait for all peer state to clean up */ pthread_mutex_lock(&net->un_lock); - while (net->un_peercount != 0) + while (net->un_peercount != 0) pthread_cond_wait(&net->un_cond, &net->un_lock); pthread_mutex_unlock(&net->un_lock); - + /* Release usock_net_t structure */ pthread_mutex_destroy(&net->un_lock); pthread_cond_destroy(&net->un_cond); @@ -531,7 +528,7 @@ usocklnd_del_all_peers(lnet_ni_t *ni) for (i = 0; i < UD_PEER_HASH_SIZE; i++) { list_for_each_safe (ptmp, pnxt, &usock_data.ud_peers[i]) { peer = list_entry (ptmp, usock_peer_t, up_list); - + if (peer->up_ni != ni) continue; @@ -540,7 +537,7 @@ usocklnd_del_all_peers(lnet_ni_t *ni) } pthread_rwlock_unlock(&usock_data.ud_peers_lock); - + /* wakeup all threads */ for (i = 0; i < usock_data.ud_npollthreads; i++) usocklnd_wakeup_pollthread(i); @@ -566,10 +563,10 @@ void usocklnd_del_conns_locked(usock_peer_t *peer) { int i; - + for (i=0; i < N_CONN_TYPES; i++) { usock_conn_t *conn = peer->up_conns[i]; if (conn != NULL) - usocklnd_conn_kill(conn); - } + usocklnd_conn_kill(conn); + } } diff --git a/lnet/ulnds/socklnd/usocklnd.h b/lnet/ulnds/socklnd/usocklnd.h index 82680ba..4a91918 100644 --- a/lnet/ulnds/socklnd/usocklnd.h +++ b/lnet/ulnds/socklnd/usocklnd.h @@ -60,7 +60,7 @@ typedef struct { struct usock_peer_s; typedef struct { - int uc_fd; /* socket */ + cfs_socket_t *uc_sock; /* socket */ int uc_type; /* conn type */ int uc_activeflag; /* active side of connection? */ int uc_flip; /* is peer other endian? 
*/ @@ -74,7 +74,7 @@ typedef struct { __u32 uc_peer_ip; /* IP address of the peer */ __u16 uc_peer_port; /* port of the peer */ struct list_head uc_stale_list; /* orphaned connections */ - + /* Receive state */ int uc_rx_state; /* message or hello state */ ksock_hello_msg_t *uc_rx_hello; /* hello buffer */ @@ -95,10 +95,10 @@ typedef struct { int uc_tx_flag; /* deadline valid? */ int uc_sending; /* send op is in progress */ usock_tx_t *uc_tx_hello; /* fake tx with hello */ - + cfs_atomic_t uc_refcount; /* # of users */ pthread_mutex_t uc_lock; /* serialize */ - int uc_errored; /* a flag for lnet_notify() */ + int uc_errored; /* a flag for lnet_notify() */ } usock_conn_t; /* Allowable conn states are: */ @@ -133,12 +133,13 @@ typedef struct usock_peer_s { * hasn't been set so far */ cfs_atomic_t up_refcount; /* # of users */ pthread_mutex_t up_lock; /* serialize */ - int up_errored; /* a flag for lnet_notify() */ + int up_errored; /* a flag for lnet_notify() */ cfs_time_t up_last_alive; /* when the peer was last alive */ } usock_peer_t; typedef struct { - int upt_notifier_fd; /* notifier fd for writing */ + cfs_socket_t *upt_notifier[2]; /* notifier sockets: 1st for + writing, 2nd for reading */ struct pollfd *upt_pollfd; /* poll fds */ int upt_nfds; /* active poll fds */ int upt_npollfd; /* allocated poll fds */ @@ -187,7 +188,7 @@ typedef struct { pthread_cond_t un_cond; /* condvar to wait for notifications */ pthread_mutex_t un_lock; /* a lock to protect un_cond */ } usock_net_t; - + typedef struct { int ut_poll_timeout; /* the third arg for poll(2) (seconds) */ int ut_timeout; /* "stuck" socket timeout (seconds) */ @@ -276,7 +277,7 @@ int usocklnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg); int usocklnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed, unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov, unsigned int offset, unsigned int mlen, unsigned int rlen); -int usocklnd_accept(lnet_ni_t *ni, int sock_fd); +int 
usocklnd_accept(lnet_ni_t *ni, cfs_socket_t *sock); int usocklnd_poll_thread(void *arg); int usocklnd_add_pollrequest(usock_conn_t *conn, int type, short value); @@ -318,12 +319,15 @@ usock_conn_t *usocklnd_conn_allocate(); void usocklnd_conn_free(usock_conn_t *conn); void usocklnd_tear_peer_conn(usock_conn_t *conn); void usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id); -int usocklnd_create_passive_conn(lnet_ni_t *ni, int fd, usock_conn_t **connp); +int usocklnd_create_passive_conn(lnet_ni_t *ni, + cfs_socket_t *sock, usock_conn_t **connp); int usocklnd_create_active_conn(usock_peer_t *peer, int type, usock_conn_t **connp); -int usocklnd_connect_srv_mode(int *fdp, __u32 dst_ip, __u16 dst_port); -int usocklnd_connect_cli_mode(int *fdp, __u32 dst_ip, __u16 dst_port); -int usocklnd_set_sock_options(int fd); +int usocklnd_connect_srv_mode(cfs_socket_t **sockp, + __u32 dst_ip, __u16 dst_port); +int usocklnd_connect_cli_mode(cfs_socket_t **sockp, + __u32 dst_ip, __u16 dst_port); +int usocklnd_set_sock_options(cfs_socket_t *sock); usock_tx_t *usocklnd_create_noop_tx(__u64 cookie); usock_tx_t *usocklnd_create_tx(lnet_msg_t *lntmsg); void usocklnd_init_hello_msg(ksock_hello_msg_t *hello, @@ -346,7 +350,7 @@ int usocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id, int usocklnd_find_or_create_conn(usock_peer_t *peer, int type, usock_conn_t **connp, usock_tx_t *tx, usock_zc_ack_t *zc_ack, - int *send_immediately_flag); + int *send_immediately_flag); void usocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx); int usocklnd_invert_type(int type); void usocklnd_conn_new_state(usock_conn_t *conn, int new_state); diff --git a/lnet/ulnds/socklnd/usocklnd_cb.c b/lnet/ulnds/socklnd/usocklnd_cb.c index d60fe71..1195f42 100644 --- a/lnet/ulnds/socklnd/usocklnd_cb.c +++ b/lnet/ulnds/socklnd/usocklnd_cb.c @@ -61,29 +61,29 @@ usocklnd_send_tx_immediately(usock_conn_t *conn, usock_tx_t *tx) conn->uc_sending = 0; 
pthread_mutex_unlock(&conn->uc_lock); partial_send = 1; - } else { + } else { usocklnd_destroy_tx(peer->up_ni, tx); /* NB: lnetmsg was finalized, so we *must* return 0 */ - if (rc < 0) { /* real error */ + if (rc < 0) { /* real error */ usocklnd_conn_kill(conn); return 0; } - + /* rc == 1: tx was sent completely */ rc = 0; /* let's say to caller 'Ok' */ //counter_imm_complete++; } pthread_mutex_lock(&conn->uc_lock); - conn->uc_sending = 0; + conn->uc_sending = 0; /* schedule write handler */ if (partial_send || (conn->uc_state == UC_READY && (!list_empty(&conn->uc_tx_list) || !list_empty(&conn->uc_zcack_list)))) { - conn->uc_tx_deadline = + conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout); conn->uc_tx_flag = 1; rc2 = usocklnd_add_pollrequest(conn, POLL_TX_SET_REQUEST, POLLOUT); @@ -108,16 +108,16 @@ usocklnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) int rc; usock_conn_t *conn; int send_immediately; - + tx = usocklnd_create_tx(lntmsg); if (tx == NULL) return -ENOMEM; - + rc = usocklnd_find_or_create_peer(ni, target, &peer); if (rc) { LIBCFS_FREE (tx, tx->tx_size); return rc; - } + } /* peer cannot disappear now because its refcount was incremented */ type = usocklnd_get_conn_type(lntmsg); @@ -150,7 +150,7 @@ usocklnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed, /* I don't think that we'll win much concurrency moving lock() * call below lnet_extract_iov() */ pthread_mutex_lock(&conn->uc_lock); - + conn->uc_rx_lnetmsg = msg; conn->uc_rx_nob_wanted = mlen; conn->uc_rx_nob_left = rlen; @@ -162,8 +162,8 @@ usocklnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed, /* the gap between lnet_parse() and usocklnd_recv() happened? 
*/ if (conn->uc_rx_state == UC_RX_PARSE_WAIT) { conn->uc_rx_flag = 1; /* waiting for incoming lnet payload */ - conn->uc_rx_deadline = - cfs_time_shift(usock_tuns.ut_timeout); + conn->uc_rx_deadline = + cfs_time_shift(usock_tuns.ut_timeout); rc = usocklnd_add_pollrequest(conn, POLL_RX_SET_REQUEST, POLLIN); if (rc != 0) { usocklnd_conn_kill_locked(conn); @@ -173,23 +173,23 @@ usocklnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed, } conn->uc_rx_state = UC_RX_LNET_PAYLOAD; - recv_out: + recv_out: pthread_mutex_unlock(&conn->uc_lock); usocklnd_conn_decref(conn); return rc; } int -usocklnd_accept(lnet_ni_t *ni, int sock_fd) +usocklnd_accept(lnet_ni_t *ni, cfs_socket_t *sock) { int rc; usock_conn_t *conn; - - rc = usocklnd_create_passive_conn(ni, sock_fd, &conn); + + rc = usocklnd_create_passive_conn(ni, sock, &conn); if (rc) return rc; LASSERT(conn != NULL); - + /* disable shutdown event temporarily */ lnet_ni_addref(ni); @@ -199,7 +199,7 @@ usocklnd_accept(lnet_ni_t *ni, int sock_fd) /* NB: conn reference counter was incremented while adding * poll request if rc == 0 */ - + usocklnd_conn_decref(conn); /* should destroy conn if rc != 0 */ return rc; } -- 1.8.3.1