void cfs_waitq_init(cfs_waitq_t *waitq);
void cfs_waitlink_init(cfs_waitlink_t *link);
void cfs_waitq_add(cfs_waitq_t *waitq, cfs_waitlink_t *link);
-void cfs_waitq_add_exclusive(cfs_waitq_t *waitq,
+void cfs_waitq_add_exclusive(cfs_waitq_t *waitq,
cfs_waitlink_t *link);
void cfs_waitq_del(cfs_waitq_t *waitq, cfs_waitlink_t *link);
int cfs_waitq_active(cfs_waitq_t *waitq);
#define cfs_create_thread(l,m) LBUG()
#endif
-int cfs_parse_int_tunable(int *value, char *name);
uid_t cfs_curproc_uid(void);
#define LIBCFS_REALLOC(ptr, size) realloc(ptr, size)
* Network functions used by the user-land lnet acceptor
*/
-int libcfs_sock_listen (int *sockp, __u32 local_ip, int local_port, int backlog);
-int libcfs_sock_accept (int *newsockp, int sock, __u32 *peer_ip, int *peer_port);
-int libcfs_sock_read (int sock, void *buffer, int nob, int timeout);
-void libcfs_sock_abort_accept(__u16 port);
+typedef struct cfs_socket {
+ int s_fd;
+} cfs_socket_t;
+
+#define LIBCFS_SOCK2FD(sock) ((sock)->s_fd)
+
+int libcfs_sock_listen(cfs_socket_t **sockp, __u32 ip, int port, int backlog);
+int libcfs_sock_accept(cfs_socket_t **newsockp, cfs_socket_t *sock);
+int libcfs_sock_read(cfs_socket_t *sock, void *buffer, int nob, int timeout);
+int libcfs_sock_write(cfs_socket_t *sock, void *buffer, int nob, int timeout);
+void libcfs_sock_abort_accept(cfs_socket_t *sock);
+void libcfs_sock_release(cfs_socket_t *sock);
+int libcfs_sock_getaddr(cfs_socket_t *sock, int remote, __u32 *ip, int *port);
/*
* Network functions of common use
*/
-int libcfs_getpeername(int sock_fd, __u32 *ipaddr_p, __u16 *port_p);
-int libcfs_socketpair(int *fdp);
-int libcfs_fcntl_nonblock(int fd);
-int libcfs_sock_set_nagle(int fd, int nagle);
-int libcfs_sock_set_bufsiz(int fd, int bufsiz);
-int libcfs_sock_create(int *fdp);
-void libcfs_sock_release(int fd);
-int libcfs_sock_bind_to_port(int fd, __u16 port);
-int libcfs_sock_connect(int fd, __u32 ip, __u16 port);
-int libcfs_sock_writev(int fd, const struct iovec *vector, int count);
-int libcfs_sock_readv(int fd, const struct iovec *vector, int count);
+int libcfs_socketpair(cfs_socket_t **sockp);
+int libcfs_fcntl_nonblock(cfs_socket_t *sock);
+int libcfs_sock_set_nagle(cfs_socket_t *sock, int nagle);
+int libcfs_sock_set_bufsiz(cfs_socket_t *sock, int bufsiz);
+int libcfs_sock_connect(cfs_socket_t *sock, __u32 ip, __u16 port);
+int libcfs_sock_writev(cfs_socket_t *sock,
+ const struct iovec *vector, int count);
+int libcfs_sock_readv(cfs_socket_t *sock,
+ const struct iovec *vector, int count);
+int libcfs_sock_create(cfs_socket_t **sockp, int *fatal,
+ __u32 local_ip, int local_port);
+
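/*
 * Usage sketch (illustrative only, not part of the patched API): a minimal
 * accept-and-read sequence built on the cfs_socket_t wrappers declared
 * above.  The helper name "example_accept_one" and the buffer size are
 * assumptions made for the example; error handling is abbreviated.
 */
static int example_accept_one(__u32 local_ip, int local_port)
{
        cfs_socket_t *listener;
        cfs_socket_t *conn;
        char          buf[16];
        int           rc;

        /* create, bind and listen in a single call */
        rc = libcfs_sock_listen(&listener, local_ip, local_port, 127);
        if (rc != 0)
                return rc;

        rc = libcfs_sock_accept(&conn, listener);
        if (rc == 0) {
                /* read a small request header with a 5 second timeout */
                rc = libcfs_sock_read(conn, buf, sizeof(buf), 5);
                libcfs_sock_release(conn);
        }

        libcfs_sock_release(listener);
        return rc;
}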
/*
* Macros for easy printing of IP addresses
return getuid();
}
-int cfs_parse_int_tunable(int *value, char *name)
-{
- char *env = getenv(name);
- char *end;
-
- if (env == NULL)
- return 0;
-
- *value = (int)strtoull(env, &end, 0);
- if (*end == 0)
- return 0;
-
- CERROR("Can't parse tunable %s=%s\n", name, env);
- return -EINVAL;
-}
-
-
void cfs_enter_debugger(void)
{
/*
#include <libcfs/libcfs.h>
#include <sys/socket.h>
-#ifdef HAVE_NETINET_IN_H
+#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#include <netinet/tcp.h>
*/
int
-libcfs_sock_listen (int *sockp, __u32 local_ip, int local_port, int backlog)
+libcfs_sock_listen (cfs_socket_t **sockp,
+ __u32 local_ip, int local_port, int backlog)
{
- int rc;
- int option;
- struct sockaddr_in locaddr;
-
- *sockp = socket(AF_INET, SOCK_STREAM, 0);
- if (*sockp < 0) {
- rc = -errno;
- CERROR("socket() failed: errno==%d\n", errno);
- return rc;
- }
-
- option = 1;
- if ( setsockopt(*sockp, SOL_SOCKET, SO_REUSEADDR,
- (char *)&option, sizeof (option)) ) {
- rc = -errno;
- CERROR("setsockopt(SO_REUSEADDR) failed: errno==%d\n", errno);
- goto failed;
- }
+ int rc;
+ int fatal;
- if (local_ip != 0 || local_port != 0) {
- memset(&locaddr, 0, sizeof(locaddr));
- locaddr.sin_family = AF_INET;
- locaddr.sin_port = htons(local_port);
- locaddr.sin_addr.s_addr = (local_ip == 0) ?
- INADDR_ANY : htonl(local_ip);
-
- if ( bind(*sockp, (struct sockaddr *)&locaddr, sizeof(locaddr)) ) {
- rc = -errno;
- if ( errno == -EADDRINUSE )
- CDEBUG(D_NET, "Port %d already in use\n",
- local_port);
- else
- CERROR("bind() to port %d failed: errno==%d\n",
- local_port, errno);
- goto failed;
- }
- }
+ rc = libcfs_sock_create(sockp, &fatal, local_ip, local_port);
+ if (rc != 0)
+ return rc;
- if ( listen(*sockp, backlog) ) {
+ if ( listen((*sockp)->s_fd, backlog) ) {
rc = -errno;
CERROR("listen() with backlog==%d failed: errno==%d\n",
backlog, errno);
goto failed;
}
-
+
return 0;
failed:
- close(*sockp);
+ libcfs_sock_release(*sockp);
return rc;
}
+void
+libcfs_sock_release (cfs_socket_t *sock)
+{
+ close(sock->s_fd);
+ LIBCFS_FREE(sock, sizeof(cfs_socket_t));
+}
+
int
-libcfs_sock_accept (int *newsockp, int sock, __u32 *peer_ip, int *peer_port)
+libcfs_sock_accept (cfs_socket_t **newsockp, cfs_socket_t *sock)
{
struct sockaddr_in accaddr;
- socklen_t accaddr_len = sizeof(struct sockaddr_in);
+ socklen_t accaddr_len = sizeof(struct sockaddr_in);
- *newsockp = accept(sock, (struct sockaddr *)&accaddr, &accaddr_len);
+ LIBCFS_ALLOC(*newsockp, sizeof(cfs_socket_t));
+ if (*newsockp == NULL) {
+ CERROR ("Can't alloc memory for cfs_socket_t\n");
+ return -ENOMEM;
+ }
- if ( *newsockp < 0 ) {
- CERROR("accept() failed: errno==%d\n", errno);
- return -errno;
+ (*newsockp)->s_fd = accept(sock->s_fd,
+ (struct sockaddr *)&accaddr, &accaddr_len);
+
+ if ( (*newsockp)->s_fd < 0 ) {
+ int rc = -errno;
+ CERROR("accept() failed: errno==%d\n", -rc);
+ LIBCFS_FREE(*newsockp, sizeof(cfs_socket_t));
+ return rc;
}
- *peer_ip = ntohl(accaddr.sin_addr.s_addr);
- *peer_port = ntohs(accaddr.sin_port);
-
return 0;
}
int
-libcfs_sock_read (int sock, void *buffer, int nob, int timeout)
+libcfs_sock_read (cfs_socket_t *sock, void *buffer, int nob, int timeout)
{
- int rc;
+ int rc;
struct pollfd pfd;
- cfs_time_t start_time = cfs_time_current();
+ cfs_time_t start_time = cfs_time_current();
- pfd.fd = sock;
+ pfd.fd = sock->s_fd;
pfd.events = POLLIN;
pfd.revents = 0;
/* poll(2) measures timeout in msec */
timeout *= 1000;
-
+
while (nob != 0 && timeout > 0) {
cfs_time_t current_time;
return -ETIMEDOUT;
if ((pfd.revents & POLLIN) == 0)
return -EIO;
-
- rc = read(sock, buffer, nob);
+
+ rc = read(sock->s_fd, buffer, nob);
+ if (rc < 0)
+ return -errno;
+ if (rc == 0)
+ return -EIO;
+
+ buffer = ((char *)buffer) + rc;
+ nob -= rc;
+
+ current_time = cfs_time_current();
+ timeout -= 1000 *
+ cfs_duration_sec(cfs_time_sub(current_time,
+ start_time));
+ start_time = current_time;
+ }
+
+ if (nob == 0)
+ return 0;
+ else
+ return -ETIMEDOUT;
+}
+
+int
+libcfs_sock_write (cfs_socket_t *sock, void *buffer, int nob, int timeout)
+{
+ int rc;
+ struct pollfd pfd;
+ cfs_time_t start_time = cfs_time_current();
+
+ pfd.fd = sock->s_fd;
+ pfd.events = POLLOUT;
+ pfd.revents = 0;
+
+ /* poll(2) measures timeout in msec */
+ timeout *= 1000;
+
+ while (nob != 0 && timeout > 0) {
+ cfs_time_t current_time;
+
+ rc = poll(&pfd, 1, timeout);
+ if (rc < 0)
+ return -errno;
+ if (rc == 0)
+ return -ETIMEDOUT;
+ if ((pfd.revents & POLLOUT) == 0)
+ return -EIO;
+
+ rc = write(sock->s_fd, buffer, nob);
if (rc < 0)
return -errno;
if (rc == 0)
return -EIO;
-
+
buffer = ((char *)buffer) + rc;
nob -= rc;
current_time = cfs_time_current();
- timeout -= cfs_duration_sec(cfs_time_sub(cfs_time_current(),
- start_time));
+ timeout -= 1000 *
+ cfs_duration_sec(cfs_time_sub(current_time,
+ start_time));
+ start_time = current_time;
}
-
+
if (nob == 0)
return 0;
else
/* Just try to connect to localhost to wake up entity that are
* sleeping in accept() */
void
-libcfs_sock_abort_accept(__u16 port)
+libcfs_sock_abort_accept(cfs_socket_t *sock)
{
int fd, rc;
+ struct sockaddr_in remaddr;
struct sockaddr_in locaddr;
+ socklen_t alen = sizeof(struct sockaddr_in);
+
+ rc = getsockname(sock->s_fd, (struct sockaddr *)&remaddr, &alen);
+ if ( rc != 0 ) {
+ CERROR("getsockname() failed: errno==%d\n", errno);
+ return;
+ }
memset(&locaddr, 0, sizeof(locaddr));
locaddr.sin_family = AF_INET;
- locaddr.sin_port = htons(port);
+ locaddr.sin_port = remaddr.sin_port;
locaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
fd = socket(AF_INET, SOCK_STREAM, 0);
if ( fd < 0 ) {
CERROR("socket() failed: errno==%d\n", errno);
return;
- }
-
+ }
+
rc = connect(fd, (struct sockaddr *)&locaddr, sizeof(locaddr));
if ( rc != 0 ) {
if ( errno != ECONNREFUSED )
CERROR("connect() failed: errno==%d\n", errno);
else
- CDEBUG(D_NET, "Nobody to wake up at %d\n", port);
+ CDEBUG(D_NET, "Nobody to wake up at %d\n",
+ ntohs(remaddr.sin_port));
}
-
+
close(fd);
}
-/*
- * Network functions of common use
- */
-
int
-libcfs_getpeername(int sock_fd, __u32 *ipaddr_p, __u16 *port_p)
+libcfs_sock_getaddr(cfs_socket_t *sock, int remote, __u32 *ip, int *port)
{
int rc;
struct sockaddr_in peer_addr;
socklen_t peer_addr_len = sizeof(peer_addr);
- rc = getpeername(sock_fd, (struct sockaddr *)&peer_addr, &peer_addr_len);
+ LASSERT(remote == 1);
+
+ rc = getpeername(sock->s_fd,
+ (struct sockaddr *)&peer_addr, &peer_addr_len);
if (rc != 0)
return -errno;
-
- if (ipaddr_p != NULL)
- *ipaddr_p = ntohl(peer_addr.sin_addr.s_addr);
- if (port_p != NULL)
- *port_p = ntohs(peer_addr.sin_port);
- return 0;
+ if (ip != NULL)
+ *ip = ntohl(peer_addr.sin_addr.s_addr);
+ if (port != NULL)
+ *port = ntohs(peer_addr.sin_port);
+
+ return rc;
}
+/*
+ * Network functions of common use
+ */
+
int
-libcfs_socketpair(int *fdp)
+libcfs_socketpair(cfs_socket_t **sockp)
{
- int rc, i;
-
+ int rc, i, fdp[2];
+
+ LIBCFS_ALLOC(sockp[0], sizeof(cfs_socket_t));
+ if (sockp[0] == NULL) {
+ CERROR ("Can't alloc memory for cfs_socket_t (1)\n");
+ return -ENOMEM;
+ }
+
+ LIBCFS_ALLOC(sockp[1], sizeof(cfs_socket_t));
+ if (sockp[1] == NULL) {
+ CERROR ("Can't alloc memory for cfs_socket_t (2)\n");
+ LIBCFS_FREE(sockp[0], sizeof(cfs_socket_t));
+ return -ENOMEM;
+ }
+
rc = socketpair(AF_UNIX, SOCK_STREAM, 0, fdp);
if (rc != 0) {
rc = -errno;
CERROR ("Cannot create socket pair\n");
+ LIBCFS_FREE(sockp[0], sizeof(cfs_socket_t));
+ LIBCFS_FREE(sockp[1], sizeof(cfs_socket_t));
return rc;
}
-
+
+ sockp[0]->s_fd = fdp[0];
+ sockp[1]->s_fd = fdp[1];
+
for (i = 0; i < 2; i++) {
- rc = libcfs_fcntl_nonblock(fdp[i]);
+ rc = libcfs_fcntl_nonblock(sockp[i]);
if (rc) {
- close(fdp[0]);
- close(fdp[1]);
+ libcfs_sock_release(sockp[0]);
+ libcfs_sock_release(sockp[1]);
return rc;
}
}
-
+
return 0;
}
int
-libcfs_fcntl_nonblock(int fd)
+libcfs_fcntl_nonblock(cfs_socket_t *sock)
{
int rc, flags;
-
- flags = fcntl(fd, F_GETFL, 0);
+
+ flags = fcntl(sock->s_fd, F_GETFL, 0);
if (flags == -1) {
rc = -errno;
CERROR ("Cannot get socket flags\n");
return rc;
}
-
- rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+
+ rc = fcntl(sock->s_fd, F_SETFL, flags | O_NONBLOCK);
if (rc != 0) {
rc = -errno;
CERROR ("Cannot set socket flags\n");
return rc;
}
-
+
return 0;
}
int
-libcfs_sock_set_nagle(int fd, int nagle)
+libcfs_sock_set_nagle(cfs_socket_t *sock, int nagle)
{
int rc;
int option = nagle ? 0 : 1;
#if defined(__sun__) || defined(__sun)
- rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &option, sizeof(option));
+ rc = setsockopt(sock->s_fd,
+ IPPROTO_TCP, TCP_NODELAY, &option, sizeof(option));
#else
- rc = setsockopt(fd, SOL_TCP, TCP_NODELAY, &option, sizeof(option));
+ rc = setsockopt(sock->s_fd,
+ SOL_TCP, TCP_NODELAY, &option, sizeof(option));
#endif
if (rc != 0) {
}
int
-libcfs_sock_set_bufsiz(int fd, int bufsiz)
+libcfs_sock_set_bufsiz(cfs_socket_t *sock, int bufsiz)
{
int rc, option;
-
+
LASSERT (bufsiz != 0);
option = bufsiz;
- rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &option, sizeof(option));
+ rc = setsockopt(sock->s_fd,
+ SOL_SOCKET, SO_SNDBUF, &option, sizeof(option));
if (rc != 0) {
rc = -errno;
CERROR ("Cannot set SNDBUF socket option\n");
}
option = bufsiz;
- rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &option, sizeof(option));
+ rc = setsockopt(sock->s_fd,
+ SOL_SOCKET, SO_RCVBUF, &option, sizeof(option));
if (rc != 0) {
rc = -errno;
CERROR ("Cannot set RCVBUF socket option\n");
}
int
-libcfs_sock_create(int *fdp)
+libcfs_sock_bind(cfs_socket_t *sock, __u32 ip, __u16 port)
{
- int rc, fd, option;
+ int rc;
+ struct sockaddr_in locaddr;
- fd = socket(AF_INET, SOCK_STREAM, 0);
- if (fd < 0) {
- rc = -errno;
- CERROR ("Cannot create socket\n");
- return rc;
- }
+ if (ip == 0 && port == 0)
+ return 0;
- option = 1;
- rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
- &option, sizeof(option));
+ memset(&locaddr, 0, sizeof(locaddr));
+ locaddr.sin_family = AF_INET;
+ locaddr.sin_addr.s_addr = (ip == 0) ? INADDR_ANY : htonl(ip);
+ locaddr.sin_port = htons(port);
+
+ rc = bind(sock->s_fd, (struct sockaddr *)&locaddr, sizeof(locaddr));
if (rc != 0) {
rc = -errno;
- CERROR ("Cannot set SO_REUSEADDR for socket\n");
- close(fd);
+ CERROR("Cannot bind to %d.%d.%d.%d %d: %d\n",
+ HIPQUAD(ip), port, rc);
return rc;
- }
-
- *fdp = fd;
- return 0;
-}
+ }
-void libcfs_sock_release(int fd)
-{
- close(fd);
+ return 0;
}
int
-libcfs_sock_bind_to_port(int fd, __u16 port)
+libcfs_sock_create(cfs_socket_t **sockp, int *fatal,
+ __u32 local_ip, int local_port)
{
- int rc;
- struct sockaddr_in locaddr;
+ int rc, fd, option;
- memset(&locaddr, 0, sizeof(locaddr));
- locaddr.sin_family = AF_INET;
- locaddr.sin_addr.s_addr = INADDR_ANY;
- locaddr.sin_port = htons(port);
+ *fatal = 1;
- rc = bind(fd, (struct sockaddr *)&locaddr, sizeof(locaddr));
+ LIBCFS_ALLOC(*sockp, sizeof(cfs_socket_t));
+ if (*sockp == NULL) {
+ CERROR("Can't alloc memory for cfs_socket_t\n");
+ return -ENOMEM;
+ }
+
+ fd = socket(AF_INET, SOCK_STREAM, 0);
+ if (fd < 0) {
+ rc = -errno;
+ CERROR("Cannot create socket: %d\n", rc);
+ LIBCFS_FREE(*sockp, sizeof(cfs_socket_t));
+ return rc;
+ }
+
+ (*sockp)->s_fd = fd;
+
+ option = 1;
+ rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
+ &option, sizeof(option));
if (rc != 0) {
rc = -errno;
- CERROR ("Cannot bind to port %d\n", port);
+ CERROR("Cannot set SO_REUSEADDR for socket: %d\n", rc);
+ libcfs_sock_release(*sockp);
return rc;
}
- return 0;
+ rc = libcfs_sock_bind(*sockp, local_ip, local_port);
+ if (rc != 0) {
+ *fatal = 0;
+ libcfs_sock_release(*sockp);
+ }
+
+ return rc;
}
int
-libcfs_sock_connect(int fd, __u32 ip, __u16 port)
+libcfs_sock_connect(cfs_socket_t *sock, __u32 ip, __u16 port)
{
int rc;
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(ip);
addr.sin_port = htons(port);
-
- rc = connect(fd, (struct sockaddr *)&addr,
+
+ rc = connect(sock->s_fd, (struct sockaddr *)&addr,
sizeof(struct sockaddr_in));
if(rc != 0 && errno != EINPROGRESS) {
/* NB: EPIPE and ECONNRESET are considered as non-fatal
* because:
* 1) it still makes sense to continue reading &&
- * 2) anyway, poll() will set up POLLHUP|POLLERR flags */
-int libcfs_sock_writev(int fd, const struct iovec *vector, int count)
+ * 2) anyway, poll() will set up POLLHUP|POLLERR flags */
+int
+libcfs_sock_writev(cfs_socket_t *sock, const struct iovec *vector, int count)
{
int rc;
-
- rc = syscall(SYS_writev, fd, vector, count);
-
- if (rc == 0) /* write nothing */
+
+ rc = syscall(SYS_writev, sock->s_fd, vector, count);
+
+ if (rc == 0) /* write nothing */
return 0;
-
+
if (rc < 0) {
if (errno == EAGAIN || /* write nothing */
errno == EPIPE || /* non-fatal error */
return rc;
}
-int libcfs_sock_readv(int fd, const struct iovec *vector, int count)
+int
+libcfs_sock_readv(cfs_socket_t *sock, const struct iovec *vector, int count)
{
int rc;
-
- rc = syscall(SYS_readv, fd, vector, count);
-
- if (rc == 0) /* EOF */
+
+ rc = syscall(SYS_readv, sock->s_fd, vector, count);
+
+ if (rc == 0) /* EOF */
return -EIO;
-
+
if (rc < 0) {
if (errno == EAGAIN) /* read nothing */
return 0;
wh->wh_object_cookie == LNET_WIRE_HANDLE_COOKIE_NONE);
}
-static inline int lnet_md_exhausted (lnet_libmd_t *md)
+static inline int lnet_md_exhausted (lnet_libmd_t *md)
{
return (md->md_threshold == 0 ||
((md->md_options & LNET_MD_MAX_SIZE) != 0 &&
}
#ifdef __KERNEL__
-#define LNET_LOCK() spin_lock(&the_lnet.ln_lock)
-#define LNET_UNLOCK() spin_unlock(&the_lnet.ln_lock)
+#define LNET_LOCK() spin_lock(&the_lnet.ln_lock)
+#define LNET_UNLOCK() spin_unlock(&the_lnet.ln_lock)
#define LNET_MUTEX_DOWN(m) mutex_down(m)
#define LNET_MUTEX_UP(m) mutex_up(m)
#else
if (list_empty (&fl->fl_list))
return (NULL);
-
+
o = list_entry (fl->fl_list.next, lnet_freeobj_t, fo_list);
list_del (&o->fo_list);
return ((void *)&o->fo_contents);
{
/* ALWAYS called with liblock held */
lnet_freeobj_t *o = list_entry (obj, lnet_freeobj_t, fo_contents);
-
+
list_add (&o->fo_list, &fl->fl_list);
}
{
/* NEVER called with liblock held */
lnet_eq_t *eq;
-
+
LNET_LOCK();
eq = (lnet_eq_t *)lnet_freelist_alloc(&the_lnet.ln_free_eqs);
LNET_UNLOCK();
{
/* NEVER called with liblock held */
lnet_libmd_t *md;
-
+
LNET_LOCK();
md = (lnet_libmd_t *)lnet_freelist_alloc(&the_lnet.ln_free_mds);
LNET_UNLOCK();
{
/* NEVER called with liblock held */
lnet_me_t *me;
-
+
LNET_LOCK();
me = (lnet_me_t *)lnet_freelist_alloc(&the_lnet.ln_free_mes);
LNET_UNLOCK();
-
+
return (me);
}
{
/* NEVER called with liblock held */
lnet_msg_t *msg;
-
+
LNET_LOCK();
msg = (lnet_msg_t *)lnet_freelist_alloc(&the_lnet.ln_free_msgs);
LNET_UNLOCK();
md->md_niov = niov;
CFS_INIT_LIST_HEAD(&md->md_list);
}
-
+
return (md);
}
-static inline void
+static inline void
lnet_md_free (lnet_libmd_t *md)
{
/* ALWAYS called with liblock held */
return (me);
}
-static inline void
+static inline void
lnet_me_free(lnet_me_t *me)
{
/* ALWAYS called with liblock held */
return (msg);
}
-static inline void
+static inline void
lnet_msg_free(lnet_msg_t *msg)
{
/* ALWAYS called with liblock held */
lnet_handle2eq (lnet_handle_eq_t *handle)
{
/* ALWAYS called with liblock held */
- lnet_libhandle_t *lh = lnet_lookup_cookie(handle->cookie,
+ lnet_libhandle_t *lh = lnet_lookup_cookie(handle->cookie,
LNET_COOKIE_TYPE_EQ);
if (lh == NULL)
return (NULL);
{
/* ALWAYS called with liblock held */
lnet_libhandle_t *lh;
-
+
if (wh->wh_interface_cookie != the_lnet.ln_interface_cookie)
return (NULL);
-
+
lh = lnet_lookup_cookie(wh->wh_object_cookie,
LNET_COOKIE_TYPE_MD);
if (lh == NULL)
}
static inline void
-lnet_ni_addref_locked(lnet_ni_t *ni)
+lnet_ni_addref_locked(lnet_ni_t *ni)
{
LASSERT (ni->ni_refcount > 0);
ni->ni_refcount++;
}
static inline void
-lnet_ni_addref(lnet_ni_t *ni)
+lnet_ni_addref(lnet_ni_t *ni)
{
LNET_LOCK();
lnet_ni_addref_locked(ni);
static inline struct list_head *
lnet_nid2peerhash (lnet_nid_t nid)
{
- unsigned int idx = LNET_NIDADDR(nid) % LNET_PEER_HASHSIZE;
+ unsigned int idx = LNET_NIDADDR(nid) % LNET_PEER_HASHSIZE;
return &the_lnet.ln_peer_hash[idx];
}
extern lnet_ni_t *lnet_nid2ni_locked (lnet_nid_t nid);
extern lnet_ni_t *lnet_net2ni_locked (__u32 net);
static inline lnet_ni_t *
-lnet_net2ni (__u32 net)
+lnet_net2ni (__u32 net)
{
lnet_ni_t *ni;
int lnet_check_routes(void);
int lnet_del_route(__u32 net, lnet_nid_t gw_nid);
void lnet_destroy_routes(void);
-int lnet_get_route(int idx, __u32 *net, __u32 *hops,
+int lnet_get_route(int idx, __u32 *net, __u32 *hops,
lnet_nid_t *gateway, __u32 *alive);
void lnet_proc_init(void);
void lnet_proc_fini(void);
unsigned int offset, unsigned int len);
unsigned int lnet_kiov_nob (unsigned int niov, lnet_kiov_t *iov);
-int lnet_extract_kiov (int dst_niov, lnet_kiov_t *dst,
+int lnet_extract_kiov (int dst_niov, lnet_kiov_t *dst,
int src_niov, lnet_kiov_t *src,
unsigned int offset, unsigned int len);
-void lnet_copy_iov2iov (unsigned int ndiov, struct iovec *diov,
- unsigned int doffset,
- unsigned int nsiov, struct iovec *siov,
+void lnet_copy_iov2iov (unsigned int ndiov, struct iovec *diov,
+ unsigned int doffset,
+ unsigned int nsiov, struct iovec *siov,
unsigned int soffset, unsigned int nob);
-void lnet_copy_kiov2iov (unsigned int niov, struct iovec *iov,
+void lnet_copy_kiov2iov (unsigned int niov, struct iovec *iov,
unsigned int iovoffset,
- unsigned int nkiov, lnet_kiov_t *kiov,
+ unsigned int nkiov, lnet_kiov_t *kiov,
unsigned int kiovoffset, unsigned int nob);
-void lnet_copy_iov2kiov (unsigned int nkiov, lnet_kiov_t *kiov,
+void lnet_copy_iov2kiov (unsigned int nkiov, lnet_kiov_t *kiov,
unsigned int kiovoffset,
- unsigned int niov, struct iovec *iov,
+ unsigned int niov, struct iovec *iov,
unsigned int iovoffset, unsigned int nob);
-void lnet_copy_kiov2kiov (unsigned int ndkiov, lnet_kiov_t *dkiov,
- unsigned int doffset,
- unsigned int nskiov, lnet_kiov_t *skiov,
+void lnet_copy_kiov2kiov (unsigned int ndkiov, lnet_kiov_t *dkiov,
+ unsigned int doffset,
+ unsigned int nskiov, lnet_kiov_t *skiov,
unsigned int soffset, unsigned int nob);
static inline void
int lnet_create_peer_table(void);
void lnet_debug_peer(lnet_nid_t nid);
+#ifndef __KERNEL__
+static inline int
+lnet_parse_int_tunable(int *value, char *name)
+{
+ char *env = getenv(name);
+ char *end;
+
+ if (env == NULL)
+ return 0;
+
+ *value = strtoull(env, &end, 0);
+ if (*end == 0)
+ return 0;
+
+ CERROR("Can't parse tunable %s=%s\n", name, env);
+ return -EINVAL;
+}
+#endif
+
#endif
#include <libcfs/list.h>
#include <lnet/types.h>
-#define WIRE_ATTR __attribute__((packed))
+#define WIRE_ATTR __attribute__((packed))
/* The wire handle's interface cookie only matches one network interface in
* one epoch (i.e. new cookie when the interface restarts or the node
/* A HELLO message contains a magic number and protocol version
* code in the header's dest_nid, the peer's NID in the src_nid, and
* LNET_MSG_HELLO in the type field. All other common fields are zero
- * (including payload_size; i.e. no payload).
+ * (including payload_size; i.e. no payload).
* This is for use by byte-stream LNDs (e.g. TCP/IP) to check the peer is
* running the same protocol and to find out its NID. These LNDs should
* exchange HELLO messages when a connection is first established. Individual
* LNDs can put whatever else they fancy in lnet_hdr_t::msg.
*/
typedef struct {
- __u32 magic; /* LNET_PROTO_TCP_MAGIC */
+ __u32 magic; /* LNET_PROTO_TCP_MAGIC */
__u16 version_major; /* increment on incompatible change */
__u16 version_minor; /* increment on compatible change */
} WIRE_ATTR lnet_magicversion_t;
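/*
 * Sketch (an assumption-laden illustration, not part of this change) of how
 * a byte-stream LND might validate the lnet_magicversion_t it reads first
 * off the wire.  The helper name is invented here; the swab helpers and
 * LNET_PROTO_TCP_MAGIC are assumed to come from the usual libcfs/lnet
 * headers, and the reply policy is each LND's own business.
 */
static int example_check_magicversion(lnet_magicversion_t *hmv)
{
        if (hmv->magic == __swab32(LNET_PROTO_TCP_MAGIC)) {
                /* peer is opposite-endian: flip the fields in place */
                __swab32s(&hmv->magic);
                __swab16s(&hmv->version_major);
                __swab16s(&hmv->version_minor);
        }

        if (hmv->magic != LNET_PROTO_TCP_MAGIC)
                return -EPROTO;         /* not talking the LNET protocol */

        return 0;
}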
} lnet_libhandle_t;
#define lh_entry(ptr, type, member) \
- ((type *)((char *)(ptr)-(char *)(&((type *)0)->member)))
+ ((type *)((char *)(ptr)-(char *)(&((type *)0)->member)))
typedef struct lnet_eq {
struct list_head eq_list;
#ifdef LNET_USE_LIB_FREELIST
typedef struct
{
- void *fl_objs; /* single contiguous array of objects */
+ void *fl_objs; /* single contiguous array of objects */
int fl_nobjs; /* the number of them */
int fl_objsize; /* the size (including overhead) of each of them */
struct list_head fl_list; /* where they are enqueued */
/* fields initialised by the LND */
unsigned int lnd_type;
-
+
int (*lnd_startup) (struct lnet_ni *ni);
void (*lnd_shutdown) (struct lnet_ni *ni);
int (*lnd_ctl)(struct lnet_ni *ni, unsigned int cmd, void *arg);
/* In data movement APIs below, payload buffers are described as a set
* of 'niov' fragments which are...
- * EITHER
+ * EITHER
* in virtual memory (struct iovec *iov != NULL)
* OR
 * in pages (kernel only: lnet_kiov_t *kiov != NULL).
* The LND may NOT overwrite these fragment descriptors.
 * An 'offset' may specify a byte offset within the set of
- * fragments to start from
+ * fragments to start from
*/
/* Start sending a preformatted message. 'private' is NULL for PUT and
- * GET messages; otherwise this is a response to an incoming message
- * and 'private' is the 'private' passed to lnet_parse(). Return
- * non-zero for immediate failure, otherwise complete later with
- * lnet_finalize() */
- int (*lnd_send)(struct lnet_ni *ni, void *private, lnet_msg_t *msg);
+ * GET messages; otherwise this is a response to an incoming message
+ * and 'private' is the 'private' passed to lnet_parse(). Return
+ * non-zero for immediate failure, otherwise complete later with
+ * lnet_finalize() */
+ int (*lnd_send)(struct lnet_ni *ni, void *private, lnet_msg_t *msg);
/* Start receiving 'mlen' bytes of payload data, skipping the following
* 'rlen' - 'mlen' bytes. 'private' is the 'private' passed to
 * lnet_parse(). Return non-zero for immediate failure, otherwise
* complete later with lnet_finalize(). This also gives back a receive
* credit if the LND does flow control. */
- int (*lnd_recv)(struct lnet_ni *ni, void *private, lnet_msg_t *msg,
- int delayed, unsigned int niov,
+ int (*lnd_recv)(struct lnet_ni *ni, void *private, lnet_msg_t *msg,
+ int delayed, unsigned int niov,
struct iovec *iov, lnet_kiov_t *kiov,
unsigned int offset, unsigned int mlen, unsigned int rlen);
/* notification of peer health */
void (*lnd_notify)(struct lnet_ni *ni, lnet_nid_t peer, int alive);
-#ifdef __KERNEL__
+#if defined(__KERNEL__) || defined(HAVE_LIBPTHREAD)
/* accept a new connection */
int (*lnd_accept)(struct lnet_ni *ni, cfs_socket_t *sock);
-#else
+#endif
+
+#ifndef __KERNEL__
/* wait for something to happen */
void (*lnd_wait)(struct lnet_ni *ni, int milliseconds);
/* ensure non-RDMA messages can be received outside liblustre */
int (*lnd_setasync)(struct lnet_ni *ni, lnet_process_id_t id, int nasync);
-
-#ifdef HAVE_LIBPTHREAD
- int (*lnd_accept)(struct lnet_ni *ni, int sock);
-#endif
#endif
} lnd_t;
} lnet_peer_t;
typedef struct {
- struct list_head lr_list; /* chain on net */
+ struct list_head lr_list; /* chain on net */
lnet_peer_t *lr_gateway; /* router node */
} lnet_route_t;
struct list_head *ln_peer_hash; /* NID->peer hash */
int ln_npeers; /* # peers extant */
int ln_peertable_version; /* /proc validity stamp */
-
+
int ln_routing; /* am I a router? */
lnet_rtrbufpool_t ln_rtrpools[LNET_NRBPOOLS]; /* router buffer pools */
-
+
int ln_lh_hash_size; /* size of lib handle hash table */
struct list_head *ln_lh_hash_table; /* all extant lib handles, this interface */
__u64 ln_next_object_cookie; /* cookie generator */
lnet_ping_info_t *ln_ping_info;
#ifdef __KERNEL__
- int ln_rc_state; /* router checker startup/shutdown state */
- struct semaphore ln_rc_signal; /* serialise startup/shutdown */
+ int ln_rc_state; /* router checker startup/shutdown state */
+ struct semaphore ln_rc_signal; /* serialise startup/shutdown */
lnet_handle_eq_t ln_rc_eqh; /* router checker's event queue */
#endif
-
+
#ifdef LNET_USE_LIB_FREELIST
lnet_freelist_t ln_free_mes;
lnet_freelist_t ln_free_msgs;
* call lnet_server_mode() */
int ln_server_mode_flag;
-#endif
+#endif
} lnet_t;
#endif
#define DEBUG_SUBSYSTEM S_LNET
#include <lnet/lib-lnet.h>
+#if defined(__KERNEL__) || defined(HAVE_LIBPTHREAD)
+
+static int accept_port = 988;
+static int accept_backlog = 127;
+static int accept_timeout = 5;
+
+struct {
+ int pta_shutdown;
+ cfs_socket_t *pta_sock;
#ifdef __KERNEL__
+ struct semaphore pta_signal;
+#else
+ struct cfs_completion pta_signal;
+#endif
+} lnet_acceptor_state;
+
+int
+lnet_acceptor_port(void)
+{
+ return accept_port;
+}
+
+static inline int
+lnet_accept_magic(__u32 magic, __u32 constant)
+{
+ return (magic == constant ||
+ magic == __swab32(constant));
+}
+
+#ifdef __KERNEL__
+
+#define cfs_init_completion(c) init_mutex_locked(c)
+#define cfs_wait_for_completion(c) mutex_down(c)
+#define cfs_complete(c) mutex_up(c)
+#define cfs_fini_completion(c) do { } while (0)
+#define cfs_create_thread(func, a) cfs_kernel_thread(func, a, 0)
+
+EXPORT_SYMBOL(lnet_acceptor_port);
+
static char *accept = "secure";
+
CFS_MODULE_PARM(accept, "s", charp, 0444,
"Accept connections (secure|all|none)");
-
-static int accept_port = 988;
CFS_MODULE_PARM(accept_port, "i", int, 0444,
"Acceptor's port (same on all nodes)");
-
-static int accept_backlog = 127;
CFS_MODULE_PARM(accept_backlog, "i", int, 0444,
"Acceptor's listen backlog");
-
-static int accept_timeout = 5;
CFS_MODULE_PARM(accept_timeout, "i", int, 0644,
- "Acceptor's timeout (seconds)");
-
-struct {
- int pta_shutdown;
- cfs_socket_t *pta_sock;
- struct semaphore pta_signal;
-} lnet_acceptor_state;
+ "Acceptor's timeout (seconds)");
int
lnet_acceptor_timeout(void)
}
EXPORT_SYMBOL(lnet_acceptor_timeout);
-int
-lnet_acceptor_port(void)
-{
- return accept_port;
-}
-EXPORT_SYMBOL(lnet_acceptor_port);
-
void
-lnet_connect_console_error (int rc, lnet_nid_t peer_nid,
+lnet_connect_console_error (int rc, lnet_nid_t peer_nid,
__u32 peer_ip, int peer_port)
{
switch (rc) {
CLASSERT (sizeof(cr) <= 16); /* not too big to be on the stack */
- for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT;
- port >= LNET_ACCEPTOR_MIN_RESERVED_PORT;
+ for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT;
+ port >= LNET_ACCEPTOR_MIN_RESERVED_PORT;
--port) {
/* Iterate through reserved ports. */
- rc = libcfs_sock_connect(&sock, &fatal,
- local_ip, port,
+ rc = libcfs_sock_connect(&sock, &fatal,
+ local_ip, port,
peer_ip, peer_port);
if (rc != 0) {
if (fatal)
accept_timeout);
if (rc != 0)
goto failed_sock;
-
+
*sockp = sock;
return 0;
}
rc = -EADDRINUSE;
goto failed;
-
+
failed_sock:
libcfs_sock_release(sock);
failed:
}
EXPORT_SYMBOL(lnet_connect);
-static inline int
-lnet_accept_magic(__u32 magic, __u32 constant)
+#else /* below is multi-threaded user-space code */
+
+static char *accept_type = "secure";
+
+int
+lnet_acceptor_get_tunables()
{
- return (magic == constant ||
- magic == __swab32(constant));
+ int rc;
+ char *env = getenv("LNET_ACCEPT");
+
+ if (env != NULL)
+ accept_type = env;
+
+ rc = lnet_parse_int_tunable(&accept_port, "LNET_ACCEPT_PORT");
+
+ if (rc != 0)
+ return rc;
+
+ rc = lnet_parse_int_tunable(&accept_backlog, "LNET_ACCEPT_BACKLOG");
+
+ if (rc != 0)
+ return rc;
+
+ rc = lnet_parse_int_tunable(&accept_timeout, "LNET_ACCEPT_TIMEOUT");
+
+ if (rc != 0)
+ return rc;
+
+ CDEBUG(D_NET, "accept_type = %s\n", accept_type);
+ CDEBUG(D_NET, "accept_port = %d\n", accept_port);
+ CDEBUG(D_NET, "accept_backlog = %d\n", accept_backlog);
+ CDEBUG(D_NET, "accept_timeout = %d\n", accept_timeout);
+ return 0;
}
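/*
 * Sketch (not part of this change): because the user-space acceptor reads
 * its tunables from the environment above, a launcher could override them
 * before LNet starts up.  The values and the helper name are examples only;
 * setenv() comes from <stdlib.h>.
 */
static void example_override_acceptor_tunables(void)
{
        setenv("LNET_ACCEPT",         "all",  1);
        setenv("LNET_ACCEPT_PORT",    "2988", 1);
        setenv("LNET_ACCEPT_TIMEOUT", "10",   1);
}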
+#endif /* __KERNEL__ */
+
+/* Below is the code common to both kernel and MT user-space */
+
int
lnet_accept(cfs_socket_t *sock, __u32 magic)
{
char *str;
LASSERT (sizeof(cr) <= 16); /* not too big for the stack */
-
+
rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
LASSERT (rc == 0); /* we succeeded before */
str = "'old' openibnal";
else
str = "unrecognised";
-
+
LCONSOLE_ERROR_MSG(0x11f, "Refusing connection from %u.%u.%u.%u"
" magic %08x: %s acceptor protocol\n",
HIPQUAD(peer_ip), magic, str);
flip = (magic != LNET_PROTO_ACCEPTOR_MAGIC);
- rc = libcfs_sock_read(sock, &cr.acr_version,
+ rc = libcfs_sock_read(sock, &cr.acr_version,
sizeof(cr.acr_version),
accept_timeout);
if (rc != 0) {
if (flip)
__swab32s(&cr.acr_version);
-
+
if (cr.acr_version != LNET_PROTO_ACCEPTOR_VERSION) {
/* future version compatibility!
* An acceptor-specific protocol rev will first send a version
libcfs_nid2str(cr.acr_nid), HIPQUAD(peer_ip));
rc = ni->ni_lnd->lnd_accept(ni, sock);
- lnet_ni_decref(ni);
- return rc;
-}
-
-int
-lnet_acceptor(void *arg)
-{
- char name[16];
- cfs_socket_t *newsock;
- int rc;
- __u32 magic;
- __u32 peer_ip;
- int peer_port;
- int secure = (int)((long_ptr_t)arg);
-
- LASSERT (lnet_acceptor_state.pta_sock == NULL);
-
- snprintf(name, sizeof(name), "acceptor_%03d", accept_port);
- cfs_daemonize(name);
- cfs_block_allsigs();
-
- rc = libcfs_sock_listen(&lnet_acceptor_state.pta_sock,
- 0, accept_port, accept_backlog);
- if (rc != 0) {
- if (rc == -EADDRINUSE)
- LCONSOLE_ERROR_MSG(0x122, "Can't start acceptor on port"
- " %d: port already in use\n",
- accept_port);
- else
- LCONSOLE_ERROR_MSG(0x123, "Can't start acceptor on port "
- "%d: unexpected error %d\n",
- accept_port, rc);
-
- lnet_acceptor_state.pta_sock = NULL;
- } else {
- LCONSOLE(0, "Accept %s, port %d\n", accept, accept_port);
- }
-
- /* set init status and unblock parent */
- lnet_acceptor_state.pta_shutdown = rc;
- mutex_up(&lnet_acceptor_state.pta_signal);
-
- if (rc != 0)
- return rc;
-
- while (!lnet_acceptor_state.pta_shutdown) {
-
- rc = libcfs_sock_accept(&newsock, lnet_acceptor_state.pta_sock);
- if (rc != 0) {
- if (rc != -EAGAIN) {
- CWARN("Accept error %d: pausing...\n", rc);
- cfs_pause(cfs_time_seconds(1));
- }
- continue;
- }
-
- rc = libcfs_sock_getaddr(newsock, 1, &peer_ip, &peer_port);
- if (rc != 0) {
- CERROR("Can't determine new connection's address\n");
- goto failed;
- }
-
- if (secure && peer_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) {
- CERROR("Refusing connection from %u.%u.%u.%u: "
- "insecure port %d\n",
- HIPQUAD(peer_ip), peer_port);
- goto failed;
- }
-
- rc = libcfs_sock_read(newsock, &magic, sizeof(magic),
- accept_timeout);
- if (rc != 0) {
- CERROR("Error %d reading connection request from "
- "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip));
- goto failed;
- }
-
- rc = lnet_accept(newsock, magic);
- if (rc != 0)
- goto failed;
-
- continue;
-
- failed:
- libcfs_sock_release(newsock);
- }
-
- libcfs_sock_release(lnet_acceptor_state.pta_sock);
- lnet_acceptor_state.pta_sock = NULL;
-
- LCONSOLE(0, "Acceptor stopping\n");
-
- /* unblock lnet_acceptor_stop() */
- mutex_up(&lnet_acceptor_state.pta_signal);
- return 0;
-}
-
-int
-lnet_acceptor_start(void)
-{
- long pid;
- long secure;
-
- LASSERT (lnet_acceptor_state.pta_sock == NULL);
- init_mutex_locked(&lnet_acceptor_state.pta_signal);
-
- if (!strcmp(accept, "secure")) {
- secure = 1;
- } else if (!strcmp(accept, "all")) {
- secure = 0;
- } else if (!strcmp(accept, "none")) {
- return 0;
- } else {
- LCONSOLE_ERROR_MSG(0x124, "Can't parse 'accept=\"%s\"'\n",
- accept);
- return -EINVAL;
- }
-
- if (lnet_count_acceptor_nis() == 0) /* not required */
- return 0;
-
- pid = cfs_kernel_thread(lnet_acceptor, (void *)(ulong_ptr_t)secure, 0);
- if (pid < 0) {
- CERROR("Can't start acceptor thread: %ld\n", pid);
- return -ESRCH;
- }
-
- mutex_down(&lnet_acceptor_state.pta_signal); /* wait for acceptor to startup */
-
- if (!lnet_acceptor_state.pta_shutdown) {
- /* started OK */
- LASSERT (lnet_acceptor_state.pta_sock != NULL);
- return 0;
- }
-
- LASSERT (lnet_acceptor_state.pta_sock == NULL);
- return -ENETDOWN;
-}
-
-void
-lnet_acceptor_stop(void)
-{
- if (lnet_acceptor_state.pta_sock == NULL) /* not running */
- return;
-
- lnet_acceptor_state.pta_shutdown = 1;
- libcfs_sock_abort_accept(lnet_acceptor_state.pta_sock);
-
- /* block until acceptor signals exit */
- mutex_down(&lnet_acceptor_state.pta_signal);
-}
-
-#else /* __KERNEL__ */
-#ifdef HAVE_LIBPTHREAD
-
-static char *accept_type;
-static int accept_port = 988;
-static int accept_backlog;
-static int accept_timeout;
-
-struct {
- int pta_shutdown;
- int pta_sock;
- struct cfs_completion pta_completion;
-} lnet_acceptor_state;
-
-int
-lnet_acceptor_port(void)
-{
- return accept_port;
-}
-
-int
-lnet_parse_int_tunable(int *value, char *name, int dflt)
-{
- char *env = getenv(name);
- char *end;
-
- if (env == NULL) {
- *value = dflt;
- return 0;
- }
- *value = strtoull(env, &end, 0);
- if (*end == 0)
- return 0;
-
- CERROR("Can't parse tunable %s=%s\n", name, env);
- return -EINVAL;
-}
-
-int
-lnet_parse_string_tunable(char **value, char *name, char *dflt)
-{
- char *env = getenv(name);
-
- if (env == NULL)
- *value = dflt;
- else
- *value = env;
-
- return 0;
-}
-
-int
-lnet_acceptor_get_tunables()
-{
- int rc;
- rc = lnet_parse_string_tunable(&accept_type, "LNET_ACCEPT", "secure");
-
- if (rc != 0)
- return rc;
-
- rc = lnet_parse_int_tunable(&accept_port, "LNET_ACCEPT_PORT", 988);
-
- if (rc != 0)
- return rc;
-
- rc = lnet_parse_int_tunable(&accept_backlog, "LNET_ACCEPT_BACKLOG", 127);
-
- if (rc != 0)
- return rc;
-
- rc = lnet_parse_int_tunable(&accept_timeout, "LNET_ACCEPT_TIMEOUT", 5);
-
- if (rc != 0)
- return rc;
-
- CDEBUG(D_NET, "accept_type = %s\n", accept_type);
- CDEBUG(D_NET, "accept_port = %d\n", accept_port);
- CDEBUG(D_NET, "accept_backlog = %d\n", accept_backlog);
- CDEBUG(D_NET, "accept_timeout = %d\n", accept_timeout);
- return 0;
-}
-
-static inline int
-lnet_accept_magic(__u32 magic, __u32 constant)
-{
- return (magic == constant ||
- magic == __swab32(constant));
-}
-
-/* user-land lnet_accept() isn't used by any LND's directly. So, we don't
- * do it visible outside acceptor.c and we can change its prototype
- * freely */
-static int
-lnet_accept(int sock, __u32 magic, __u32 peer_ip, int peer_port)
-{
- int rc, flip;
- lnet_acceptor_connreq_t cr;
- lnet_ni_t *ni;
-
- if (!lnet_accept_magic(magic, LNET_PROTO_ACCEPTOR_MAGIC)) {
- LCONSOLE_ERROR("Refusing connection from %u.%u.%u.%u magic %08x: "
- "unsupported acceptor protocol\n",
- HIPQUAD(peer_ip), magic);
- return -EPROTO;
- }
-
- flip = (magic != LNET_PROTO_ACCEPTOR_MAGIC);
-
- rc = libcfs_sock_read(sock, &cr.acr_version,
- sizeof(cr.acr_version),
- accept_timeout);
- if (rc != 0) {
- CERROR("Error %d reading connection request version from "
- "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip));
- return -EIO;
- }
-
- if (flip)
- __swab32s(&cr.acr_version);
-
- if (cr.acr_version != LNET_PROTO_ACCEPTOR_VERSION)
- return -EPROTO;
-
- rc = libcfs_sock_read(sock, &cr.acr_nid,
- sizeof(cr) -
- offsetof(lnet_acceptor_connreq_t, acr_nid),
- accept_timeout);
- if (rc != 0) {
- CERROR("Error %d reading connection request from "
- "%u.%u.%u.%u\n", rc, HIPQUAD(peer_ip));
- return -EIO;
- }
-
- if (flip)
- __swab64s(&cr.acr_nid);
-
- ni = lnet_net2ni(LNET_NIDNET(cr.acr_nid));
-
- if (ni == NULL || /* no matching net */
- ni->ni_nid != cr.acr_nid) { /* right NET, wrong NID! */
- if (ni != NULL)
- lnet_ni_decref(ni);
- LCONSOLE_ERROR("Refusing connection from %u.%u.%u.%u for %s: "
- " No matching NI\n",
- HIPQUAD(peer_ip), libcfs_nid2str(cr.acr_nid));
- return -EPERM;
- }
-
- if (ni->ni_lnd->lnd_accept == NULL) {
- lnet_ni_decref(ni);
- LCONSOLE_ERROR("Refusing connection from %u.%u.%u.%u for %s: "
- " NI doesn not accept IP connections\n",
- HIPQUAD(peer_ip), libcfs_nid2str(cr.acr_nid));
- return -EPERM;
- }
-
- CDEBUG(D_NET, "Accept %s from %u.%u.%u.%u\n",
- libcfs_nid2str(cr.acr_nid), HIPQUAD(peer_ip));
-
- rc = ni->ni_lnd->lnd_accept(ni, sock);
-
lnet_ni_decref(ni);
return rc;
}
lnet_acceptor(void *arg)
{
char name[16];
- int secure = (int)((unsigned long)arg);
+ cfs_socket_t *newsock;
int rc;
- int newsock;
+ __u32 magic;
__u32 peer_ip;
int peer_port;
- __u32 magic;
+ int secure = (int)((long_ptr_t)arg);
+
+ LASSERT (lnet_acceptor_state.pta_sock == NULL);
snprintf(name, sizeof(name), "acceptor_%03d", accept_port);
cfs_daemonize(name);
0, accept_port, accept_backlog);
if (rc != 0) {
if (rc == -EADDRINUSE)
- LCONSOLE_ERROR("Can't start acceptor on port %d: "
- "port already in use\n",
- accept_port);
+ LCONSOLE_ERROR_MSG(0x122, "Can't start acceptor on port"
+ " %d: port already in use\n",
+ accept_port);
else
- LCONSOLE_ERROR("Can't start acceptor on port %d: "
- "unexpected error %d\n",
- accept_port, rc);
+ LCONSOLE_ERROR_MSG(0x123, "Can't start acceptor on port "
+ "%d: unexpected error %d\n",
+ accept_port, rc);
+ lnet_acceptor_state.pta_sock = NULL;
} else {
+#ifdef __KERNEL__
+ LCONSOLE(0, "Accept %s, port %d\n", accept, accept_port);
+#else
LCONSOLE(0, "Accept %s, port %d\n", accept_type, accept_port);
+#endif
}
-
+
/* set init status and unblock parent */
lnet_acceptor_state.pta_shutdown = rc;
- cfs_complete(&lnet_acceptor_state.pta_completion);
+ cfs_complete(&lnet_acceptor_state.pta_signal);
if (rc != 0)
return rc;
while (!lnet_acceptor_state.pta_shutdown) {
- rc = libcfs_sock_accept(&newsock, lnet_acceptor_state.pta_sock,
- &peer_ip, &peer_port);
- if (rc != 0)
+ rc = libcfs_sock_accept(&newsock, lnet_acceptor_state.pta_sock);
+ if (rc != 0) {
+ if (rc != -EAGAIN) {
+ CWARN("Accept error %d: pausing...\n", rc);
+ cfs_pause(cfs_time_seconds(1));
+ }
continue;
+ }
/* maybe we were woken up by libcfs_sock_abort_accept() */
if (lnet_acceptor_state.pta_shutdown) {
break;
}
+ rc = libcfs_sock_getaddr(newsock, 1, &peer_ip, &peer_port);
+ if (rc != 0) {
+ CERROR("Can't determine new connection's address\n");
+ goto failed;
+ }
+
if (secure && peer_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) {
CERROR("Refusing connection from %u.%u.%u.%u: "
"insecure port %d\n",
goto failed;
}
- rc = lnet_accept(newsock, magic, peer_ip, peer_port);
+ rc = lnet_accept(newsock, magic);
if (rc != 0)
goto failed;
-
+
continue;
-
- failed:
+
+ failed:
libcfs_sock_release(newsock);
}
-
+
libcfs_sock_release(lnet_acceptor_state.pta_sock);
- LCONSOLE(0,"Acceptor stopping\n");
+ lnet_acceptor_state.pta_sock = NULL;
- /* unblock lnet_acceptor_stop() */
- cfs_complete(&lnet_acceptor_state.pta_completion);
+ LCONSOLE(0, "Acceptor stopping\n");
+ /* unblock lnet_acceptor_stop() */
+ cfs_complete(&lnet_acceptor_state.pta_signal);
return 0;
}
-static int skip_waiting_for_completion;
+static inline int
+accept2secure(const char *acc, long *sec)
+{
+ if (!strcmp(acc, "secure")) {
+ *sec = 1;
+ return 1;
+ } else if (!strcmp(acc, "all")) {
+ *sec = 0;
+ return 1;
+ } else if (!strcmp(acc, "none")) {
+ return 0;
+ } else {
+ LCONSOLE_ERROR_MSG(0x124, "Can't parse 'accept=\"%s\"'\n",
+ acc);
+ return -EINVAL;
+ }
+}
int
lnet_acceptor_start(void)
{
- long secure;
- int rc;
+ int rc;
+ long rc2;
+ long secure;
+
+ LASSERT (lnet_acceptor_state.pta_sock == NULL);
+#ifndef __KERNEL__
+ /* kernel version uses CFS_MODULE_PARM */
rc = lnet_acceptor_get_tunables();
if (rc != 0)
return rc;
/* Do nothing if we're liblustre clients */
if ((the_lnet.ln_pid & LNET_PID_USERFLAG) != 0)
return 0;
-
- cfs_init_completion(&lnet_acceptor_state.pta_completion);
-
- if (!strcmp(accept_type, "secure")) {
- secure = 1;
- } else if (!strcmp(accept_type, "all")) {
- secure = 0;
- } else if (!strcmp(accept_type, "none")) {
- skip_waiting_for_completion = 1;
- return 0;
- } else {
- LCONSOLE_ERROR ("Can't parse 'accept_type=\"%s\"'\n", accept_type);
- cfs_fini_completion(&lnet_acceptor_state.pta_completion);
- return -EINVAL;
+#endif
+ cfs_init_completion(&lnet_acceptor_state.pta_signal);
+
+#ifdef __KERNEL__
+ rc = accept2secure(accept, &secure);
+#else
+ rc = accept2secure(accept_type, &secure);
+#endif
+ if (rc <= 0) {
+ cfs_fini_completion(&lnet_acceptor_state.pta_signal);
+ return rc;
}
- if (lnet_count_acceptor_nis() == 0) { /* not required */
- skip_waiting_for_completion = 1;
+ if (lnet_count_acceptor_nis() == 0) /* not required */
return 0;
- }
- rc = cfs_create_thread(lnet_acceptor, (void *)secure);
- if (rc != 0) {
+ rc2 = cfs_create_thread(lnet_acceptor, (void *)(ulong_ptr_t)secure);
+ if (rc2 < 0) {
CERROR("Can't start acceptor thread: %d\n", rc);
- cfs_fini_completion(&lnet_acceptor_state.pta_completion);
- return rc;
+ cfs_fini_completion(&lnet_acceptor_state.pta_signal);
+ return -ESRCH;
}
/* wait for acceptor to startup */
- cfs_wait_for_completion(&lnet_acceptor_state.pta_completion);
+ cfs_wait_for_completion(&lnet_acceptor_state.pta_signal);
- if (!lnet_acceptor_state.pta_shutdown)
+ if (!lnet_acceptor_state.pta_shutdown) {
+ /* started OK */
+ LASSERT (lnet_acceptor_state.pta_sock != NULL);
return 0;
-
- cfs_fini_completion(&lnet_acceptor_state.pta_completion);
+ }
+
+ LASSERT (lnet_acceptor_state.pta_sock == NULL);
+ cfs_fini_completion(&lnet_acceptor_state.pta_signal);
return -ENETDOWN;
}
void
lnet_acceptor_stop(void)
{
- /* Do nothing if we're liblustre clients */
- if ((the_lnet.ln_pid & LNET_PID_USERFLAG) != 0)
+ if (lnet_acceptor_state.pta_sock == NULL) /* not running */
return;
- if (!skip_waiting_for_completion) {
- lnet_acceptor_state.pta_shutdown = 1;
- libcfs_sock_abort_accept(accept_port);
-
- /* block until acceptor signals exit */
- cfs_wait_for_completion(&lnet_acceptor_state.pta_completion);
- }
-
- cfs_fini_completion(&lnet_acceptor_state.pta_completion);
+ lnet_acceptor_state.pta_shutdown = 1;
+ libcfs_sock_abort_accept(lnet_acceptor_state.pta_sock);
+
+ /* block until acceptor signals exit */
+ cfs_wait_for_completion(&lnet_acceptor_state.pta_signal);
+
+ cfs_fini_completion(&lnet_acceptor_state.pta_signal);
}
-#else
+
+#else /* single-threaded user-space */
int
lnet_acceptor_start(void)
{
- return 0;
+ return 0;
}
void
lnet_acceptor_stop(void)
{
}
-#endif /* !HAVE_LIBPTHREAD */
-#endif /* !__KERNEL__ */
+#endif /* defined(__KERNEL__) || defined(HAVE_LIBPTHREAD) */
if (conn->uc_rx_flag && /* receiving is in progress */
cfs_time_aftereq(current_time, conn->uc_rx_deadline))
return 1;
-
+
return 0;
}
pthread_mutex_lock(&conn->uc_lock);
if (conn->uc_state != UC_DEAD)
usocklnd_conn_kill_locked(conn);
- pthread_mutex_unlock(&conn->uc_lock);
+ pthread_mutex_unlock(&conn->uc_lock);
}
/* Mark the conn as DEAD and schedule its deletion */
LIBCFS_ALLOC (pr, sizeof(*pr));
if (pr == NULL)
return NULL;
-
+
LIBCFS_ALLOC (conn, sizeof(*conn));
if (conn == NULL) {
LIBCFS_FREE (pr, sizeof(*pr));
LIBCFS_FREE (conn->uc_rx_hello,
offsetof(ksock_hello_msg_t,
kshm_ips[LNET_MAX_INTERFACES]));
-
+
LIBCFS_FREE (conn, sizeof(*conn));
}
lnet_process_id_t id;
int decref_flag = 0;
int killall_flag = 0;
-
+
if (peer == NULL) /* nothing to tear */
return;
-
+
pthread_mutex_lock(&peer->up_lock);
- pthread_mutex_lock(&conn->uc_lock);
+ pthread_mutex_lock(&conn->uc_lock);
ni = peer->up_ni;
id = peer->up_peerid;
if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {
/* change state not to finalize twice */
conn->uc_rx_state = UC_RX_KSM_HEADER;
- lnet_finalize(peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
+ lnet_finalize(peer->up_ni, conn->uc_rx_lnetmsg, -EIO);
}
-
+
usocklnd_destroy_txlist(peer->up_ni,
&conn->uc_tx_list);
if(conn->uc_errored && !peer->up_errored)
peer->up_errored = killall_flag = 1;
}
-
+
pthread_mutex_unlock(&conn->uc_lock);
if (killall_flag)
usocklnd_del_conns_locked(peer);
pthread_mutex_unlock(&peer->up_lock);
-
+
if (!decref_flag)
return;
usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id)
{
usock_peer_t *peer;
-
+
pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
peer = usocklnd_find_peer_locked(ni, id);
for (i = 0; i < N_CONN_TYPES; i++)
LASSERT (peer->up_conns[i] == NULL);
- list_del(&peer->up_list);
-
+ list_del(&peer->up_list);
+
if (peer->up_errored &&
(peer->up_peerid.pid & LNET_PID_USERFLAG) == 0)
lnet_notify (peer->up_ni, peer->up_peerid.nid, 0,
cfs_time_seconds(peer->up_last_alive));
-
+
usocklnd_peer_decref(peer);
}
/* Returns 0 on success, <0 else */
int
-usocklnd_create_passive_conn(lnet_ni_t *ni, int fd, usock_conn_t **connp)
+usocklnd_create_passive_conn(lnet_ni_t *ni,
+ cfs_socket_t *sock, usock_conn_t **connp)
{
int rc;
__u32 peer_ip;
- __u16 peer_port;
+ int peer_port;
usock_conn_t *conn;
- rc = libcfs_getpeername(fd, &peer_ip, &peer_port);
+ rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
if (rc)
return rc;
- rc = usocklnd_set_sock_options(fd);
+ LASSERT (peer_port >= 0); /* uc_peer_port is u16 */
+
+ rc = usocklnd_set_sock_options(sock);
if (rc)
return rc;
return -ENOMEM;
usocklnd_rx_hellomagic_state_transition(conn);
-
- conn->uc_fd = fd;
+
+ conn->uc_sock = sock;
conn->uc_peer_ip = peer_ip;
conn->uc_peer_port = peer_port;
conn->uc_state = UC_RECEIVING_HELLO;
usock_conn_t **connp)
{
int rc;
- int fd;
+ cfs_socket_t *sock;
usock_conn_t *conn;
__u32 dst_ip = LNET_NIDADDR(peer->up_peerid.nid);
__u16 dst_port = lnet_acceptor_port();
-
+
conn = usocklnd_conn_allocate();
if (conn == NULL)
return -ENOMEM;
if (conn->uc_tx_hello == NULL) {
usocklnd_conn_free(conn);
return -ENOMEM;
- }
-
+ }
+
if (the_lnet.ln_pid & LNET_PID_USERFLAG)
- rc = usocklnd_connect_cli_mode(&fd, dst_ip, dst_port);
+ rc = usocklnd_connect_cli_mode(&sock, dst_ip, dst_port);
else
- rc = usocklnd_connect_srv_mode(&fd, dst_ip, dst_port);
-
+ rc = usocklnd_connect_srv_mode(&sock, dst_ip, dst_port);
+
if (rc) {
usocklnd_destroy_tx(NULL, conn->uc_tx_hello);
usocklnd_conn_free(conn);
return rc;
}
-
+
conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
- conn->uc_tx_flag = 1;
-
- conn->uc_fd = fd;
- conn->uc_peer_ip = dst_ip;
- conn->uc_peer_port = dst_port;
- conn->uc_type = type;
+ conn->uc_tx_flag = 1;
+
+ conn->uc_sock = sock;
+ conn->uc_peer_ip = dst_ip;
+ conn->uc_peer_port = dst_port;
+ conn->uc_type = type;
conn->uc_activeflag = 1;
- conn->uc_state = UC_CONNECTING;
- conn->uc_pt_idx = usocklnd_ip2pt_idx(dst_ip);
- conn->uc_ni = NULL;
- conn->uc_peerid = peer->up_peerid;
- conn->uc_peer = peer;
+ conn->uc_state = UC_CONNECTING;
+ conn->uc_pt_idx = usocklnd_ip2pt_idx(dst_ip);
+ conn->uc_ni = NULL;
+ conn->uc_peerid = peer->up_peerid;
+ conn->uc_peer = peer;
+
usocklnd_peer_addref(peer);
CFS_INIT_LIST_HEAD (&conn->uc_tx_list);
CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);
/* Returns 0 on success, <0 else */
int
-usocklnd_connect_srv_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
+usocklnd_connect_srv_mode(cfs_socket_t **sockp, __u32 dst_ip, __u16 dst_port)
{
- __u16 port;
- int fd;
- int rc;
+ __u16 port;
+ cfs_socket_t *sock;
+ int rc;
+ int fatal;
- for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT;
- port >= LNET_ACCEPTOR_MIN_RESERVED_PORT;
+ for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT;
+ port >= LNET_ACCEPTOR_MIN_RESERVED_PORT;
port--) {
/* Iterate through reserved ports. */
-
- rc = libcfs_sock_create(&fd);
- if (rc)
- return rc;
-
- rc = libcfs_sock_bind_to_port(fd, port);
+ rc = libcfs_sock_create(&sock, &fatal, 0, port);
if (rc) {
- close(fd);
+ if (fatal)
+ return rc;
continue;
}
- rc = usocklnd_set_sock_options(fd);
+ rc = usocklnd_set_sock_options(sock);
if (rc) {
- close(fd);
+ libcfs_sock_release(sock);
return rc;
}
- rc = libcfs_sock_connect(fd, dst_ip, dst_port);
+ rc = libcfs_sock_connect(sock, dst_ip, dst_port);
if (rc == 0) {
- *fdp = fd;
+ *sockp = sock;
return 0;
}
-
+
if (rc != -EADDRINUSE && rc != -EADDRNOTAVAIL) {
- close(fd);
+ libcfs_sock_release(sock);
return rc;
}
- close(fd);
+ libcfs_sock_release(sock);
}
CERROR("Can't bind to any reserved port\n");
/* Returns 0 on success, <0 else */
int
-usocklnd_connect_cli_mode(int *fdp, __u32 dst_ip, __u16 dst_port)
+usocklnd_connect_cli_mode(cfs_socket_t **sockp, __u32 dst_ip, __u16 dst_port)
{
- int fd;
- int rc;
+ cfs_socket_t *sock;
+ int rc;
+ int fatal;
- rc = libcfs_sock_create(&fd);
+ rc = libcfs_sock_create(&sock, &fatal, 0, 0);
if (rc)
return rc;
-
- rc = usocklnd_set_sock_options(fd);
+
+ rc = usocklnd_set_sock_options(sock);
if (rc) {
- close(fd);
+ libcfs_sock_release(sock);
return rc;
}
- rc = libcfs_sock_connect(fd, dst_ip, dst_port);
+ rc = libcfs_sock_connect(sock, dst_ip, dst_port);
if (rc) {
- close(fd);
+ libcfs_sock_release(sock);
return rc;
}
- *fdp = fd;
+ *sockp = sock;
return 0;
}
int
-usocklnd_set_sock_options(int fd)
+usocklnd_set_sock_options(cfs_socket_t *sock)
{
int rc;
- rc = libcfs_sock_set_nagle(fd, usock_tuns.ut_socknagle);
+ rc = libcfs_sock_set_nagle(sock, usock_tuns.ut_socknagle);
if (rc)
return rc;
if (usock_tuns.ut_sockbufsiz) {
- rc = libcfs_sock_set_bufsiz(fd, usock_tuns.ut_sockbufsiz);
+ rc = libcfs_sock_set_bufsiz(sock, usock_tuns.ut_sockbufsiz);
if (rc)
- return rc;
+ return rc;
}
-
- return libcfs_fcntl_nonblock(fd);
+
+ return libcfs_fcntl_nonblock(sock);
}
usock_tx_t *
usocklnd_create_noop_tx(__u64 cookie)
{
usock_tx_t *tx;
-
+
LIBCFS_ALLOC (tx, sizeof(usock_tx_t));
if (tx == NULL)
return NULL;
socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
tx->tx_msg.ksm_zc_cookies[1] = cookie;
-
+
tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr);
tx->tx_iov = tx->tx_iova;
tx->tx_niov = 1;
-
+
return tx;
}
-
+
usock_tx_t *
usocklnd_create_tx(lnet_msg_t *lntmsg)
{
usock_tx_t *tx;
- unsigned int payload_niov = lntmsg->msg_niov;
- struct iovec *payload_iov = lntmsg->msg_iov;
+ unsigned int payload_niov = lntmsg->msg_niov;
+ struct iovec *payload_iov = lntmsg->msg_iov;
unsigned int payload_offset = lntmsg->msg_offset;
unsigned int payload_nob = lntmsg->msg_len;
int size = offsetof(usock_tx_t,
tx->tx_lnetmsg = lntmsg;
tx->tx_resid = tx->tx_nob = sizeof(ksock_msg_t) + payload_nob;
-
+
socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);
tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = lntmsg->msg_hdr;
tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
tx->tx_iova[0].iov_len = sizeof(ksock_msg_t);
tx->tx_iov = tx->tx_iova;
- tx->tx_niov = 1 +
+ tx->tx_niov = 1 +
lnet_extract_iov(payload_niov, &tx->tx_iov[1],
payload_niov, payload_iov,
payload_offset, payload_nob);
hello->kshm_version = KSOCK_PROTO_V2;
hello->kshm_nips = 0;
hello->kshm_ctype = type;
-
+
hello->kshm_dst_incarnation = 0; /* not used */
hello->kshm_src_incarnation = net->un_incarnation;
hello = (ksock_hello_msg_t *)&tx->tx_iova[1];
usocklnd_init_hello_msg(hello, ni, type, peer_nid);
-
+
tx->tx_iova[0].iov_base = (void *)hello;
tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
offsetof(ksock_hello_msg_t, kshm_ips);
cr->acr_magic = LNET_PROTO_ACCEPTOR_MAGIC;
cr->acr_version = LNET_PROTO_ACCEPTOR_VERSION;
cr->acr_nid = peer_nid;
-
+
hello = (ksock_hello_msg_t *)((char *)cr + sizeof(*cr));
usocklnd_init_hello_msg(hello, ni, type, peer_nid);
-
+
tx->tx_iova[0].iov_base = (void *)cr;
tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
sizeof(lnet_acceptor_connreq_t) +
LASSERT (ni != NULL || lnetmsg == NULL);
LIBCFS_FREE (tx, tx->tx_size);
-
+
if (lnetmsg != NULL) /* NOOP and hello go without lnetmsg */
lnet_finalize(ni, lnetmsg, rc);
}
while (!list_empty(txlist)) {
tx = list_entry(txlist->next, usock_tx_t, tx_list);
list_del(&tx->tx_list);
-
+
usocklnd_destroy_tx(ni, tx);
}
}
while (!list_empty(zcack_list)) {
zcack = list_entry(zcack_list->next, usock_zc_ack_t, zc_list);
list_del(&zcack->zc_list);
-
+
LIBCFS_FREE (zcack, sizeof(*zcack));
}
}
LIBCFS_FREE (peer, sizeof (*peer));
pthread_mutex_lock(&net->un_lock);
- if(--net->un_peercount == 0)
+ if(--net->un_peercount == 0)
pthread_cond_signal(&net->un_cond);
pthread_mutex_unlock(&net->un_lock);
}
}
if (!list_empty(&conn->uc_tx_list)) {
- LASSERT (conn->uc_peer != NULL);
+ LASSERT (conn->uc_peer != NULL);
usocklnd_destroy_txlist(conn->uc_peer->up_ni, &conn->uc_tx_list);
}
usocklnd_destroy_zcack_list(&conn->uc_zcack_list);
-
+
if (conn->uc_peer != NULL)
usocklnd_peer_decref(conn->uc_peer);
return SOCKLND_CONN_ANY;
nob = sizeof(ksock_msg_t) + lntmsg->msg_len;
-
+
if (nob >= usock_tuns.ut_min_bulk)
return SOCKLND_CONN_BULK_OUT;
else
peer->up_errored = 0;
peer->up_last_alive = 0;
cfs_atomic_set (&peer->up_refcount, 1); /* 1 ref for caller */
- pthread_mutex_init(&peer->up_lock, NULL);
+ pthread_mutex_init(&peer->up_lock, NULL);
pthread_mutex_lock(&net->un_lock);
- net->un_peercount++;
+ net->un_peercount++;
pthread_mutex_unlock(&net->un_lock);
*peerp = peer;
rc = usocklnd_create_peer(ni, id, &peer);
if (rc)
return rc;
-
+
pthread_rwlock_wrlock(&usock_data.ud_peers_lock);
peer2 = usocklnd_find_peer_locked(ni, id);
if (peer2 == NULL) {
CERROR("Can't create peer: network shutdown\n");
return -ESHUTDOWN;
}
-
+
/* peer table will take 1 of my refs on peer */
usocklnd_peer_addref(peer);
list_add_tail (&peer->up_list,
peer = peer2;
}
pthread_rwlock_unlock(&usock_data.ud_peers_lock);
-
- find_or_create_peer_done:
+
+ find_or_create_peer_done:
*peerp = peer;
return 0;
}
/* NB: both peer and conn locks are held */
static int
usocklnd_enqueue_zcack(usock_conn_t *conn, usock_zc_ack_t *zc_ack)
-{
+{
if (conn->uc_state == UC_READY &&
list_empty(&conn->uc_tx_list) &&
list_empty(&conn->uc_zcack_list) &&
POLLOUT);
if (rc != 0)
return rc;
- }
+ }
list_add_tail(&zc_ack->zc_list, &conn->uc_zcack_list);
return 0;
static void
usocklnd_enqueue_tx(usock_conn_t *conn, usock_tx_t *tx,
int *send_immediately)
-{
+{
if (conn->uc_state == UC_READY &&
list_empty(&conn->uc_tx_list) &&
list_empty(&conn->uc_zcack_list) &&
conn->uc_sending = 1;
*send_immediately = 1;
return;
- }
+ }
*send_immediately = 0;
list_add_tail(&tx->tx_list, &conn->uc_tx_list);
int idx;
int rc;
lnet_pid_t userflag = peer->up_peerid.pid & LNET_PID_USERFLAG;
-
+
if (userflag)
type = SOCKLND_CONN_ANY;
idx = usocklnd_type2idx(type);
-
+
pthread_mutex_lock(&peer->up_lock);
if (peer->up_conns[idx] != NULL) {
conn = peer->up_conns[idx];
rc = -EHOSTUNREACH;
goto find_or_create_conn_failed;
}
-
+
rc = usocklnd_create_active_conn(peer, type, &conn);
if (rc) {
peer->up_errored = 1;
/* peer takes 1 of conn refcount */
usocklnd_link_conn_to_peer(conn, peer, idx);
-
+
rc = usocklnd_add_pollrequest(conn, POLL_ADD_REQUEST, POLLOUT);
if (rc) {
peer->up_conns[idx] = NULL;
}
usocklnd_wakeup_pollthread(conn->uc_pt_idx);
}
-
+
pthread_mutex_lock(&conn->uc_lock);
LASSERT(conn->uc_peer == peer);
if (tx != NULL) {
usocklnd_enqueue_tx(conn, tx, send_immediately);
} else {
- rc = usocklnd_enqueue_zcack(conn, zc_ack);
+ rc = usocklnd_enqueue_zcack(conn, zc_ack);
if (rc != 0) {
usocklnd_conn_kill_locked(conn);
pthread_mutex_unlock(&conn->uc_lock);
goto find_or_create_conn_failed;
}
}
- pthread_mutex_unlock(&conn->uc_lock);
+ pthread_mutex_unlock(&conn->uc_lock);
usocklnd_conn_addref(conn);
pthread_mutex_unlock(&peer->up_lock);
void
usocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx)
{
- peer->up_conns[idx] = conn;
+ peer->up_conns[idx] = conn;
peer->up_errored = 0; /* this new fresh conn will try
* to revitalize even a stale errored peer */
}
usock_conn_t *skip_conn)
{
int i;
-
+
if (!peer->up_incrn_is_set) {
peer->up_incarnation = incrn;
peer->up_incrn_is_set = 1;
return;
peer->up_incarnation = incrn;
-
+
for (i = 0; i < N_CONN_TYPES; i++) {
usock_conn_t *conn = peer->up_conns[i];
-
+
if (conn == NULL || conn == skip_conn)
continue;
- pthread_mutex_lock(&conn->uc_lock);
+ pthread_mutex_lock(&conn->uc_lock);
LASSERT (conn->uc_peer == peer);
conn->uc_peer = NULL;
peer->up_conns[i] = NULL;
if (conn->uc_state != UC_DEAD)
- usocklnd_conn_kill_locked(conn);
+ usocklnd_conn_kill_locked(conn);
pthread_mutex_unlock(&conn->uc_lock);
usocklnd_conn_decref(conn);
conn->uc_rx_nob_wanted =
conn->uc_rx_nob_left =
sizeof(conn->uc_rx_hello->kshm_version);
-
+
conn->uc_rx_state = UC_RX_HELLO_VERSION;
}
conn->uc_rx_nob_left =
offsetof(ksock_hello_msg_t, kshm_ips) -
offsetof(ksock_hello_msg_t, kshm_src_nid);
-
+
conn->uc_rx_state = UC_RX_HELLO_BODY;
}
conn->uc_rx_nob_left =
conn->uc_rx_hello->kshm_nips *
sizeof(conn->uc_rx_hello->kshm_ips[0]);
-
+
conn->uc_rx_state = UC_RX_HELLO_IPS;
}
{
conn->uc_rx_niov = 1;
conn->uc_rx_iov = conn->uc_rx_iova;
- conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg.ksm_u.lnetmsg;
+ conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg.ksm_u.lnetmsg;
conn->uc_rx_iov[0].iov_len =
conn->uc_rx_nob_wanted =
conn->uc_rx_nob_left =
sizeof(ksock_lnet_msg_t);
-
+
conn->uc_rx_state = UC_RX_LNET_HEADER;
conn->uc_rx_flag = 1;
}
{
conn->uc_rx_niov = 1;
conn->uc_rx_iov = conn->uc_rx_iova;
- conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg;
+ conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg;
conn->uc_rx_iov[0].iov_len =
conn->uc_rx_nob_wanted =
- conn->uc_rx_nob_left =
+ conn->uc_rx_nob_left =
offsetof(ksock_msg_t, ksm_u);
-
+
conn->uc_rx_state = UC_RX_KSM_HEADER;
conn->uc_rx_flag = 0;
}
unsigned int niov = 0;
int skipped = 0;
int nob_to_skip = conn->uc_rx_nob_left;
-
+
LASSERT(nob_to_skip != 0);
conn->uc_rx_iov = conn->uc_rx_iova;
if (conn->uc_state == UC_CONNECTING ||
conn->uc_state == UC_SENDING_HELLO)
usocklnd_conn_kill_locked(conn);
-
- pthread_mutex_unlock(&conn->uc_lock);
+
+ pthread_mutex_unlock(&conn->uc_lock);
}
int
rc = 0;
pthread_mutex_lock(&conn->uc_lock);
state = conn->uc_state;
-
+
/* process special case: LNET calls lnd_recv() asynchronously */
if (state == UC_READY && conn->uc_rx_state == UC_RX_PARSE) {
/* still don't have usocklnd_recv() called */
* 2) usocklnd_shutdown() can change uc_state to UC_DEAD */
switch (state) {
-
+
case UC_RECEIVING_HELLO:
case UC_READY:
if (conn->uc_rx_nob_wanted != 0) {
usocklnd_conn_kill(conn);
break;
}
-
+
if (continue_reading)
goto read_again;
default:
LBUG();
}
-
+
return rc;
}
{
int rc = 0;
__u64 cookie;
-
+
*cont_flag = 0;
/* something new arrived in the RX part - let's process it */
__swab32s(&conn->uc_rx_msg.ksm_csum);
__swab64s(&conn->uc_rx_msg.ksm_zc_cookies[0]);
__swab64s(&conn->uc_rx_msg.ksm_zc_cookies[1]);
- }
+ }
/* we never send packets for which zc-acking is required */
if (conn->uc_rx_msg.ksm_type != KSOCK_MSG_LNET ||
usocklnd_rx_lnethdr_state_transition(conn);
*cont_flag = 1;
break;
-
+
case UC_RX_LNET_HEADER:
if (the_lnet.ln_pid & LNET_PID_USERFLAG) {
/* replace dest_nid,pid (ksocknal sets its own) */
cpu_to_le64(conn->uc_peer->up_ni->ni_nid);
conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_pid =
cpu_to_le32(the_lnet.ln_pid);
-
- } else if (conn->uc_peer->up_peerid.pid & LNET_PID_USERFLAG) {
+
+ } else if (conn->uc_peer->up_peerid.pid & LNET_PID_USERFLAG) {
/* Userspace peer */
lnet_process_id_t *id = &conn->uc_peer->up_peerid;
lnet_hdr_t *lhdr = &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr;
-
+
/* Substitute process ID assigned at connection time */
lhdr->src_pid = cpu_to_le32(id->pid);
lhdr->src_nid = cpu_to_le64(id->nid);
}
-
+
conn->uc_rx_state = UC_RX_PARSE;
usocklnd_conn_addref(conn); /* ++ref while parsing */
-
- rc = lnet_parse(conn->uc_peer->up_ni,
- &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr,
+
+ rc = lnet_parse(conn->uc_peer->up_ni,
+ &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr,
conn->uc_peerid.nid, conn, 0);
-
+
if (rc < 0) {
/* I just received garbage: give up on this conn */
conn->uc_errored = 1;
pthread_mutex_lock(&conn->uc_lock);
LASSERT (conn->uc_rx_state == UC_RX_PARSE ||
conn->uc_rx_state == UC_RX_LNET_PAYLOAD);
-
+
/* check whether usocklnd_recv() got called */
if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD)
*cont_flag = 1;
pthread_mutex_unlock(&conn->uc_lock);
break;
-
+
case UC_RX_PARSE:
LBUG(); /* it's an error to be here, because this special
* case is handled by the caller */
break;
-
+
case UC_RX_PARSE_WAIT:
LBUG(); /* it's an error to be here, because the conn
* shouldn't wait for a POLLIN event in this
* state */
break;
-
+
case UC_RX_LNET_PAYLOAD:
/* payload all received */
cookie = conn->uc_rx_msg.ksm_zc_cookies[0];
if (cookie != 0)
rc = usocklnd_handle_zc_req(conn->uc_peer, cookie);
-
+
if (rc != 0) {
/* change state not to finalize twice */
conn->uc_rx_state = UC_RX_KSM_HEADER;
return -EPROTO;
}
-
+
/* Fall through */
-
+
case UC_RX_SKIPPING:
if (conn->uc_rx_nob_left != 0) {
usocklnd_rx_skipping_state_transition(conn);
if (zc_ack == NULL)
return -ENOMEM;
zc_ack->zc_cookie = cookie;
-
+
/* Let's assume that CONTROL is the best type for zcack,
* but userspace clients don't use typed connections */
if (the_lnet.ln_pid & LNET_PID_USERFLAG)
type = SOCKLND_CONN_ANY;
else
- type = SOCKLND_CONN_CONTROL;
+ type = SOCKLND_CONN_CONTROL;
rc = usocklnd_find_or_create_conn(peer, type, &conn, NULL, zc_ack,
&dummy);
{
int rc = 0;
ksock_hello_msg_t *hello = conn->uc_rx_hello;
-
+
*cont_flag = 0;
-
+
/* something new arrived in the hello - let's process it */
switch (conn->uc_rx_state) {
case UC_RX_HELLO_MAGIC:
usocklnd_rx_helloversion_state_transition(conn);
*cont_flag = 1;
- break;
+ break;
case UC_RX_HELLO_VERSION:
if ((!conn->uc_flip &&
usocklnd_rx_hellobody_state_transition(conn);
*cont_flag = 1;
break;
-
+
case UC_RX_HELLO_BODY:
if (conn->uc_flip) {
ksock_hello_msg_t *hello = conn->uc_rx_hello;
HIPQUAD(conn->uc_peer_ip), conn->uc_peer_port);
return -EPROTO;
}
-
- if (conn->uc_rx_hello->kshm_nips) {
+
+ if (conn->uc_rx_hello->kshm_nips) {
usocklnd_rx_helloIPs_state_transition(conn);
*cont_flag = 1;
break;
rc = usocklnd_activeconn_hellorecv(conn);
else /* passive conn */
rc = usocklnd_passiveconn_hellorecv(conn);
-
- break;
+
+ break;
default:
LBUG(); /* unknown state */
}
peer->up_last_alive = cfs_time_current();
-
+
/* peer says that we lost the race */
if (hello->kshm_ctype == SOCKLND_CONN_NONE) {
/* Start new active conn, relink txs and zc_acks from
* Actually, we're expecting that a passive conn will
* make us zombie soon and take care of our txs and
* zc_acks */
-
+
struct list_head tx_list, zcack_list;
usock_conn_t *conn2;
int idx = usocklnd_type2idx(conn->uc_type);
pthread_mutex_unlock(&peer->up_lock);
return 0;
}
-
+
LASSERT (peer == conn->uc_peer);
LASSERT (peer->up_conns[idx] == conn);
pthread_mutex_unlock(&peer->up_lock);
return rc;
}
-
+
usocklnd_link_conn_to_peer(conn2, peer, idx);
conn2->uc_peer = peer;
-
+
/* unlink txs and zcack from the conn */
list_add(&tx_list, &conn->uc_tx_list);
list_del_init(&conn->uc_tx_list);
list_add(&zcack_list, &conn->uc_zcack_list);
list_del_init(&conn->uc_zcack_list);
-
+
/* link them to conn2 */
list_add(&conn2->uc_tx_list, &tx_list);
list_del_init(&tx_list);
list_add(&conn2->uc_zcack_list, &zcack_list);
list_del_init(&zcack_list);
-
+
/* make conn zombie */
conn->uc_peer = NULL;
usocklnd_peer_decref(peer);
} else {
usocklnd_conn_kill_locked(conn);
}
-
- pthread_mutex_unlock(&conn->uc_lock);
+
+ pthread_mutex_unlock(&conn->uc_lock);
pthread_mutex_unlock(&peer->up_lock);
usocklnd_conn_decref(conn);
-
+
} else { /* hello->kshm_ctype != SOCKLND_CONN_NONE */
if (conn->uc_type != usocklnd_invert_type(hello->kshm_ctype))
return -EPROTO;
usocklnd_cleanup_stale_conns(peer, hello->kshm_src_incarnation,
conn);
pthread_mutex_unlock(&peer->up_lock);
-
+
/* safely transit to UC_READY state */
/* rc == 0 */
pthread_mutex_lock(&conn->uc_lock);
LASSERT (conn->uc_sending == 0);
if ( !list_empty(&conn->uc_tx_list) ||
!list_empty(&conn->uc_zcack_list) ) {
-
+
conn->uc_tx_deadline =
cfs_time_shift(usock_tuns.ut_timeout);
conn->uc_tx_flag = 1;
if (rc == 0)
conn->uc_state = UC_READY;
}
- pthread_mutex_unlock(&conn->uc_lock);
+ pthread_mutex_unlock(&conn->uc_lock);
}
return rc;
/* don't know parent peer yet and not zombie */
LASSERT (conn->uc_peer == NULL &&
ni != NULL);
-
+
/* don't know peer's nid and incarnation yet */
if (peer_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) {
/* do not trust liblustre clients */
peer_ip);
if (hello->kshm_ctype != SOCKLND_CONN_ANY) {
lnet_ni_decref(ni);
- conn->uc_ni = NULL;
+ conn->uc_ni = NULL;
CERROR("Refusing to accept connection of type=%d from "
"userspace process %u.%u.%u.%u:%d\n", hello->kshm_ctype,
HIPQUAD(peer_ip), peer_port);
conn->uc_peerid.nid = hello->kshm_src_nid;
}
conn->uc_type = type = usocklnd_invert_type(hello->kshm_ctype);
-
+
rc = usocklnd_find_or_create_peer(ni, conn->uc_peerid, &peer);
if (rc) {
lnet_ni_decref(ni);
}
peer->up_last_alive = cfs_time_current();
-
+
idx = usocklnd_type2idx(conn->uc_type);
/* safely check whether we're first */
pthread_mutex_lock(&peer->up_lock);
usocklnd_cleanup_stale_conns(peer, hello->kshm_src_incarnation, NULL);
-
+
if (peer->up_conns[idx] == NULL) {
peer->up_last_alive = cfs_time_current();
conn->uc_peer = peer;
usocklnd_link_conn_to_peer(conn, peer, idx);
usocklnd_conn_addref(conn);
} else {
- usocklnd_peer_decref(peer);
+ usocklnd_peer_decref(peer);
/* Resolve race in favour of higher NID */
if (conn->uc_peerid.nid < conn->uc_ni->ni_nid) {
/* if conn->uc_peerid.nid > conn->uc_ni->ni_nid,
* postpone race resolution till READY state
* (hopefully conn[idx] will die because of
- * incoming hello of CONN_NONE type) */
+ * incoming hello of CONN_NONE type) */
}
pthread_mutex_unlock(&peer->up_lock);
conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);
conn->uc_tx_flag = 1;
rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST, POLLOUT);
-
+
passive_hellorecv_done:
pthread_mutex_unlock(&conn->uc_lock);
- return rc;
+ return rc;
}
int
int state;
usock_peer_t *peer;
lnet_ni_t *ni;
-
+
pthread_mutex_lock(&conn->uc_lock); /* like membar */
state = conn->uc_state;
pthread_mutex_unlock(&conn->uc_lock);
rc = usocklnd_activeconn_hellosent(conn);
else /* passive conn */
rc = usocklnd_passiveconn_hellosent(conn);
-
- break;
+
+ break;
case UC_READY:
pthread_mutex_lock(&conn->uc_lock);
LASSERT(usock_tuns.ut_fair_limit > 1);
pthread_mutex_unlock(&conn->uc_lock);
return 0;
- }
+ }
tx = usocklnd_try_piggyback(&conn->uc_tx_list,
&conn->uc_zcack_list);
if (rc)
break;
-
+
rc = usocklnd_send_tx(conn, tx);
if (rc == 0) { /* partial send or connection closed */
pthread_mutex_lock(&conn->uc_lock);
rc = ret;
}
pthread_mutex_unlock(&conn->uc_lock);
-
+
break;
case UC_DEAD:
if (rc < 0)
usocklnd_conn_kill(conn);
-
+
return rc;
}
tx->tx_resid != tx->tx_nob)
return tx;
}
-
+
if (list_empty(zcack_list_p)) {
/* nothing to piggyback */
return tx;
zc_ack = list_entry(zcack_list_p->next,
usock_zc_ack_t, zc_list);
list_del(&zc_ack->zc_list);
- }
-
+ }
+
if (tx != NULL)
/* piggyback the zc-ack cookie */
tx->tx_msg.ksm_zc_cookies[1] = zc_ack->zc_cookie;
else
/* cannot piggyback, need noop */
- tx = usocklnd_create_noop_tx(zc_ack->zc_cookie);
-
+ tx = usocklnd_create_noop_tx(zc_ack->zc_cookie);
+
LIBCFS_FREE (zc_ack, sizeof(*zc_ack));
return tx;
}
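/*
 * Illustrative standalone sketch (simplified types, not part of the
 * patch) of the piggybacking decision usocklnd_try_piggyback() above
 * implements: if a tx is already going out, stash the zero-copy ACK
 * cookie in its header; otherwise a noop message must be built just to
 * carry the cookie.  sketch_tx_t and sketch_create_noop() are
 * hypothetical stand-ins.
 */
#include <stdint.h>
#include <stdlib.h>

typedef struct { uint64_t zc_cookie_slot; } sketch_tx_t;

static sketch_tx_t *sketch_create_noop(uint64_t cookie)
{
        sketch_tx_t *tx = malloc(sizeof(*tx));

        if (tx != NULL)
                tx->zc_cookie_slot = cookie;
        return tx;
}

/* Returns the tx that actually carries 'cookie' (NULL only on ENOMEM) */
static sketch_tx_t *sketch_piggyback(sketch_tx_t *tx, uint64_t cookie)
{
        if (tx != NULL) {
                tx->zc_cookie_slot = cookie;  /* ride along with real payload */
                return tx;
        }
        return sketch_create_noop(cookie);    /* nothing queued: send a noop */
}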
usocklnd_activeconn_hellosent(usock_conn_t *conn)
{
int rc = 0;
-
+
pthread_mutex_lock(&conn->uc_lock);
if (conn->uc_state != UC_DEAD) {
/* conn->uc_peer == NULL, so the conn isn't accessible via
* peer hash list, so nobody can touch the conn but us */
-
+
if (conn->uc_ni == NULL) /* remove zombie conn */
goto passive_hellosent_connkill;
-
+
/* all code below is race resolution, because normally
* passive conn is linked to peer just after receiving hello */
CFS_INIT_LIST_HEAD (&tx_list);
CFS_INIT_LIST_HEAD (&zcack_list);
-
+
/* conn is passive and isn't linked to any peer,
so its tx and zc_ack lists have to be empty */
LASSERT (list_empty(&conn->uc_tx_list) &&
if (rc)
return rc;
- idx = usocklnd_type2idx(conn->uc_type);
+ idx = usocklnd_type2idx(conn->uc_type);
/* try to link conn to peer */
- pthread_mutex_lock(&peer->up_lock);
+ pthread_mutex_lock(&peer->up_lock);
if (peer->up_conns[idx] == NULL) {
usocklnd_link_conn_to_peer(conn, peer, idx);
usocklnd_conn_addref(conn);
list_del_init(&conn2->uc_tx_list);
list_add(&zcack_list, &conn2->uc_zcack_list);
list_del_init(&conn2->uc_zcack_list);
-
+
pthread_mutex_lock(&conn->uc_lock);
list_add_tail(&conn->uc_tx_list, &tx_list);
list_del_init(&tx_list);
list_del_init(&zcack_list);
conn->uc_peer = peer;
pthread_mutex_unlock(&conn->uc_lock);
-
+
conn2->uc_peer = NULL; /* make conn2 zombie */
pthread_mutex_unlock(&conn2->uc_lock);
usocklnd_conn_decref(conn2);
pthread_mutex_unlock(&peer->up_lock);
usocklnd_peer_decref(peer);
- passive_hellosent_done:
+ passive_hellosent_done:
/* safely transit to UC_READY state */
/* rc == 0 */
pthread_mutex_lock(&conn->uc_lock);
{
struct iovec *iov;
int nob;
- int fd = conn->uc_fd;
cfs_time_t t;
-
+
LASSERT (tx->tx_resid != 0);
do {
usock_peer_t *peer = conn->uc_peer;
LASSERT (tx->tx_niov > 0);
-
- nob = libcfs_sock_writev(fd, tx->tx_iov, tx->tx_niov);
+
+ nob = libcfs_sock_writev(conn->uc_sock,
+ tx->tx_iov, tx->tx_niov);
if (nob < 0)
conn->uc_errored = 1;
if (nob <= 0) /* write queue is flow-controlled or error */
return nob;
-
- LASSERT (nob <= tx->tx_resid);
+
+ LASSERT (nob <= tx->tx_resid);
tx->tx_resid -= nob;
t = cfs_time_current();
conn->uc_tx_deadline = cfs_time_add(t, cfs_time_seconds(usock_tuns.ut_timeout));
if(peer != NULL)
peer->up_last_alive = t;
- /* "consume" iov */
+ /* "consume" iov */
iov = tx->tx_iov;
- do {
- LASSERT (tx->tx_niov > 0);
-
- if (nob < iov->iov_len) {
+ do {
+ LASSERT (tx->tx_niov > 0);
+
+ if (nob < iov->iov_len) {
iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
- iov->iov_len -= nob;
- break;
- }
+ iov->iov_len -= nob;
+ break;
+ }
- nob -= iov->iov_len;
- tx->tx_iov = ++iov;
- tx->tx_niov--;
+ nob -= iov->iov_len;
+ tx->tx_iov = ++iov;
+ tx->tx_niov--;
} while (nob != 0);
-
+
} while (tx->tx_resid != 0);
return 1; /* send complete */
do {
usock_peer_t *peer = conn->uc_peer;
-
+
LASSERT (conn->uc_rx_niov > 0);
-
- nob = libcfs_sock_readv(conn->uc_fd, conn->uc_rx_iov, conn->uc_rx_niov);
+
+ nob = libcfs_sock_readv(conn->uc_sock,
+ conn->uc_rx_iov, conn->uc_rx_niov);
if (nob <= 0) {/* read nothing or error */
conn->uc_errored = 1;
return nob;
}
-
- LASSERT (nob <= conn->uc_rx_nob_wanted);
+
+ LASSERT (nob <= conn->uc_rx_nob_wanted);
conn->uc_rx_nob_wanted -= nob;
conn->uc_rx_nob_left -= nob;
t = cfs_time_current();
if(peer != NULL)
peer->up_last_alive = t;
-
- /* "consume" iov */
+
+ /* "consume" iov */
iov = conn->uc_rx_iov;
- do {
- LASSERT (conn->uc_rx_niov > 0);
-
- if (nob < iov->iov_len) {
- iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
- iov->iov_len -= nob;
- break;
- }
-
- nob -= iov->iov_len;
+ do {
+ LASSERT (conn->uc_rx_niov > 0);
+
+ if (nob < iov->iov_len) {
+ iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
+ iov->iov_len -= nob;
+ break;
+ }
+
+ nob -= iov->iov_len;
conn->uc_rx_iov = ++iov;
- conn->uc_rx_niov--;
+ conn->uc_rx_niov--;
} while (nob != 0);
-
+
} while (conn->uc_rx_nob_wanted != 0);
return 1; /* read complete */
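/*
 * Standalone sketch of the "consume iov" step used by both the send and
 * the receive loops above: after a partial writev()/readv() of 'nob'
 * bytes, the iovec array is advanced in place so the next call resumes
 * at the first unsent/unread byte.  This helper is hypothetical, not in
 * the patch; the caller must guarantee nob does not exceed the total
 * length of the remaining iovecs.
 */
#include <sys/uio.h>

/* Returns the new first iovec and updates *niov accordingly. */
static struct iovec *consume_iov(struct iovec *iov, unsigned int *niov,
                                 size_t nob)
{
        while (nob != 0) {
                if (nob < iov->iov_len) {
                        iov->iov_base = (char *)iov->iov_base + nob;
                        iov->iov_len -= nob;
                        break;
                }
                nob -= iov->iov_len;
                iov++;
                (*niov)--;
        }
        return iov;
}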
usocklnd_process_stale_list(usock_pollthread_t *pt_data)
{
while (!list_empty(&pt_data->upt_stale_list)) {
- usock_conn_t *conn;
+ usock_conn_t *conn;
conn = list_entry(pt_data->upt_stale_list.next,
usock_conn_t, uc_stale_list);
-
+
list_del(&conn->uc_stale_list);
-
+
usocklnd_tear_peer_conn(conn);
usocklnd_conn_decref(conn); /* -1 for idx2conn[idx] or pr */
}
sigset_t sigs;
sigfillset (&sigs);
pthread_sigmask (SIG_SETMASK, &sigs, 0);
-
+
LASSERT(pt_data != NULL);
-
+
planned_time = cfs_time_shift(usock_tuns.ut_poll_timeout);
chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);
saved_nfds = pt_data->upt_nfds;
idx_start = 1;
-
+
/* Main loop */
while (usock_data.ud_shutdown == 0) {
rc = 0;
usock_pollrequest_t *pr;
pr = list_entry(pt_data->upt_pollrequests.next,
usock_pollrequest_t, upr_list);
-
+
list_del(&pr->upr_list);
rc = usocklnd_process_pollrequest(pr, pt_data);
if (rc)
- break;
+ break;
}
pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);
/* Delete conns orphaned due to POLL_DEL_REQUESTs */
usocklnd_process_stale_list(pt_data);
-
+
/* Actual polling for events */
rc = poll(pt_data->upt_pollfd,
pt_data->upt_nfds,
extra = 0;
}
- times = cfs_duration_sec(cfs_time_sub(current_time, planned_time)) + 1;
+ times = cfs_duration_sec(cfs_time_sub(current_time, planned_time)) + 1;
idx_finish = MIN(idx_start + chunk*times + extra, pt_data->upt_nfds);
for (idx = idx_start; idx < idx_finish; idx++) {
pthread_mutex_unlock(&conn->uc_lock);
}
- if (idx_finish == pt_data->upt_nfds) {
+ if (idx_finish == pt_data->upt_nfds) {
chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);
saved_nfds = pt_data->upt_nfds;
idx_start = 1;
else {
idx_start = idx_finish;
}
-
+
planned_time = cfs_time_add(current_time,
cfs_time_seconds(usock_tuns.ut_poll_timeout));
}
-
+
/* All conns should be deleted by POLL_DEL_REQUESTs while shutdown */
LASSERT (rc != 0 || pt_data->upt_nfds == 1);
/* Block new poll requests to be enqueued */
pt_data->upt_errno = rc;
-
+
while (!list_empty(&pt_data->upt_pollrequests)) {
usock_pollrequest_t *pr;
pr = list_entry(pt_data->upt_pollrequests.next,
usock_pollrequest_t, upr_list);
-
+
list_del(&pr->upr_list);
if (pr->upr_type == POLL_ADD_REQUEST) {
- close(pr->upr_conn->uc_fd);
+ libcfs_sock_release(pr->upr_conn->uc_sock);
list_add_tail(&pr->upr_conn->uc_stale_list,
&pt_data->upt_stale_list);
} else {
usocklnd_conn_decref(pr->upr_conn);
}
-
+
LIBCFS_FREE (pr, sizeof(*pr));
}
pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);
usocklnd_process_stale_list(pt_data);
-
+
for (idx = 1; idx < pt_data->upt_nfds; idx++) {
usock_conn_t *conn = pt_data->upt_idx2conn[idx];
LASSERT(conn != NULL);
- close(conn->uc_fd);
+ libcfs_sock_release(conn->uc_sock);
usocklnd_tear_peer_conn(conn);
usocklnd_conn_decref(conn);
}
}
-
+
/* unblock usocklnd_shutdown() */
cfs_complete(&pt_data->upt_completion);
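/*
 * Standalone sketch (assumed names, raw socketpair(2) instead of
 * libcfs_socketpair()) of the wakeup mechanism the poll threads rely
 * on: slot 0 of the pollfd array watches the read end of a socketpair,
 * and another thread writes a single byte to the write end to force a
 * blocked poll() to return early.
 */
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

struct sketch_pollthread {
        int           notifier[2];      /* [0] write end, [1] read end */
        struct pollfd pollfd[1];        /* slot 0 reserved for notifier */
};

static int sketch_pollthread_setup(struct sketch_pollthread *pt)
{
        if (socketpair(AF_UNIX, SOCK_STREAM, 0, pt->notifier) != 0)
                return -1;
        pt->pollfd[0].fd      = pt->notifier[1];
        pt->pollfd[0].events  = POLLIN;
        pt->pollfd[0].revents = 0;
        return 0;
}

static void sketch_pollthread_wakeup(struct sketch_pollthread *pt)
{
        char c = 0;

        (void)write(pt->notifier[0], &c, 1);    /* interrupts poll() */
}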
pr->upr_value = value;
usocklnd_conn_addref(conn); /* +1 for poll request */
-
+
pthread_mutex_lock(&pt->upt_pollrequests_lock);
if (pt->upt_errno) { /* very rare case: errored poll thread */
LIBCFS_FREE(pr, sizeof(*pr));
return rc;
}
-
+
list_add_tail(&pr->upr_list, &pt->upt_pollrequests);
pthread_mutex_unlock(&pt->upt_pollrequests_lock);
return 0;
int pt_idx = conn->uc_pt_idx;
usock_pollthread_t *pt = &usock_data.ud_pollthreads[pt_idx];
usock_pollrequest_t *pr = conn->uc_preq;
-
+
/* Use preallocated poll request because there is no good
* workaround for ENOMEM error while killing connection */
if (pr) {
pr->upr_value = 0;
usocklnd_conn_addref(conn); /* +1 for poll request */
-
+
pthread_mutex_lock(&pt->upt_pollrequests_lock);
-
+
if (pt->upt_errno) { /* very rare case: errored poll thread */
pthread_mutex_unlock(&pt->upt_pollrequests_lock);
usocklnd_conn_decref(conn);
int *fd2idx = pt_data->upt_fd2idx;
usock_conn_t **idx2conn = pt_data->upt_idx2conn;
int *skip = pt_data->upt_skip;
-
+
LASSERT(conn != NULL);
- LASSERT(conn->uc_fd >=0);
+ LASSERT(conn->uc_sock != NULL);
LASSERT(type == POLL_ADD_REQUEST ||
- conn->uc_fd < pt_data->upt_nfd2idx);
+ LIBCFS_SOCK2FD(conn->uc_sock) < pt_data->upt_nfd2idx);
if (type != POLL_ADD_REQUEST) {
- idx = fd2idx[conn->uc_fd];
+ idx = fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)];
if (idx > 0 && idx < pt_data->upt_nfds) { /* hot path */
- LASSERT(pollfd[idx].fd == conn->uc_fd);
+ LASSERT(pollfd[idx].fd ==
+ LIBCFS_SOCK2FD(conn->uc_sock));
} else { /* unlikely */
CWARN("Very unlikely event happend: trying to"
" handle poll request of type %d but idx=%d"
}
LIBCFS_FREE (pr, sizeof(*pr));
-
+
switch (type) {
case POLL_ADD_REQUEST:
if (pt_data->upt_nfds >= pt_data->upt_npollfd) {
if (new_pollfd == NULL)
goto process_pollrequest_enomem;
pt_data->upt_pollfd = pollfd = new_pollfd;
-
+
new_idx2conn = LIBCFS_REALLOC(idx2conn, new_npollfd *
sizeof(usock_conn_t *));
if (new_idx2conn == NULL)
if (new_skip == NULL)
goto process_pollrequest_enomem;
pt_data->upt_skip = new_skip;
-
+
pt_data->upt_npollfd = new_npollfd;
}
- if (conn->uc_fd >= pt_data->upt_nfd2idx) {
+ if (LIBCFS_SOCK2FD(conn->uc_sock) >= pt_data->upt_nfd2idx) {
/* resize fd2idx[] */
int *new_fd2idx;
int new_nfd2idx = pt_data->upt_nfd2idx * 2;
- while (new_nfd2idx <= conn->uc_fd)
+ while (new_nfd2idx <= LIBCFS_SOCK2FD(conn->uc_sock))
new_nfd2idx *= 2;
new_fd2idx = LIBCFS_REALLOC(fd2idx, new_nfd2idx *
pt_data->upt_nfd2idx = new_nfd2idx;
}
- LASSERT(fd2idx[conn->uc_fd] == 0);
+ LASSERT(fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)] == 0);
idx = pt_data->upt_nfds++;
idx2conn[idx] = conn;
- fd2idx[conn->uc_fd] = idx;
+ fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)] = idx;
- pollfd[idx].fd = conn->uc_fd;
+ pollfd[idx].fd = LIBCFS_SOCK2FD(conn->uc_sock);
pollfd[idx].events = value;
pollfd[idx].revents = 0;
break;
case POLL_DEL_REQUEST:
- fd2idx[conn->uc_fd] = 0; /* invalidate this entry */
-
+ fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)] = 0; /* invalidate this
+ * entry */
--pt_data->upt_nfds;
if (idx != pt_data->upt_nfds) {
/* shift last entry into released position */
memcpy(&pollfd[idx], &pollfd[pt_data->upt_nfds],
sizeof(struct pollfd));
idx2conn[idx] = idx2conn[pt_data->upt_nfds];
- fd2idx[pollfd[idx].fd] = idx;
+ fd2idx[pollfd[idx].fd] = idx;
}
- close(conn->uc_fd);
+ libcfs_sock_release(conn->uc_sock);
list_add_tail(&conn->uc_stale_list, &pt_data->upt_stale_list);
break;
case POLL_RX_SET_REQUEST:
pollfd[idx].events = value;
break;
default:
- LBUG(); /* unknown type */
+ LBUG(); /* unknown type */
}
/* In the case of POLL_ADD_REQUEST, idx2conn[idx] takes the
* reference that poll request possesses */
if (type != POLL_ADD_REQUEST)
usocklnd_conn_decref(conn);
-
+
return 0;
process_pollrequest_enomem:
for (j = 0; j < usock_tuns.ut_fair_limit; j++) {
int prev = 0;
int i = skip[0];
-
+
if (i >= nfds) /* nothing ready */
break;
-
+
do {
usock_conn_t *conn = idx2conn[i];
int next;
-
+
if (j == 0) /* first pass... */
next = skip[i] = i+1; /* set skip chain */
else /* later passes... */
else
usocklnd_exception_handler(conn);
}
-
+
if ((pollfd[i].revents & POLLIN) != 0 &&
usocklnd_read_handler(conn) <= 0)
pollfd[i].revents &= ~POLLIN;
-
+
if ((pollfd[i].revents & POLLOUT) != 0 &&
usocklnd_write_handler(conn) <= 0)
pollfd[i].revents &= ~POLLOUT;
-
+
if ((pollfd[i].revents & (POLLIN | POLLOUT)) == 0)
skip[prev] = next; /* skip this entry next pass */
else
prev = i;
-
+
i = next;
} while (i < nfds);
}
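/*
 * Standalone sketch of the skip-chain idea used above: the skip[] array
 * links the entries still worth visiting, so later fairness passes walk
 * only the descriptors that still had work on the previous pass.
 * service() is a hypothetical stand-in for the read/write handlers and
 * simply returns nonzero while an entry has work left.
 */
#define NFDS 8

static int service(int idx) { return idx % 2; }  /* arbitrary example */

static void fair_passes(int passes)
{
        int skip[NFDS + 1];
        int j;

        skip[0] = 1;                       /* head of the skip chain */

        for (j = 0; j < passes; j++) {
                int prev = 0;
                int i = skip[0];

                if (i >= NFDS)             /* nothing left to service */
                        break;

                do {
                        int next;

                        if (j == 0)        /* first pass: build chain */
                                next = skip[i] = i + 1;
                        else               /* later passes: follow it */
                                next = skip[i];

                        if (!service(i))   /* idle: drop i from chain */
                                skip[prev] = next;
                        else
                                prev = i;

                        i = next;
                } while (i < NFDS);
        }
}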
const int n = 4;
const int p = usock_tuns.ut_poll_timeout;
int chunk = num;
-
+
/* chunk should be big enough to detect a timeout on any
* connection within (n+1)/n times the timeout interval
* if we check 'chunk' conns every 'p' seconds */
-
+
if (usock_tuns.ut_timeout > n * p)
chunk = (chunk * n * p) / usock_tuns.ut_timeout;
-
+
if (chunk == 0)
chunk = 1;
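/*
 * Standalone sketch of the chunk calculation above, with illustrative
 * numbers (the tunable values here are made up for the example): with
 * num = 1000 descriptors, p = 1s poll timeout, n = 4 and a 50s socket
 * timeout, chunk = 1000 * 4 * 1 / 50 = 80.  A full sweep of all
 * descriptors then takes (1000 / 80) * 1s = 12.5s = timeout / n, so a
 * stuck socket is noticed at most (n+1)/n = 1.25 times the timeout
 * after it stalls.
 */
static int sketch_chunk_size(int num, int timeout, int poll_timeout)
{
        const int n = 4;
        int chunk = num;

        if (timeout > n * poll_timeout)
                chunk = (chunk * n * poll_timeout) / timeout;
        if (chunk == 0)
                chunk = 1;
        return chunk;
}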
int notification = 0;
int rc;
- rc = syscall(SYS_write, pt->upt_notifier_fd, &notification,
- sizeof(notification));
+ rc = syscall(SYS_write, LIBCFS_SOCK2FD(pt->upt_notifier[0]),
+ &notification, sizeof(notification));
if (rc != sizeof(notification))
CERROR("Very unlikely event happend: "
usock_tuns.ut_timeout);
return -1;
}
-
+
if (usock_tuns.ut_poll_timeout <= 0 ||
usock_tuns.ut_poll_timeout > MAX_REASONABLE_TIMEOUT) {
CERROR("USOCK_POLL_TIMEOUT: %d is out of reasonable limits\n",
usock_tuns.ut_fair_limit);
return -1;
}
-
+
if (usock_tuns.ut_npollthreads < 0 ||
usock_tuns.ut_npollthreads > MAX_REASONABLE_NPT) {
CERROR("USOCK_NPOLLTHREADS: %d is out of reasonable limits\n",
usock_tuns.ut_txcredits);
return -1;
}
-
+
if (usock_tuns.ut_peertxcredits <= 0) {
CERROR("USOCK_PEERTXCREDITS: %d should be positive\n",
usock_tuns.ut_peertxcredits);
usock_tuns.ut_socknagle);
return -1;
}
-
+
if (usock_tuns.ut_sockbufsiz < 0) {
CERROR("USOCK_SOCKBUFSIZ: %d should be 0 or positive\n",
usock_tuns.ut_sockbufsiz);
usocklnd_release_poll_states(int n)
{
int i;
-
+
for (i = 0; i < n; i++) {
usock_pollthread_t *pt = &usock_data.ud_pollthreads[i];
-
- close(pt->upt_notifier_fd);
- close(pt->upt_pollfd[0].fd);
+
+ libcfs_sock_release(pt->upt_notifier[0]);
+ libcfs_sock_release(pt->upt_notifier[1]);
pthread_mutex_destroy(&pt->upt_pollrequests_lock);
cfs_fini_completion(&pt->upt_completion);
-
+
LIBCFS_FREE (pt->upt_pollfd,
sizeof(struct pollfd) * pt->upt_npollfd);
LIBCFS_FREE (pt->upt_idx2conn,
sizeof(usock_conn_t *) * pt->upt_npollfd);
LIBCFS_FREE (pt->upt_fd2idx,
- sizeof(int) * pt->upt_nfd2idx);
+ sizeof(int) * pt->upt_nfd2idx);
}
}
usocklnd_update_tunables()
{
int rc;
-
- rc = cfs_parse_int_tunable(&usock_tuns.ut_timeout,
+
+ rc = lnet_parse_int_tunable(&usock_tuns.ut_timeout,
"USOCK_TIMEOUT");
if (rc)
return rc;
- rc = cfs_parse_int_tunable(&usock_tuns.ut_poll_timeout,
+ rc = lnet_parse_int_tunable(&usock_tuns.ut_poll_timeout,
"USOCK_POLL_TIMEOUT");
if (rc)
return rc;
- rc = cfs_parse_int_tunable(&usock_tuns.ut_npollthreads,
+ rc = lnet_parse_int_tunable(&usock_tuns.ut_npollthreads,
"USOCK_NPOLLTHREADS");
if (rc)
return rc;
- rc = cfs_parse_int_tunable(&usock_tuns.ut_fair_limit,
+ rc = lnet_parse_int_tunable(&usock_tuns.ut_fair_limit,
"USOCK_FAIR_LIMIT");
if (rc)
return rc;
- rc = cfs_parse_int_tunable(&usock_tuns.ut_min_bulk,
+ rc = lnet_parse_int_tunable(&usock_tuns.ut_min_bulk,
"USOCK_MIN_BULK");
if (rc)
return rc;
- rc = cfs_parse_int_tunable(&usock_tuns.ut_txcredits,
+ rc = lnet_parse_int_tunable(&usock_tuns.ut_txcredits,
"USOCK_TXCREDITS");
if (rc)
return rc;
- rc = cfs_parse_int_tunable(&usock_tuns.ut_peertxcredits,
+ rc = lnet_parse_int_tunable(&usock_tuns.ut_peertxcredits,
"USOCK_PEERTXCREDITS");
if (rc)
return rc;
- rc = cfs_parse_int_tunable(&usock_tuns.ut_socknagle,
+ rc = lnet_parse_int_tunable(&usock_tuns.ut_socknagle,
"USOCK_SOCKNAGLE");
if (rc)
return rc;
- rc = cfs_parse_int_tunable(&usock_tuns.ut_sockbufsiz,
+ rc = lnet_parse_int_tunable(&usock_tuns.ut_sockbufsiz,
"USOCK_SOCKBUFSIZ");
if (rc)
return rc;
if (usocklnd_validate_tunables())
return -EINVAL;
-
+
if (usock_tuns.ut_npollthreads == 0) {
usock_tuns.ut_npollthreads = cfs_online_cpus();
return -EINVAL;
}
}
-
+
return 0;
}
usock_pollthread_t *pt;
int i;
int rc;
-
+
rc = usocklnd_update_tunables();
if (rc)
return rc;
-
+
usock_data.ud_npollthreads = usock_tuns.ut_npollthreads;
LIBCFS_ALLOC (usock_data.ud_pollthreads,
/* Initialize poll thread state structures */
for (i = 0; i < usock_data.ud_npollthreads; i++) {
- int notifier[2];
pt = &usock_data.ud_pollthreads[i];
rc = -ENOMEM;
-
+
LIBCFS_ALLOC (pt->upt_pollfd,
sizeof(struct pollfd) * UPT_START_SIZ);
if (pt->upt_pollfd == NULL)
goto base_startup_failed_0;
-
+
LIBCFS_ALLOC (pt->upt_idx2conn,
sizeof(usock_conn_t *) * UPT_START_SIZ);
if (pt->upt_idx2conn == NULL)
LIBCFS_ALLOC (pt->upt_fd2idx,
sizeof(int) * UPT_START_SIZ);
if (pt->upt_fd2idx == NULL)
- goto base_startup_failed_2;
-
+ goto base_startup_failed_2;
+
memset(pt->upt_fd2idx, 0,
- sizeof(int) * UPT_START_SIZ);
-
+ sizeof(int) * UPT_START_SIZ);
+
LIBCFS_ALLOC (pt->upt_skip,
sizeof(int) * UPT_START_SIZ);
if (pt->upt_skip == NULL)
pt->upt_npollfd = pt->upt_nfd2idx = UPT_START_SIZ;
- rc = libcfs_socketpair(notifier);
+ rc = libcfs_socketpair(pt->upt_notifier);
if (rc != 0)
goto base_startup_failed_4;
- pt->upt_notifier_fd = notifier[0];
-
- pt->upt_pollfd[0].fd = notifier[1];
+ pt->upt_pollfd[0].fd = LIBCFS_SOCK2FD(pt->upt_notifier[1]);
pt->upt_pollfd[0].events = POLLIN;
pt->upt_pollfd[0].revents = 0;
cfs_init_completion(&pt->upt_completion);
}
- /* Initialize peer hash list */
+ /* Initialize peer hash list */
for (i = 0; i < UD_PEER_HASH_SIZE; i++)
CFS_INIT_LIST_HEAD(&usock_data.ud_peers[i]);
-
+
pthread_rwlock_init(&usock_data.ud_peers_lock, NULL);
/* Spawn poll threads */
return rc;
}
}
-
+
usock_data.ud_state = UD_STATE_INITIALIZED;
-
+
return 0;
base_startup_failed_4:
usocklnd_base_shutdown(int n)
{
int i;
-
+
usock_data.ud_shutdown = 1;
for (i = 0; i < n; i++) {
usock_pollthread_t *pt = &usock_data.ud_pollthreads[i];
LIBCFS_FREE (usock_data.ud_pollthreads,
usock_data.ud_npollthreads *
sizeof(usock_pollthread_t));
-
+
usock_data.ud_state = UD_STATE_INIT_NOTHING;
}
int rc;
int up;
__u32 ipaddr;
-
+
/* Find correct IP-address and update ni_nid with it.
* Two cases are supported:
* 1) no explicit interfaces are defined. NID will be assigned to
* the first non-lo interface that is up;
* 2) exactly one explicit interface is defined. For example,
- * LNET_NETWORKS='tcp(eth0)' */
+ * LNET_NETWORKS='tcp(eth0)' */
if (ni->ni_interfaces[0] == NULL) {
char **names;
int i, n;
-
+
n = libcfs_ipif_enumerate(&names);
if (n <= 0) {
CERROR("Can't enumerate interfaces: %d\n", n);
}
for (i = 0; i < n; i++) {
-
+
if (!strcmp(names[i], "lo")) /* skip the loopback IF */
continue;
-
+
rc = libcfs_ipif_query(names[i], &up, &ipaddr);
if (rc != 0) {
CWARN("Can't get interface %s info: %d\n",
names[i], rc);
continue;
}
-
+
if (!up) {
CWARN("Ignoring interface %s (down)\n",
names[i]);
continue;
}
-
+
break; /* one address is quite enough */
}
-
+
libcfs_ipif_free_enumeration(names, n);
if (i >= n) {
ni->ni_interfaces[0]);
return -1;
}
-
+
CDEBUG(D_NET, "Explicit interface defined: %s. "
"%u.%u.%u.%u used\n",
ni->ni_interfaces[0], HIPQUAD(ipaddr));
-
+
}
-
+
ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ipaddr);
return 0;
ni->ni_maxtxcredits = usock_tuns.ut_txcredits;
ni->ni_peertxcredits = usock_tuns.ut_peertxcredits;
-
+
usock_data.ud_nets_count++;
return 0;
net->un_shutdown = 1;
- usocklnd_del_all_peers(ni);
+ usocklnd_del_all_peers(ni);
/* Wait for all peer state to clean up */
pthread_mutex_lock(&net->un_lock);
- while (net->un_peercount != 0)
+ while (net->un_peercount != 0)
pthread_cond_wait(&net->un_cond, &net->un_lock);
pthread_mutex_unlock(&net->un_lock);
-
+
/* Release usock_net_t structure */
pthread_mutex_destroy(&net->un_lock);
pthread_cond_destroy(&net->un_cond);
for (i = 0; i < UD_PEER_HASH_SIZE; i++) {
list_for_each_safe (ptmp, pnxt, &usock_data.ud_peers[i]) {
peer = list_entry (ptmp, usock_peer_t, up_list);
-
+
if (peer->up_ni != ni)
continue;
}
pthread_rwlock_unlock(&usock_data.ud_peers_lock);
-
+
/* wakeup all threads */
for (i = 0; i < usock_data.ud_npollthreads; i++)
usocklnd_wakeup_pollthread(i);
usocklnd_del_conns_locked(usock_peer_t *peer)
{
int i;
-
+
for (i=0; i < N_CONN_TYPES; i++) {
usock_conn_t *conn = peer->up_conns[i];
if (conn != NULL)
- usocklnd_conn_kill(conn);
- }
+ usocklnd_conn_kill(conn);
+ }
}
struct usock_peer_s;
typedef struct {
- int uc_fd; /* socket */
+ cfs_socket_t *uc_sock; /* socket */
int uc_type; /* conn type */
int uc_activeflag; /* active side of connection? */
int uc_flip; /* is peer other endian? */
__u32 uc_peer_ip; /* IP address of the peer */
__u16 uc_peer_port; /* port of the peer */
struct list_head uc_stale_list; /* orphaned connections */
-
+
/* Receive state */
int uc_rx_state; /* message or hello state */
ksock_hello_msg_t *uc_rx_hello; /* hello buffer */
int uc_tx_flag; /* deadline valid? */
int uc_sending; /* send op is in progress */
usock_tx_t *uc_tx_hello; /* fake tx with hello */
-
+
cfs_atomic_t uc_refcount; /* # of users */
pthread_mutex_t uc_lock; /* serialize */
- int uc_errored; /* a flag for lnet_notify() */
+ int uc_errored; /* a flag for lnet_notify() */
} usock_conn_t;
/* Allowable conn states are: */
* hasn't been set so far */
cfs_atomic_t up_refcount; /* # of users */
pthread_mutex_t up_lock; /* serialize */
- int up_errored; /* a flag for lnet_notify() */
+ int up_errored; /* a flag for lnet_notify() */
cfs_time_t up_last_alive; /* when the peer was last alive */
} usock_peer_t;
typedef struct {
- int upt_notifier_fd; /* notifier fd for writing */
+ cfs_socket_t *upt_notifier[2]; /* notifier sockets: 1st for
+ writing, 2nd for reading */
struct pollfd *upt_pollfd; /* poll fds */
int upt_nfds; /* active poll fds */
int upt_npollfd; /* allocated poll fds */
pthread_cond_t un_cond; /* condvar to wait for notifications */
pthread_mutex_t un_lock; /* a lock to protect un_cond */
} usock_net_t;
-
+
typedef struct {
int ut_poll_timeout; /* the third arg for poll(2) (seconds) */
int ut_timeout; /* "stuck" socket timeout (seconds) */
int usocklnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
unsigned int offset, unsigned int mlen, unsigned int rlen);
-int usocklnd_accept(lnet_ni_t *ni, int sock_fd);
+int usocklnd_accept(lnet_ni_t *ni, cfs_socket_t *sock);
int usocklnd_poll_thread(void *arg);
int usocklnd_add_pollrequest(usock_conn_t *conn, int type, short value);
void usocklnd_conn_free(usock_conn_t *conn);
void usocklnd_tear_peer_conn(usock_conn_t *conn);
void usocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id);
-int usocklnd_create_passive_conn(lnet_ni_t *ni, int fd, usock_conn_t **connp);
+int usocklnd_create_passive_conn(lnet_ni_t *ni,
+ cfs_socket_t *sock, usock_conn_t **connp);
int usocklnd_create_active_conn(usock_peer_t *peer, int type,
usock_conn_t **connp);
-int usocklnd_connect_srv_mode(int *fdp, __u32 dst_ip, __u16 dst_port);
-int usocklnd_connect_cli_mode(int *fdp, __u32 dst_ip, __u16 dst_port);
-int usocklnd_set_sock_options(int fd);
+int usocklnd_connect_srv_mode(cfs_socket_t **sockp,
+ __u32 dst_ip, __u16 dst_port);
+int usocklnd_connect_cli_mode(cfs_socket_t **sockp,
+ __u32 dst_ip, __u16 dst_port);
+int usocklnd_set_sock_options(cfs_socket_t *sock);
usock_tx_t *usocklnd_create_noop_tx(__u64 cookie);
usock_tx_t *usocklnd_create_tx(lnet_msg_t *lntmsg);
void usocklnd_init_hello_msg(ksock_hello_msg_t *hello,
int usocklnd_find_or_create_conn(usock_peer_t *peer, int type,
usock_conn_t **connp,
usock_tx_t *tx, usock_zc_ack_t *zc_ack,
- int *send_immediately_flag);
+ int *send_immediately_flag);
void usocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx);
int usocklnd_invert_type(int type);
void usocklnd_conn_new_state(usock_conn_t *conn, int new_state);
conn->uc_sending = 0;
pthread_mutex_unlock(&conn->uc_lock);
partial_send = 1;
- } else {
+ } else {
usocklnd_destroy_tx(peer->up_ni, tx);
/* NB: lnetmsg was finalized, so we *must* return 0 */
- if (rc < 0) { /* real error */
+ if (rc < 0) { /* real error */
usocklnd_conn_kill(conn);
return 0;
}
-
+
/* rc == 1: tx was sent completely */
rc = 0; /* let's say to caller 'Ok' */
//counter_imm_complete++;
}
pthread_mutex_lock(&conn->uc_lock);
- conn->uc_sending = 0;
+ conn->uc_sending = 0;
/* schedule write handler */
if (partial_send ||
(conn->uc_state == UC_READY &&
(!list_empty(&conn->uc_tx_list) ||
!list_empty(&conn->uc_zcack_list)))) {
- conn->uc_tx_deadline =
+ conn->uc_tx_deadline =
cfs_time_shift(usock_tuns.ut_timeout);
conn->uc_tx_flag = 1;
rc2 = usocklnd_add_pollrequest(conn, POLL_TX_SET_REQUEST, POLLOUT);
int rc;
usock_conn_t *conn;
int send_immediately;
-
+
tx = usocklnd_create_tx(lntmsg);
if (tx == NULL)
return -ENOMEM;
-
+
rc = usocklnd_find_or_create_peer(ni, target, &peer);
if (rc) {
LIBCFS_FREE (tx, tx->tx_size);
return rc;
- }
+ }
/* peer cannot disappear now because its refcount was incremented */
type = usocklnd_get_conn_type(lntmsg);
/* I don't think that we'll win much concurrency moving lock()
* call below lnet_extract_iov() */
pthread_mutex_lock(&conn->uc_lock);
-
+
conn->uc_rx_lnetmsg = msg;
conn->uc_rx_nob_wanted = mlen;
conn->uc_rx_nob_left = rlen;
/* the gap between lnet_parse() and usocklnd_recv() happened? */
if (conn->uc_rx_state == UC_RX_PARSE_WAIT) {
conn->uc_rx_flag = 1; /* waiting for incoming lnet payload */
- conn->uc_rx_deadline =
- cfs_time_shift(usock_tuns.ut_timeout);
+ conn->uc_rx_deadline =
+ cfs_time_shift(usock_tuns.ut_timeout);
rc = usocklnd_add_pollrequest(conn, POLL_RX_SET_REQUEST, POLLIN);
if (rc != 0) {
usocklnd_conn_kill_locked(conn);
}
conn->uc_rx_state = UC_RX_LNET_PAYLOAD;
- recv_out:
+ recv_out:
pthread_mutex_unlock(&conn->uc_lock);
usocklnd_conn_decref(conn);
return rc;
}
int
-usocklnd_accept(lnet_ni_t *ni, int sock_fd)
+usocklnd_accept(lnet_ni_t *ni, cfs_socket_t *sock)
{
int rc;
usock_conn_t *conn;
-
- rc = usocklnd_create_passive_conn(ni, sock_fd, &conn);
+
+ rc = usocklnd_create_passive_conn(ni, sock, &conn);
if (rc)
return rc;
LASSERT(conn != NULL);
-
+
/* disable shutdown event temporarily */
lnet_ni_addref(ni);
/* NB: conn reference counter was incremented while adding
* poll request if rc == 0 */
-
+
usocklnd_conn_decref(conn); /* should destroy conn if rc != 0 */
return rc;
}