From 545fbb58e6fd30ffa5ca337e881a99625d8bf375 Mon Sep 17 00:00:00 2001 From: eeb Date: Tue, 11 Jan 2005 03:37:43 +0000 Subject: [PATCH 1/1] * Made openib not use the subnet manager to discover connection parameters + Openib listens on a TCP/IP port for service queries and responds with the service id, port GID and pkey. + Openib peer table entries have become (NID, IP, port) tuples, where IP and port connect to the peer's service query server. + /proc interfaces for port, listener timeout and backlog (restarts kernel listener on update). + lmc/lconf support for new openib peers, including only running acceptor if net == 'tcp' + Changed connection daemon to conduct both sides of the service query as well as IB connection establishment. Spawning several since tcp/ip reads can block for a timeout. + Added a reaper thread to do connection cleanup and timeout checks (the single connection daemon used to do that too). * Removed some unused lconf default constants * Better openib automagic compilation check. * Consistent usage (flipping + optional checksum) of all openib messages (service query, connection requests and "normal" message flow). * Exhaustive openib "are-you-still-the-same-person" checks * Fixed bug which stopped idle persistent peers from getting removed from the peer table. * Fixed some ranal bugs which became obvious when similar problems were debugged in openibnal --- lnet/autoconf/lustre-lnet.m4 | 11 +- lnet/klnds/openiblnd/openiblnd.c | 1107 ++++++++++++++++++++++++++++------- lnet/klnds/openiblnd/openiblnd.h | 202 ++++--- lnet/klnds/openiblnd/openiblnd_cb.c | 533 ++++++++--------- lnet/klnds/ralnd/ralnd.c | 77 ++- lnet/klnds/ralnd/ralnd.h | 15 +- lnet/klnds/ralnd/ralnd_cb.c | 3 +- lnet/utils/portals.c | 10 +- 8 files changed, 1339 insertions(+), 619 deletions(-) diff --git a/lnet/autoconf/lustre-lnet.m4 b/lnet/autoconf/lustre-lnet.m4 index 2c20b92..c72b2ff 100644 --- a/lnet/autoconf/lustre-lnet.m4 +++ b/lnet/autoconf/lustre-lnet.m4 @@ -151,7 +151,8 @@ AC_ARG_WITH([openib], if test $ENABLEOPENIB -eq 0; then AC_MSG_RESULT([disabled]) elif test ! 
\( -f ${OPENIBPATH}/include/ts_ib_core.h -a \ - -f ${OPENIBPATH}/include/ts_ib_sa_client.h \); then + -f ${OPENIBPATH}/include/ts_ib_cm.h -a\ + -f ${OPENIBPATH}/include/ts_ib_sa_client.h \); then AC_MSG_RESULT([no]) case $ENABLEOPENIB in 1) ;; @@ -170,10 +171,12 @@ else EXTRA_KCFLAGS="$EXTRA_KCFLAGS $OPENIBCPPFLAGS" LB_LINUX_TRY_COMPILE([ #include - #include + #include + #include ],[ - struct ib_device_properties props; - struct ib_common_attrib_services svc; + struct ib_device_properties dev_props; + struct ib_cm_active_param cm_active_params; + tTS_IB_CLIENT_QUERY_TID tid; return 0; ],[ AC_MSG_RESULT([yes]) diff --git a/lnet/klnds/openiblnd/openiblnd.c b/lnet/klnds/openiblnd/openiblnd.c index 8443c87..378fff9 100644 --- a/lnet/klnds/openiblnd/openiblnd.c +++ b/lnet/klnds/openiblnd/openiblnd.c @@ -28,15 +28,28 @@ ptl_handle_ni_t kibnal_ni; kib_data_t kibnal_data; kib_tunables_t kibnal_tunables; -#ifdef CONFIG_SYSCTL #define IBNAL_SYSCTL 202 -#define IBNAL_SYSCTL_TIMEOUT 1 +enum { + IBNAL_SYSCTL_TIMEOUT=1, + IBNAL_SYSCTL_LISTENER_TIMEOUT, + IBNAL_SYSCTL_BACKLOG, + IBNAL_SYSCTL_PORT +}; static ctl_table kibnal_ctl_table[] = { {IBNAL_SYSCTL_TIMEOUT, "timeout", &kibnal_tunables.kib_io_timeout, sizeof (int), 0644, NULL, &proc_dointvec}, + {IBNAL_SYSCTL_LISTENER_TIMEOUT, "listener_timeout", + &kibnal_tunables.kib_listener_timeout, sizeof(int), + 0644, NULL, &proc_dointvec}, + {IBNAL_SYSCTL_BACKLOG, "backlog", + &kibnal_tunables.kib_backlog, sizeof(int), + 0644, NULL, kibnal_listener_procint}, + {IBNAL_SYSCTL_PORT, "port", + &kibnal_tunables.kib_port, sizeof(int), + 0644, NULL, kibnal_listener_procint}, { 0 } }; @@ -44,211 +57,869 @@ static ctl_table kibnal_top_ctl_table[] = { {IBNAL_SYSCTL, "openibnal", NULL, 0, 0555, kibnal_ctl_table}, { 0 } }; -#endif + +__u32 +kibnal_cksum (void *ptr, int nob) +{ + char *c = ptr; + __u32 sum = 0; + + while (nob-- > 0) + sum = ((sum << 1) | (sum >> 31)) + *c++; + + /* ensure I don't return 0 (== no checksum) */ + return (sum == 0) ? 1 : sum; +} void -print_service(struct ib_common_attrib_service *service, char *tag, int rc) +kibnal_init_msg(kib_msg_t *msg, int type, int body_nob) { - char name[32]; + msg->ibm_type = type; + msg->ibm_nob = offsetof(kib_msg_t, ibm_u) + body_nob; +} - if (service == NULL) - { - CWARN("tag : %s\n" - "status : %d (NULL)\n", tag, rc); - return; +void +kibnal_pack_msg(kib_msg_t *msg, int credits, ptl_nid_t dstnid, __u64 dststamp) +{ + /* CAVEAT EMPTOR! all message fields not set here should have been + * initialised previously. */ + msg->ibm_magic = IBNAL_MSG_MAGIC; + msg->ibm_version = IBNAL_MSG_VERSION; + /* ibm_type */ + msg->ibm_credits = credits; + /* ibm_nob */ + msg->ibm_cksum = 0; + msg->ibm_srcnid = kibnal_lib.libnal_ni.ni_pid.nid; + msg->ibm_srcstamp = kibnal_data.kib_incarnation; + msg->ibm_dstnid = dstnid; + msg->ibm_dststamp = dststamp; +#if IBNAL_CKSUM + /* NB ibm_cksum zero while computing cksum */ + msg->ibm_cksum = kibnal_cksum(msg, msg->ibm_nob); +#endif +} + +int +kibnal_unpack_msg(kib_msg_t *msg, int nob) +{ + const int hdr_size = offsetof(kib_msg_t, ibm_u); + __u32 msg_cksum; + int flip; + int msg_nob; + + if (nob < 6) { + CERROR("Short message: %d\n", nob); + return -EPROTO; + } + + if (msg->ibm_magic == IBNAL_MSG_MAGIC) { + flip = 0; + } else if (msg->ibm_magic == __swab32(IBNAL_MSG_MAGIC)) { + flip = 1; + } else { + CERROR("Bad magic: %08x\n", msg->ibm_magic); + return -EPROTO; + } + + if (msg->ibm_version != + (flip ? 
__swab16(IBNAL_MSG_VERSION) : IBNAL_MSG_VERSION)) { + CERROR("Bad version: %d\n", msg->ibm_version); + return -EPROTO; + } + + if (nob < hdr_size) { + CERROR("Short message: %d\n", nob); + return -EPROTO; + } + + msg_nob = flip ? __swab32(msg->ibm_nob) : msg->ibm_nob; + if (msg_nob > nob) { + CERROR("Short message: got %d, wanted %d\n", nob, msg_nob); + return -EPROTO; + } + + /* checksum must be computed with ibm_cksum zero and BEFORE anything + * gets flipped */ + msg_cksum = flip ? __swab32(msg->ibm_cksum) : msg->ibm_cksum; + msg->ibm_cksum = 0; + if (msg_cksum != 0 && + msg_cksum != kibnal_cksum(msg, msg_nob)) { + CERROR("Bad checksum\n"); + return -EPROTO; + } + msg->ibm_cksum = msg_cksum; + + if (flip) { + /* leave magic unflipped as a clue to peer endianness */ + __swab16s(&msg->ibm_version); + LASSERT (sizeof(msg->ibm_type) == 1); + LASSERT (sizeof(msg->ibm_credits) == 1); + msg->ibm_nob = msg_nob; + __swab64s(&msg->ibm_srcnid); + __swab64s(&msg->ibm_srcstamp); + __swab64s(&msg->ibm_dstnid); + __swab64s(&msg->ibm_dststamp); } - strncpy (name, service->service_name, sizeof(name)-1); - name[sizeof(name)-1] = 0; - CWARN("tag : %s\n" - "status : %d\n" - "service id: "LPX64"\n" - "name : %s\n" - "NID : "LPX64"\n", tag, rc, - service->service_id, name, - *kibnal_service_nid_field(service)); + switch (msg->ibm_type) { + default: + CERROR("Unknown message type %x\n", msg->ibm_type); + return -EPROTO; + + case IBNAL_MSG_SVCQRY: + case IBNAL_MSG_NOOP: + break; + + case IBNAL_MSG_SVCRSP: + if (msg_nob < hdr_size + sizeof(msg->ibm_u.svcrsp)) { + CERROR("Short SVCRSP: %d(%d)\n", msg_nob, + (int)(hdr_size + sizeof(msg->ibm_u.svcrsp))); + return -EPROTO; + } + if (flip) { + __swab64s(&msg->ibm_u.svcrsp.ibsr_svc_id); + __swab16s(&msg->ibm_u.svcrsp.ibsr_svc_pkey); + } + break; + + case IBNAL_MSG_CONNREQ: + case IBNAL_MSG_CONNACK: + if (msg_nob < hdr_size + sizeof(msg->ibm_u.connparams)) { + CERROR("Short CONNREQ: %d(%d)\n", msg_nob, + (int)(hdr_size + sizeof(msg->ibm_u.connparams))); + return -EPROTO; + } + if (flip) + __swab32s(&msg->ibm_u.connparams.ibcp_queue_depth); + break; + + case IBNAL_MSG_IMMEDIATE: + if (msg_nob < offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0])) { + CERROR("Short IMMEDIATE: %d(%d)\n", msg_nob, + (int)offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0])); + return -EPROTO; + } + break; + + case IBNAL_MSG_PUT_RDMA: + case IBNAL_MSG_GET_RDMA: + if (msg_nob < hdr_size + sizeof(msg->ibm_u.rdma)) { + CERROR("Short RDMA req: %d(%d)\n", msg_nob, + (int)(hdr_size + sizeof(msg->ibm_u.rdma))); + return -EPROTO; + } + if (flip) { + __swab32s(&msg->ibm_u.rdma.ibrm_desc.rd_key); + __swab32s(&msg->ibm_u.rdma.ibrm_desc.rd_nob); + __swab64s(&msg->ibm_u.rdma.ibrm_desc.rd_addr); + } + break; + + case IBNAL_MSG_PUT_DONE: + case IBNAL_MSG_GET_DONE: + if (msg_nob < hdr_size + sizeof(msg->ibm_u.completion)) { + CERROR("Short RDMA completion: %d(%d)\n", msg_nob, + (int)(hdr_size + sizeof(msg->ibm_u.completion))); + return -EPROTO; + } + if (flip) + __swab32s(&msg->ibm_u.completion.ibcm_status); + break; + } + return 0; } -void -kibnal_service_setunset_done (tTS_IB_CLIENT_QUERY_TID tid, int status, - struct ib_common_attrib_service *service, void *arg) +int +kibnal_sock_write (struct socket *sock, void *buffer, int nob) { - *(int *)arg = status; - up (&kibnal_data.kib_nid_signal); + int rc; + mm_segment_t oldmm = get_fs(); + struct iovec iov = { + .iov_base = buffer, + .iov_len = nob + }; + struct msghdr msg = { + .msg_name = NULL, + .msg_namelen = 0, + .msg_iov = &iov, + .msg_iovlen = 1, + 
.msg_control = NULL, + .msg_controllen = 0, + .msg_flags = MSG_DONTWAIT + }; + + /* We've set up the socket's send buffer to be large enough for + * everything we send, so a single non-blocking send should + * complete without error. */ + + set_fs(KERNEL_DS); + rc = sock_sendmsg(sock, &msg, iov.iov_len); + set_fs(oldmm); + + if (rc == nob) + return 0; + + if (rc >= 0) + return -EAGAIN; + + return rc; } -#if IBNAL_CHECK_ADVERT -void -kibnal_check_advert (void) +int +kibnal_sock_read (struct socket *sock, void *buffer, int nob, int timeout) { - struct ib_common_attrib_service *svc; - __u64 tid; - int rc; - int rc2; + int rc; + mm_segment_t oldmm = get_fs(); + long ticks = timeout * HZ; + unsigned long then; + struct timeval tv; - PORTAL_ALLOC(svc, sizeof(*svc)); - if (svc == NULL) - return; + LASSERT (nob > 0); + LASSERT (ticks > 0); + + for (;;) { + struct iovec iov = { + .iov_base = buffer, + .iov_len = nob + }; + struct msghdr msg = { + .msg_name = NULL, + .msg_namelen = 0, + .msg_iov = &iov, + .msg_iovlen = 1, + .msg_control = NULL, + .msg_controllen = 0, + .msg_flags = 0 + }; + + /* Set receive timeout to remaining time */ + tv = (struct timeval) { + .tv_sec = ticks / HZ, + .tv_usec = ((ticks % HZ) * 1000000) / HZ + }; + set_fs(KERNEL_DS); + rc = sock_setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, + (char *)&tv, sizeof(tv)); + set_fs(oldmm); + if (rc != 0) { + CERROR("Can't set socket recv timeout %d: %d\n", + timeout, rc); + return rc; + } - memset (svc, 0, sizeof (*svc)); - kibnal_set_service_keys(svc, kibnal_data.kib_nid); + set_fs(KERNEL_DS); + then = jiffies; + rc = sock_recvmsg(sock, &msg, iov.iov_len, 0); + ticks -= jiffies - then; + set_fs(oldmm); - rc = ib_service_get (kibnal_data.kib_device, - kibnal_data.kib_port, - svc, - KIBNAL_SERVICE_KEY_MASK, - kibnal_tunables.kib_io_timeout * HZ, - kibnal_service_setunset_done, &rc2, - &tid); + if (rc < 0) + return rc; + if (rc == 0) + return -ECONNABORTED; + + buffer = ((char *)buffer) + rc; + nob -= rc; + + if (nob == 0) + return 0; + + if (ticks <= 0) + return -ETIMEDOUT; + } +} + +int +kibnal_create_sock(struct socket **sockp) +{ + struct socket *sock; + int rc; + int option; + mm_segment_t oldmm = get_fs(); + + rc = sock_create(PF_INET, SOCK_STREAM, 0, &sock); if (rc != 0) { - CERROR ("Immediate error %d checking SM service\n", rc); - } else { - down (&kibnal_data.kib_nid_signal); - rc = rc2; + CERROR("Can't create socket: %d\n", rc); + return rc; + } + + /* Ensure sends will not block */ + option = 2 * sizeof(kib_msg_t); + set_fs(KERNEL_DS); + rc = sock_setsockopt(sock, SOL_SOCKET, SO_SNDBUF, + (char *)&option, sizeof(option)); + set_fs(oldmm); + if (rc != 0) { + CERROR("Can't set send buffer %d: %d\n", option, rc); + goto failed; + } + + option = 1; + set_fs(KERNEL_DS); + rc = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + (char *)&option, sizeof(option)); + set_fs(oldmm); + if (rc != 0) { + CERROR("Can't set SO_REUSEADDR: %d\n", rc); + goto failed; + } + + *sockp = sock; + return 0; + + failed: + sock_release(sock); + return rc; +} + +void +kibnal_pause(int ticks) +{ + set_current_state(TASK_UNINTERRUPTIBLE); + schedule_timeout(ticks); +} + +int +kibnal_connect_sock(kib_peer_t *peer, struct socket **sockp) +{ + struct sockaddr_in locaddr; + struct sockaddr_in srvaddr; + struct socket *sock; + unsigned int port; + int rc; + + for (port = 1023; port >= 512; port--) { + memset(&locaddr, 0, sizeof(locaddr)); + locaddr.sin_family = AF_INET; + locaddr.sin_port = htons(port); + locaddr.sin_addr.s_addr = htonl(INADDR_ANY); + + memset 
(&srvaddr, 0, sizeof (srvaddr)); + srvaddr.sin_family = AF_INET; + srvaddr.sin_port = htons (peer->ibp_port); + srvaddr.sin_addr.s_addr = htonl (peer->ibp_ip); + + rc = kibnal_create_sock(&sock); if (rc != 0) - CERROR ("Error %d checking SM service\n", rc); + return rc; + + rc = sock->ops->bind(sock, + (struct sockaddr *)&locaddr, sizeof(locaddr)); + if (rc != 0) { + sock_release(sock); + + if (rc == -EADDRINUSE) { + CDEBUG(D_NET, "Port %d already in use\n", port); + continue; + } + + CERROR("Can't bind to reserved port %d: %d\n", port, rc); + return rc; + } + + rc = sock->ops->connect(sock, + (struct sockaddr *)&srvaddr, sizeof(srvaddr), + 0); + if (rc == 0) { + *sockp = sock; + return 0; + } + + sock_release(sock); + + if (rc != -EADDRNOTAVAIL) { + CERROR("Can't connect port %d to %u.%u.%u.%u/%d: %d\n", + port, HIPQUAD(peer->ibp_ip), peer->ibp_port, rc); + return rc; + } + + CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n", + port, HIPQUAD(peer->ibp_ip), peer->ibp_port); } - PORTAL_FREE(svc, sizeof(*svc)); + /* all ports busy */ + return -EHOSTUNREACH; } -#endif int -kibnal_advertise (void) +kibnal_make_svcqry (kib_conn_t *conn) { - struct ib_common_attrib_service *svc; - __u64 tid; - int rc; - int rc2; + kib_peer_t *peer = conn->ibc_peer; + kib_msg_t *msg; + struct socket *sock; + int rc; + int nob; - LASSERT (kibnal_data.kib_nid != PTL_NID_ANY); + LASSERT (conn->ibc_connreq != NULL); + msg = &conn->ibc_connreq->cr_msg; - PORTAL_ALLOC(svc, sizeof(*svc)); - if (svc == NULL) - return (-ENOMEM); + kibnal_init_msg(msg, IBNAL_MSG_SVCQRY, 0); + kibnal_pack_msg(msg, 0, peer->ibp_nid, 0); - memset (svc, 0, sizeof (*svc)); + rc = kibnal_connect_sock(peer, &sock); + if (rc != 0) + return rc; - svc->service_id = kibnal_data.kib_service_id; + rc = kibnal_sock_write(sock, msg, msg->ibm_nob); + if (rc != 0) { + CERROR("Error %d sending svcqry to " + LPX64"@%u.%u.%u.%u/%d\n", rc, + peer->ibp_nid, HIPQUAD(peer->ibp_ip), peer->ibp_port); + goto out; + } - rc = ib_cached_gid_get(kibnal_data.kib_device, - kibnal_data.kib_port, - 0, - svc->service_gid); + nob = offsetof(kib_msg_t, ibm_u) + sizeof(msg->ibm_u.svcrsp); + rc = kibnal_sock_read(sock, msg, nob, kibnal_tunables.kib_io_timeout); if (rc != 0) { - CERROR ("Can't get port %d GID: %d\n", - kibnal_data.kib_port, rc); + CERROR("Error %d receiving svcrsp from " + LPX64"@%u.%u.%u.%u/%d\n", rc, + peer->ibp_nid, HIPQUAD(peer->ibp_ip), peer->ibp_port); goto out; } - - rc = ib_cached_pkey_get(kibnal_data.kib_device, - kibnal_data.kib_port, - 0, - &svc->service_pkey); + + rc = kibnal_unpack_msg(msg, nob); if (rc != 0) { - CERROR ("Can't get port %d PKEY: %d\n", - kibnal_data.kib_port, rc); + CERROR("Error %d unpacking svcrsp from " + LPX64"@%u.%u.%u.%u/%d\n", rc, + peer->ibp_nid, HIPQUAD(peer->ibp_ip), peer->ibp_port); + goto out; + } + + if (msg->ibm_type != IBNAL_MSG_SVCRSP) { + CERROR("Unexpected response type %d from " + LPX64"@%u.%u.%u.%u/%d\n", msg->ibm_type, + peer->ibp_nid, HIPQUAD(peer->ibp_ip), peer->ibp_port); + rc = -EPROTO; goto out; } - svc->service_lease = 0xffffffff; + if (msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid || + msg->ibm_dststamp != kibnal_data.kib_incarnation) { + CERROR("Unexpected dst NID/stamp "LPX64"/"LPX64" from " + LPX64"@%u.%u.%u.%u/%d\n", + msg->ibm_dstnid, msg->ibm_dststamp, + peer->ibp_nid, HIPQUAD(peer->ibp_ip), peer->ibp_port); + rc = -EPROTO; + goto out; + } - kibnal_set_service_keys(svc, kibnal_data.kib_nid); + if (msg->ibm_srcnid != peer->ibp_nid) { + CERROR("Unexpected src NID "LPX64" from " + 
LPX64"@%u.%u.%u.%u/%d\n", msg->ibm_srcnid, + peer->ibp_nid, HIPQUAD(peer->ibp_ip), peer->ibp_port); + rc = -EPROTO; + goto out; + } - CDEBUG(D_NET, "Advertising service id "LPX64" %s:"LPX64"\n", - svc->service_id, - svc->service_name, *kibnal_service_nid_field(svc)); + conn->ibc_incarnation = msg->ibm_srcstamp; + conn->ibc_connreq->cr_svcrsp = msg->ibm_u.svcrsp; + out: + sock_release(sock); + return rc; +} - rc = ib_service_set (kibnal_data.kib_device, - kibnal_data.kib_port, - svc, - IB_SA_SERVICE_COMP_MASK_ID | - IB_SA_SERVICE_COMP_MASK_GID | - IB_SA_SERVICE_COMP_MASK_PKEY | - IB_SA_SERVICE_COMP_MASK_LEASE | - KIBNAL_SERVICE_KEY_MASK, - kibnal_tunables.kib_io_timeout * HZ, - kibnal_service_setunset_done, &rc2, &tid); +void +kibnal_handle_svcqry (struct socket *sock) +{ + struct sockaddr_in addr; + __u32 peer_ip; + unsigned int peer_port; + kib_msg_t *msg; + __u64 srcnid; + __u64 srcstamp; + int len; + int rc; + + len = sizeof(addr); + rc = sock->ops->getname(sock, (struct sockaddr *)&addr, &len, 2); + if (rc != 0) { + CERROR("Can't get peer's IP: %d\n", rc); + return; + } + + peer_ip = ntohl(addr.sin_addr.s_addr); + peer_port = ntohs(addr.sin_port); + + if (peer_port >= 1024) { + CERROR("Refusing unprivileged connection from %u.%u.%u.%u/%d\n", + HIPQUAD(peer_ip), peer_port); + return; + } + PORTAL_ALLOC(msg, sizeof(*msg)); + if (msg == NULL) { + CERROR("Can't allocate msgs for %u.%u.%u.%u/%d\n", + HIPQUAD(peer_ip), peer_port); + goto out; + } + + rc = kibnal_sock_read(sock, msg, offsetof(kib_msg_t, ibm_u), + kibnal_tunables.kib_listener_timeout); if (rc != 0) { - CERROR ("Immediate error %d advertising NID "LPX64"\n", - rc, kibnal_data.kib_nid); + CERROR("Error %d receiving svcqry from %u.%u.%u.%u/%d\n", + rc, HIPQUAD(peer_ip), peer_port); + goto out; + } + + rc = kibnal_unpack_msg(msg, offsetof(kib_msg_t, ibm_u)); + if (rc != 0) { + CERROR("Error %d unpacking svcqry from %u.%u.%u.%u/%d\n", + rc, HIPQUAD(peer_ip), peer_port); + goto out; + } + + if (msg->ibm_type != IBNAL_MSG_SVCQRY) { + CERROR("Unexpected message %d from %u.%u.%u.%u/%d\n", + msg->ibm_type, HIPQUAD(peer_ip), peer_port); + goto out; + } + + if (msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid) { + CERROR("Unexpected dstnid "LPX64"(expected "LPX64" " + "from %u.%u.%u.%u/%d\n", msg->ibm_dstnid, + kibnal_lib.libnal_ni.ni_pid.nid, + HIPQUAD(peer_ip), peer_port); goto out; } - down (&kibnal_data.kib_nid_signal); + srcnid = msg->ibm_srcnid; + srcstamp = msg->ibm_srcstamp; + + kibnal_init_msg(msg, IBNAL_MSG_SVCRSP, sizeof(msg->ibm_u.svcrsp)); - rc = rc2; - if (rc != 0) - CERROR ("Error %d advertising NID "LPX64"\n", - rc, kibnal_data.kib_nid); + msg->ibm_u.svcrsp.ibsr_svc_id = kibnal_data.kib_svc_id; + memcpy(msg->ibm_u.svcrsp.ibsr_svc_gid, kibnal_data.kib_svc_gid, + sizeof(kibnal_data.kib_svc_gid)); + msg->ibm_u.svcrsp.ibsr_svc_pkey = kibnal_data.kib_svc_pkey; + + kibnal_pack_msg(msg, 0, srcnid, srcstamp); + + rc = kibnal_sock_write (sock, msg, msg->ibm_nob); + if (rc != 0) { + CERROR("Error %d replying to svcqry from %u.%u.%u.%u/%d\n", + rc, HIPQUAD(peer_ip), peer_port); + goto out; + } + out: - PORTAL_FREE(svc, sizeof(*svc)); - return (rc); + PORTAL_FREE(msg, sizeof(*msg)); } void -kibnal_unadvertise (int expect_success) +kibnal_free_acceptsock (kib_acceptsock_t *as) { - struct ib_common_attrib_service *svc; - __u64 tid; - int rc; - int rc2; + sock_release(as->ibas_sock); + PORTAL_FREE(as, sizeof(*as)); +} + +int +kibnal_ip_listener(void *arg) +{ + struct sockaddr_in addr; + wait_queue_t wait; + struct socket *sock; + 
kib_acceptsock_t *as; + int port; + char name[16]; + int rc; + unsigned long flags; + + /* Parent thread holds kib_nid_mutex, and is, or is about to + * block on kib_listener_signal */ + + port = kibnal_tunables.kib_port; + snprintf(name, sizeof(name), "kibnal_lstn%03d", port); + kportal_daemonize(name); + kportal_blockallsigs(); + + init_waitqueue_entry(&wait, current); + + rc = kibnal_create_sock(&sock); + if (rc != 0) + goto out_0; + + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = INADDR_ANY; + + rc = sock->ops->bind(sock, (struct sockaddr *)&addr, sizeof(addr)); + if (rc != 0) { + CERROR("Can't bind to port %d\n", port); + goto out_1; + } + + rc = sock->ops->listen(sock, kibnal_tunables.kib_backlog); + if (rc != 0) { + CERROR("Can't set listen backlog %d: %d\n", + kibnal_tunables.kib_backlog, rc); + goto out_1; + } + + LASSERT (kibnal_data.kib_listener_sock == NULL); + kibnal_data.kib_listener_sock = sock; + + /* unblock waiting parent */ + LASSERT (kibnal_data.kib_listener_shutdown == 0); + up(&kibnal_data.kib_listener_signal); + + /* Wake me any time something happens on my socket */ + add_wait_queue(sock->sk->sk_sleep, &wait); + as = NULL; + + while (kibnal_data.kib_listener_shutdown == 0) { + + if (as == NULL) { + PORTAL_ALLOC(as, sizeof(*as)); + if (as == NULL) { + CERROR("Out of Memory: pausing...\n"); + kibnal_pause(HZ); + continue; + } + as->ibas_sock = NULL; + } + + if (as->ibas_sock == NULL) { + as->ibas_sock = sock_alloc(); + if (as->ibas_sock == NULL) { + CERROR("Can't allocate socket: pausing...\n"); + kibnal_pause(HZ); + continue; + } + /* XXX this should add a ref to sock->ops->owner, if + * TCP could be a module */ + as->ibas_sock->type = sock->type; + as->ibas_sock->ops = sock->ops; + } + + set_current_state(TASK_INTERRUPTIBLE); + + rc = sock->ops->accept(sock, as->ibas_sock, O_NONBLOCK); + + /* Sleep for socket activity? */ + if (rc == -EAGAIN && + kibnal_data.kib_listener_shutdown == 0) + schedule(); + + set_current_state(TASK_RUNNING); + + if (rc == 0) { + spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags); + + list_add_tail(&as->ibas_list, + &kibnal_data.kib_connd_acceptq); + + spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags); + wake_up(&kibnal_data.kib_connd_waitq); + + as = NULL; + continue; + } + + if (rc != -EAGAIN) { + CERROR("Accept failed: %d, pausing...\n", rc); + kibnal_pause(HZ); + } + } + + if (as != NULL) { + if (as->ibas_sock != NULL) + sock_release(as->ibas_sock); + PORTAL_FREE(as, sizeof(*as)); + } + + rc = 0; + remove_wait_queue(sock->sk->sk_sleep, &wait); + out_1: + sock_release(sock); + kibnal_data.kib_listener_sock = NULL; + out_0: + /* set completion status and unblock thread waiting for me + * (parent on startup failure, executioner on normal shutdown) */ + kibnal_data.kib_listener_shutdown = rc; + up(&kibnal_data.kib_listener_signal); + + return 0; +} + +int +kibnal_start_ip_listener (void) +{ + long pid; + int rc; + + CDEBUG(D_WARNING, "Starting listener\n"); - LASSERT (kibnal_data.kib_nid != PTL_NID_ANY); + /* Called holding kib_nid_mutex: listener stopped */ + LASSERT (kibnal_data.kib_listener_sock == NULL); - PORTAL_ALLOC(svc, sizeof(*svc)); - if (svc == NULL) + kibnal_data.kib_listener_shutdown = 0; + pid = kernel_thread(kibnal_ip_listener, NULL, 0); + if (pid < 0) { + CERROR("Can't spawn listener: %ld\n", pid); + return (int)pid; + } + + /* Block until listener has started up. 
*/ + down(&kibnal_data.kib_listener_signal); + + rc = kibnal_data.kib_listener_shutdown; + LASSERT ((rc != 0) == (kibnal_data.kib_listener_sock == NULL)); + + CDEBUG(D_WARNING, "Listener %ld started OK\n", pid); + return rc; +} + +void +kibnal_stop_ip_listener(int clear_acceptq) +{ + struct list_head zombie_accepts; + kib_acceptsock_t *as; + unsigned long flags; + + CDEBUG(D_WARNING, "Stopping listener\n"); + + /* Called holding kib_nid_mutex: listener running */ + LASSERT (kibnal_data.kib_listener_sock != NULL); + + kibnal_data.kib_listener_shutdown = 1; + wake_up_all(kibnal_data.kib_listener_sock->sk->sk_sleep); + + /* Block until listener has torn down. */ + down(&kibnal_data.kib_listener_signal); + + LASSERT (kibnal_data.kib_listener_sock == NULL); + CDEBUG(D_WARNING, "Listener stopped\n"); + + if (!clear_acceptq) return; - memset (svc, 0, sizeof(*svc)); + /* Close any unhandled accepts */ + spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags); + + list_add(&zombie_accepts, &kibnal_data.kib_connd_acceptq); + list_del_init(&kibnal_data.kib_connd_acceptq); + + spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags); + + while (!list_empty(&zombie_accepts)) { + as = list_entry(zombie_accepts.next, + kib_acceptsock_t, ibas_list); + list_del(&as->ibas_list); + kibnal_free_acceptsock(as); + } +} + +int +kibnal_listener_procint(ctl_table *table, int write, struct file *filp, + void *buffer, size_t *lenp) +{ + int *tunable = (int *)table->data; + int old_val; + int rc; + + /* No race with nal initialisation since the nal is setup all the time + * it's loaded. When that changes, change this! */ + LASSERT (kibnal_data.kib_init == IBNAL_INIT_ALL); + + down(&kibnal_data.kib_nid_mutex); + + LASSERT (tunable == &kibnal_tunables.kib_port || + tunable == &kibnal_tunables.kib_backlog); + old_val = *tunable; - kibnal_set_service_keys(svc, kibnal_data.kib_nid); + rc = proc_dointvec(table, write, filp, buffer, lenp); - CDEBUG(D_NET, "Unadvertising service %s:"LPX64"\n", - svc->service_name, *kibnal_service_nid_field(svc)); + if (write && + (*tunable != old_val || + kibnal_data.kib_listener_sock == NULL)) { - rc = ib_service_delete (kibnal_data.kib_device, - kibnal_data.kib_port, - svc, - KIBNAL_SERVICE_KEY_MASK, - kibnal_tunables.kib_io_timeout * HZ, - kibnal_service_setunset_done, &rc2, &tid); + if (kibnal_data.kib_listener_sock != NULL) + kibnal_stop_ip_listener(0); + + rc = kibnal_start_ip_listener(); + + if (rc != 0) { + CERROR("Unable to restart listener with new tunable:" + " reverting to old value\n"); + *tunable = old_val; + kibnal_start_ip_listener(); + } + } + + up(&kibnal_data.kib_nid_mutex); + + LASSERT (kibnal_data.kib_init == IBNAL_INIT_ALL); + return rc; +} + +int +kibnal_start_ib_listener (void) +{ + int rc; + + LASSERT (kibnal_data.kib_listen_handle == NULL); + + kibnal_data.kib_svc_id = ib_cm_service_assign(); + CDEBUG(D_NET, "svc id "LPX64"\n", kibnal_data.kib_svc_id); + + rc = ib_cached_gid_get(kibnal_data.kib_device, + kibnal_data.kib_port, 0, + kibnal_data.kib_svc_gid); if (rc != 0) { - CERROR ("Immediate error %d unadvertising NID "LPX64"\n", - rc, kibnal_data.kib_nid); - goto out; + CERROR("Can't get port %d GID: %d\n", + kibnal_data.kib_port, rc); + return rc; + } + + rc = ib_cached_pkey_get(kibnal_data.kib_device, + kibnal_data.kib_port, 0, + &kibnal_data.kib_svc_pkey); + if (rc != 0) { + CERROR ("Can't get port %d PKEY: %d\n", + kibnal_data.kib_port, rc); + return rc; } - down (&kibnal_data.kib_nid_signal); + rc = ib_cm_listen(kibnal_data.kib_svc_id, + 
TS_IB_CM_SERVICE_EXACT_MASK, + kibnal_passive_conn_callback, NULL, + &kibnal_data.kib_listen_handle); + if (rc != 0) { + kibnal_data.kib_listen_handle = NULL; + CERROR ("Can't create IB listener: %d\n", rc); + return rc; + } - if ((rc2 == 0) == !!expect_success) - goto out; /* success: rc == 0 */ - - if (expect_success) - CERROR("Error %d unadvertising NID "LPX64"\n", - rc, kibnal_data.kib_nid); - else - CWARN("Removed conflicting NID "LPX64"\n", - kibnal_data.kib_nid); - out: - PORTAL_FREE(svc, sizeof(*svc)); + LASSERT (kibnal_data.kib_listen_handle != NULL); + return 0; +} + +void +kibnal_stop_ib_listener (void) +{ + int rc; + + LASSERT (kibnal_data.kib_listen_handle != NULL); + + rc = ib_cm_listen_stop (kibnal_data.kib_listen_handle); + if (rc != 0) + CERROR("Error stopping IB listener: %d\n", rc); + + kibnal_data.kib_listen_handle = NULL; } int -kibnal_set_mynid(ptl_nid_t nid) +kibnal_set_mynid (ptl_nid_t nid) { - struct timeval tv; - lib_ni_t *ni = &kibnal_lib.libnal_ni; - int rc; + lib_ni_t *ni = &kibnal_lib.libnal_ni; + int rc; CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n", nid, ni->ni_pid.nid); - do_gettimeofday(&tv); - down (&kibnal_data.kib_nid_mutex); if (nid == kibnal_data.kib_nid) { @@ -259,60 +930,48 @@ kibnal_set_mynid(ptl_nid_t nid) CDEBUG(D_NET, "NID "LPX64"("LPX64")\n", kibnal_data.kib_nid, nid); - - if (kibnal_data.kib_nid != PTL_NID_ANY) { - - kibnal_unadvertise (1); - rc = ib_cm_listen_stop (kibnal_data.kib_listen_handle); - if (rc != 0) - CERROR ("Error %d stopping listener\n", rc); - } - - kibnal_data.kib_nid = ni->ni_pid.nid = nid; - kibnal_data.kib_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec; + if (kibnal_data.kib_listener_sock != NULL) + kibnal_stop_ip_listener(1); + if (kibnal_data.kib_listen_handle != NULL) + kibnal_stop_ib_listener(); + + ni->ni_pid.nid = nid; + kibnal_data.kib_incarnation++; + mb(); /* Delete all existing peers and their connections after new - * NID/incarnation set to ensure no old connections in our brave - * new world. */ + * NID/incarnation set to ensure no old connections in our brave new + * world. 
*/ kibnal_del_peer (PTL_NID_ANY, 0); - if (kibnal_data.kib_nid == PTL_NID_ANY) { - /* No new NID to install */ - up (&kibnal_data.kib_nid_mutex); - return (0); - } - - /* remove any previous advert (crashed node etc) */ - kibnal_unadvertise(0); - - /* Assign new service number */ - kibnal_data.kib_service_id = ib_cm_service_assign(); - CDEBUG(D_NET, "service_id "LPX64"\n", kibnal_data.kib_service_id); + if (ni->ni_pid.nid != PTL_NID_ANY) { + /* got a new NID to install */ + rc = kibnal_start_ib_listener(); + if (rc != 0) { + CERROR("Can't start IB listener: %d\n", rc); + goto failed_0; + } - rc = ib_cm_listen(kibnal_data.kib_service_id, - TS_IB_CM_SERVICE_EXACT_MASK, - kibnal_passive_conn_callback, NULL, - &kibnal_data.kib_listen_handle); - if (rc == 0) { - rc = kibnal_advertise(); - if (rc == 0) { -#if IBNAL_CHECK_ADVERT - kibnal_check_advert(); -#endif - up (&kibnal_data.kib_nid_mutex); - return (0); + rc = kibnal_start_ip_listener(); + if (rc != 0) { + CERROR("Can't start IP listener: %d\n", rc); + goto failed_1; } - - ib_cm_listen_stop(kibnal_data.kib_listen_handle); - /* remove any peers that sprung up while I failed to - * advertise myself */ - kibnal_del_peer (PTL_NID_ANY, 0); } - kibnal_data.kib_nid = PTL_NID_ANY; - up (&kibnal_data.kib_nid_mutex); - return (rc); + up(&kibnal_data.kib_nid_mutex); + return 0; + + failed_1: + kibnal_stop_ib_listener(); + failed_0: + ni->ni_pid.nid = PTL_NID_ANY; + kibnal_data.kib_incarnation++; + mb(); + kibnal_del_peer (PTL_NID_ANY, 0); + up(&kibnal_data.kib_nid_mutex); + return rc; } kib_peer_t * @@ -431,7 +1090,8 @@ kibnal_unlink_peer_locked (kib_peer_t *peer) } int -kibnal_get_peer_info (int index, ptl_nid_t *nidp, int *persistencep) +kibnal_get_peer_info (int index, ptl_nid_t *nidp, __u32 *ipp, int *portp, + int *persistencep) { kib_peer_t *peer; struct list_head *ptmp; @@ -452,6 +1112,8 @@ kibnal_get_peer_info (int index, ptl_nid_t *nidp, int *persistencep) continue; *nidp = peer->ibp_nid; + *ipp = peer->ibp_ip; + *portp = peer->ibp_port; *persistencep = peer->ibp_persistence; read_unlock (&kibnal_data.kib_global_lock); @@ -464,7 +1126,7 @@ kibnal_get_peer_info (int index, ptl_nid_t *nidp, int *persistencep) } int -kibnal_add_persistent_peer (ptl_nid_t nid) +kibnal_add_persistent_peer (ptl_nid_t nid, __u32 ip, int port) { unsigned long flags; kib_peer_t *peer; @@ -489,6 +1151,8 @@ kibnal_add_persistent_peer (ptl_nid_t nid) kibnal_nid2peerlist (nid)); } + peer->ibp_ip = ip; + peer->ibp_port = port; peer->ibp_persistence++; write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags); @@ -773,7 +1437,7 @@ kibnal_destroy_conn (kib_conn_t *conn) /* I just nuked the last connection on shutdown; wake up * everyone so they can exit. 
*/ wake_up_all(&kibnal_data.kib_sched_waitq); - wake_up_all(&kibnal_data.kib_connd_waitq); + wake_up_all(&kibnal_data.kib_reaper_waitq); } } @@ -793,12 +1457,12 @@ kibnal_put_conn (kib_conn_t *conn) /* last ref only goes on zombies */ LASSERT (conn->ibc_state == IBNAL_CONN_ZOMBIE); - spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags); + spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags); - list_add (&conn->ibc_list, &kibnal_data.kib_connd_conns); - wake_up (&kibnal_data.kib_connd_waitq); + list_add (&conn->ibc_list, &kibnal_data.kib_reaper_conns); + wake_up (&kibnal_data.kib_reaper_waitq); - spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags); + spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags); } int @@ -898,20 +1562,24 @@ kibnal_cmd(struct portals_cfg *pcfg, void * private) switch(pcfg->pcfg_command) { case NAL_CMD_GET_PEER: { ptl_nid_t nid = 0; + __u32 ip = 0; + int port = 0; int share_count = 0; rc = kibnal_get_peer_info(pcfg->pcfg_count, - &nid, &share_count); + &nid, &ip, &port, &share_count); pcfg->pcfg_nid = nid; pcfg->pcfg_size = 0; - pcfg->pcfg_id = 0; - pcfg->pcfg_misc = 0; + pcfg->pcfg_id = ip; + pcfg->pcfg_misc = port; pcfg->pcfg_count = 0; pcfg->pcfg_wait = share_count; break; } case NAL_CMD_ADD_PEER: { - rc = kibnal_add_persistent_peer (pcfg->pcfg_nid); + rc = kibnal_add_persistent_peer (pcfg->pcfg_nid, + pcfg->pcfg_id, /* IP */ + pcfg->pcfg_misc); /* port */ break; } case NAL_CMD_DEL_PEER: { @@ -1176,12 +1844,14 @@ kibnal_api_shutdown (nal_t *nal) LASSERT (atomic_read (&kibnal_data.kib_nconns) == 0); LASSERT (list_empty (&kibnal_data.kib_sched_rxq)); LASSERT (list_empty (&kibnal_data.kib_sched_txq)); - LASSERT (list_empty (&kibnal_data.kib_connd_conns)); + LASSERT (list_empty (&kibnal_data.kib_reaper_conns)); LASSERT (list_empty (&kibnal_data.kib_connd_peers)); + LASSERT (list_empty (&kibnal_data.kib_connd_acceptq)); /* flag threads to terminate; wake and wait for them to die */ kibnal_data.kib_shutdown = 1; wake_up_all (&kibnal_data.kib_sched_waitq); + wake_up_all (&kibnal_data.kib_reaper_waitq); wake_up_all (&kibnal_data.kib_connd_waitq); i = 2; @@ -1221,6 +1891,7 @@ kibnal_api_startup (nal_t *nal, ptl_pid_t requested_pid, ptl_ni_limits_t *requested_limits, ptl_ni_limits_t *actual_limits) { + struct timeval tv; ptl_process_id_t process_id; int pkmem = atomic_read(&portal_kmemory); int rc; @@ -1240,9 +1911,11 @@ kibnal_api_startup (nal_t *nal, ptl_pid_t requested_pid, memset (&kibnal_data, 0, sizeof (kibnal_data)); /* zero pointers, flags etc */ + do_gettimeofday(&tv); + kibnal_data.kib_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec; + init_MUTEX (&kibnal_data.kib_nid_mutex); - init_MUTEX_LOCKED (&kibnal_data.kib_nid_signal); - kibnal_data.kib_nid = PTL_NID_ANY; + init_MUTEX_LOCKED (&kibnal_data.kib_listener_signal); rwlock_init(&kibnal_data.kib_global_lock); @@ -1255,9 +1928,13 @@ kibnal_api_startup (nal_t *nal, ptl_pid_t requested_pid, for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) INIT_LIST_HEAD(&kibnal_data.kib_peers[i]); + spin_lock_init (&kibnal_data.kib_reaper_lock); + INIT_LIST_HEAD (&kibnal_data.kib_reaper_conns); + init_waitqueue_head (&kibnal_data.kib_reaper_waitq); + spin_lock_init (&kibnal_data.kib_connd_lock); + INIT_LIST_HEAD (&kibnal_data.kib_connd_acceptq); INIT_LIST_HEAD (&kibnal_data.kib_connd_peers); - INIT_LIST_HEAD (&kibnal_data.kib_connd_conns); init_waitqueue_head (&kibnal_data.kib_connd_waitq); spin_lock_init (&kibnal_data.kib_sched_lock); @@ -1283,7 +1960,7 @@ kibnal_api_startup (nal_t *nal, ptl_pid_t 
requested_pid, process_id.pid = requested_pid; - process_id.nid = kibnal_data.kib_nid; + process_id.nid = PTL_NID_ANY; /* don't know my NID yet */ rc = lib_init(&kibnal_lib, nal, process_id, requested_limits, actual_limits); @@ -1306,9 +1983,19 @@ kibnal_api_startup (nal_t *nal, ptl_pid_t requested_pid, } } - rc = kibnal_thread_start (kibnal_connd, NULL); + for (i = 0; i < IBNAL_N_CONND; i++) { + rc = kibnal_thread_start (kibnal_connd, + (void *)((unsigned long)i)); + if (rc != 0) { + CERROR("Can't spawn openibnal connd[%d]: %d\n", + i, rc); + goto failed; + } + } + + rc = kibnal_thread_start (kibnal_reaper, NULL); if (rc != 0) { - CERROR ("Can't spawn openibnal connd: %d\n", rc); + CERROR ("Can't spawn openibnal reaper: %d\n", rc); goto failed; } @@ -1442,10 +2129,8 @@ kibnal_api_startup (nal_t *nal, ptl_pid_t requested_pid, void __exit kibnal_module_fini (void) { -#ifdef CONFIG_SYSCTL if (kibnal_tunables.kib_sysctl != NULL) unregister_sysctl_table (kibnal_tunables.kib_sysctl); -#endif PtlNIFini(kibnal_ni); ptl_unregister_nal(OPENIBNAL); @@ -1457,13 +2142,19 @@ kibnal_module_init (void) int rc; /* the following must be sizeof(int) for proc_dointvec() */ - LASSERT(sizeof (kibnal_tunables.kib_io_timeout) == sizeof (int)); + LASSERT (sizeof(kibnal_tunables.kib_io_timeout) == sizeof(int)); + LASSERT (sizeof(kibnal_tunables.kib_listener_timeout) == sizeof(int)); + LASSERT (sizeof(kibnal_tunables.kib_backlog) == sizeof(int)); + LASSERT (sizeof(kibnal_tunables.kib_port) == sizeof(int)); kibnal_api.nal_ni_init = kibnal_api_startup; kibnal_api.nal_ni_fini = kibnal_api_shutdown; /* Initialise dynamic tunables to defaults once only */ kibnal_tunables.kib_io_timeout = IBNAL_IO_TIMEOUT; + kibnal_tunables.kib_listener_timeout = IBNAL_LISTENER_TIMEOUT; + kibnal_tunables.kib_backlog = IBNAL_BACKLOG; + kibnal_tunables.kib_port = IBNAL_PORT; rc = ptl_register_nal(OPENIBNAL, &kibnal_api); if (rc != PTL_OK) { @@ -1478,11 +2169,15 @@ kibnal_module_init (void) return (-ENODEV); } -#ifdef CONFIG_SYSCTL - /* Press on regardless even if registering sysctl doesn't work */ kibnal_tunables.kib_sysctl = register_sysctl_table (kibnal_top_ctl_table, 0); -#endif + if (kibnal_tunables.kib_sysctl == NULL) { + CERROR("Can't register sysctl table\n"); + PtlNIFini(kibnal_ni); + ptl_unregister_nal(OPENIBNAL); + return (-ENOMEM); + } + return (0); } diff --git a/lnet/klnds/openiblnd/openiblnd.h b/lnet/klnds/openiblnd/openiblnd.h index fd7bc61..c3e9837 100644 --- a/lnet/klnds/openiblnd/openiblnd.h +++ b/lnet/klnds/openiblnd/openiblnd.h @@ -48,6 +48,9 @@ #include #include +#include +#include + #define DEBUG_SUBSYSTEM S_NAL #include @@ -59,39 +62,40 @@ #include #include -#define IBNAL_SERVICE_NAME "openibnal" -#define IBNAL_CHECK_ADVERT 1 - #if CONFIG_SMP # define IBNAL_N_SCHED num_online_cpus() /* # schedulers */ #else # define IBNAL_N_SCHED 1 /* # schedulers */ #endif +#define IBNAL_N_CONND 4 /* # connection daemons */ #define IBNAL_MIN_RECONNECT_INTERVAL HZ /* first failed connection retry... 
*/ #define IBNAL_MAX_RECONNECT_INTERVAL (60*HZ) /* ...exponentially increasing to this */ -#define IBNAL_MSG_SIZE (4<<10) /* max size of queued messages (inc hdr) */ +#define IBNAL_MSG_SIZE (4<<10) /* max size of queued messages (inc hdr) */ -#define IBNAL_MSG_QUEUE_SIZE 8 /* # messages/RDMAs in-flight */ -#define IBNAL_CREDIT_HIGHWATER 6 /* when to eagerly return credits */ -#define IBNAL_RETRY 7 /* # times to retry */ -#define IBNAL_RNR_RETRY 7 /* */ -#define IBNAL_CM_RETRY 7 /* # times to retry connection */ -#define IBNAL_FLOW_CONTROL 1 +#define IBNAL_MSG_QUEUE_SIZE 8 /* # messages/RDMAs in-flight */ +#define IBNAL_CREDIT_HIGHWATER 6 /* when to eagerly return credits */ +#define IBNAL_RETRY 7 /* # times to retry */ +#define IBNAL_RNR_RETRY 7 /* */ +#define IBNAL_CM_RETRY 7 /* # times to retry connection */ +#define IBNAL_FLOW_CONTROL 1 #define IBNAL_RESPONDER_RESOURCES 8 -#define IBNAL_NTX 64 /* # tx descs */ -#define IBNAL_NTX_NBLK 256 /* # reserved tx descs */ +#define IBNAL_NTX 64 /* # tx descs */ +#define IBNAL_NTX_NBLK 256 /* # reserved tx descs */ -#define IBNAL_PEER_HASH_SIZE 101 /* # peer lists */ +#define IBNAL_PEER_HASH_SIZE 101 /* # peer lists */ -#define IBNAL_RESCHED 100 /* # scheduler loops before reschedule */ +#define IBNAL_RESCHED 100 /* # scheduler loops before reschedule */ -#define IBNAL_CONCURRENT_PEERS 1000 /* # nodes all talking at once to me */ +#define IBNAL_CONCURRENT_PEERS 1000 /* # nodes all talking at once to me */ /* default vals for runtime tunables */ -#define IBNAL_IO_TIMEOUT 50 /* default comms timeout (seconds) */ +#define IBNAL_IO_TIMEOUT 50 /* default comms timeout (seconds) */ +#define IBNAL_LISTENER_TIMEOUT 5 /* default listener timeout (seconds) */ +#define IBNAL_BACKLOG 127 /* default listener backlog */ +#define IBNAL_PORT 988 /* default listener port */ /************************/ /* derived constants... */ @@ -113,13 +117,16 @@ #define IBNAL_RDMA_BASE 0x0eeb0000 #define IBNAL_FMR 1 -#define IBNAL_CKSUM 0 +#define IBNAL_CKSUM 1 //#define IBNAL_CALLBACK_CTXT IB_CQ_CALLBACK_PROCESS #define IBNAL_CALLBACK_CTXT IB_CQ_CALLBACK_INTERRUPT typedef struct { int kib_io_timeout; /* comms timeout (seconds) */ + int kib_listener_timeout; /* listener's timeout */ + int kib_backlog; /* listenter's accept backlog */ + int kib_port; /* where the listener listens */ struct ctl_table_header *kib_sysctl; /* sysctl interface */ } kib_tunables_t; @@ -141,11 +148,17 @@ typedef struct int kib_shutdown; /* shut down? 
*/ atomic_t kib_nthreads; /* # live threads */ - __u64 kib_service_id; /* service number I listen on */ + __u64 kib_svc_id; /* service number I listen on */ + tTS_IB_GID kib_svc_gid; /* device/port GID */ + __u16 kib_svc_pkey; /* device/port pkey */ + ptl_nid_t kib_nid; /* my NID */ struct semaphore kib_nid_mutex; /* serialise NID ops */ - struct semaphore kib_nid_signal; /* signal completion */ - + struct semaphore kib_listener_signal; /* signal IP listener completion */ + struct socket *kib_listener_sock; /* IP listener's socket */ + int kib_listener_shutdown; /* ask IP listener to close */ + void *kib_listen_handle; /* IB listen handle */ + rwlock_t kib_global_lock; /* stabilize peer/conn ops */ struct list_head *kib_peers; /* hash table of all my known peers */ @@ -153,10 +166,14 @@ typedef struct atomic_t kib_npeers; /* # peers extant */ atomic_t kib_nconns; /* # connections extant */ - struct list_head kib_connd_conns; /* connections to progress */ + struct list_head kib_reaper_conns; /* connections to reap */ + wait_queue_head_t kib_reaper_waitq; /* reaper sleeps here */ + unsigned long kib_reaper_waketime; /* when reaper will wake */ + spinlock_t kib_reaper_lock; /* serialise */ + struct list_head kib_connd_peers; /* peers waiting for a connection */ + struct list_head kib_connd_acceptq; /* accepted sockets to handle */ wait_queue_head_t kib_connd_waitq; /* connection daemons sleep here */ - unsigned long kib_connd_waketime; /* when connd will wake */ spinlock_t kib_connd_lock; /* serialise */ wait_queue_head_t kib_sched_waitq; /* schedulers sleep here */ @@ -182,7 +199,6 @@ typedef struct struct ib_fmr_pool *kib_fmr_pool; /* fast memory region pool */ #endif struct ib_cq *kib_cq; /* completion queue */ - void *kib_listen_handle; /* where I listen for connections */ } kib_data_t; @@ -195,13 +211,31 @@ typedef struct #define IBNAL_INIT_CQ 6 #define IBNAL_INIT_ALL 7 +typedef struct kib_acceptsock /* accepted socket queued for connd */ +{ + struct list_head ibas_list; /* queue for attention */ + struct socket *ibas_sock; /* the accepted socket */ +} kib_acceptsock_t; + /************************************************************************ - * Wire message structs. + * IB Wire message format. * These are sent in sender's byte order (i.e. receiver flips). - * CAVEAT EMPTOR: other structs communicated between nodes (e.g. MAD - * private data and SM service info), is LE on the wire. + * They may be sent via TCP/IP (service ID,GID,PKEY query/response), + * as private data in the connection request/response, or "normally". 
*/ +typedef struct kib_svcrsp /* service response */ +{ + __u64 ibsr_svc_id; /* service's id */ + __u8 ibsr_svc_gid[16]; /* service's gid */ + __u16 ibsr_svc_pkey; /* service's pkey */ +} kib_svcrsp_t; + +typedef struct kib_connparams +{ + __u32 ibcp_queue_depth; +} kib_connparams_t; + typedef struct { union { @@ -215,12 +249,11 @@ typedef struct typedef struct { - __u32 rd_key; /* remote key */ - __u32 rd_nob; /* # of bytes */ - __u64 rd_addr; /* remote io vaddr */ + __u32 rd_key; /* remote key */ + __u32 rd_nob; /* # of bytes */ + __u64 rd_addr; /* remote io vaddr */ } kib_rdma_desc_t; - typedef struct { ptl_hdr_t ibim_hdr; /* portals header */ @@ -242,15 +275,21 @@ typedef struct typedef struct { - __u32 ibm_magic; /* I'm an openibnal message */ - __u16 ibm_version; /* this is my version number */ - __u8 ibm_type; /* msg type */ - __u8 ibm_credits; /* returned credits */ -#if IBNAL_CKSUM - __u32 ibm_nob; - __u32 ibm_cksum; -#endif + /* First 2 fields fixed FOR ALL TIME */ + __u32 ibm_magic; /* I'm an openibnal message */ + __u16 ibm_version; /* this is my version number */ + + __u8 ibm_type; /* msg type */ + __u8 ibm_credits; /* returned credits */ + __u32 ibm_nob; /* # bytes in whole message */ + __u32 ibm_cksum; /* checksum (0 == no checksum) */ + __u64 ibm_srcnid; /* sender's NID */ + __u64 ibm_srcstamp; /* sender's incarnation */ + __u64 ibm_dstnid; /* destination's NID */ + __u64 ibm_dststamp; /* destination's incarnation */ union { + kib_svcrsp_t svcrsp; + kib_connparams_t connparams; kib_immediate_msg_t immediate; kib_rdma_msg_t rdma; kib_completion_msg_t completion; @@ -258,8 +297,12 @@ typedef struct } kib_msg_t; #define IBNAL_MSG_MAGIC 0x0be91b91 /* unique magic */ -#define IBNAL_MSG_VERSION 1 /* current protocol version */ +#define IBNAL_MSG_VERSION 2 /* current protocol version */ +#define IBNAL_MSG_SVCQRY 0xb0 /* service query */ +#define IBNAL_MSG_SVCRSP 0xb1 /* service response */ +#define IBNAL_MSG_CONNREQ 0xc0 /* connection request */ +#define IBNAL_MSG_CONNACK 0xc1 /* connection acknowledge */ #define IBNAL_MSG_NOOP 0xd0 /* nothing (just credits) */ #define IBNAL_MSG_IMMEDIATE 0xd1 /* portals hdr + payload */ #define IBNAL_MSG_PUT_RDMA 0xd2 /* portals PUT hdr + source rdma desc */ @@ -306,25 +349,16 @@ typedef struct kib_tx /* transmit message */ #define KIB_TX_MAPPED 1 #define KIB_TX_MAPPED_FMR 2 -typedef struct kib_wire_connreq -{ - __u32 wcr_magic; /* I'm an openibnal connreq */ - __u16 wcr_version; /* this is my version number */ - __u16 wcr_queue_depth; /* this is my receive queue size */ - __u64 wcr_nid; /* peer's NID */ - __u64 wcr_incarnation; /* peer's incarnation */ -} kib_wire_connreq_t; - typedef struct kib_connreq { - /* connection-in-progress */ - struct kib_conn *cr_conn; - kib_wire_connreq_t cr_wcr; - __u64 cr_tid; - struct ib_common_attrib_service cr_service; - tTS_IB_GID cr_gid; - struct ib_path_record cr_path; - struct ib_cm_active_param cr_connparam; + /* active connection-in-progress state */ + struct kib_conn *cr_conn; + kib_msg_t cr_msg; + __u64 cr_tid; + tTS_IB_GID cr_gid; + kib_svcrsp_t cr_svcrsp; + struct ib_path_record cr_path; + struct ib_cm_active_param cr_connparam; } kib_connreq_t; typedef struct kib_conn @@ -361,6 +395,9 @@ typedef struct kib_peer struct list_head ibp_list; /* stash on global peer list */ struct list_head ibp_connd_list; /* schedule on kib_connd_peers */ ptl_nid_t ibp_nid; /* who's on the other end(s) */ + __u32 ibp_ip; /* IP to query for peer conn params */ + int ibp_port; /* port to qery for peer conn params */ + __u64 
ibp_incarnation; /* peer's incarnation */ atomic_t ibp_refcount; /* # users */ int ibp_persistence; /* "known" peer refs */ struct list_head ibp_conns; /* all active connections */ @@ -370,7 +407,6 @@ typedef struct kib_peer unsigned long ibp_reconnect_interval; /* exponential backoff */ } kib_peer_t; - extern lib_nal_t kibnal_lib; extern kib_data_t kibnal_data; extern kib_tunables_t kibnal_tunables; @@ -403,34 +439,6 @@ kibnal_queue_tx_locked (kib_tx_t *tx, kib_conn_t *conn) list_add_tail(&tx->tx_list, &conn->ibc_tx_queue); } -#define KIBNAL_SERVICE_KEY_MASK (IB_SA_SERVICE_COMP_MASK_NAME | \ - IB_SA_SERVICE_COMP_MASK_DATA8_1 | \ - IB_SA_SERVICE_COMP_MASK_DATA8_2 | \ - IB_SA_SERVICE_COMP_MASK_DATA8_3 | \ - IB_SA_SERVICE_COMP_MASK_DATA8_4 | \ - IB_SA_SERVICE_COMP_MASK_DATA8_5 | \ - IB_SA_SERVICE_COMP_MASK_DATA8_6 | \ - IB_SA_SERVICE_COMP_MASK_DATA8_7 | \ - IB_SA_SERVICE_COMP_MASK_DATA8_8) - -static inline __u64* -kibnal_service_nid_field(struct ib_common_attrib_service *srv) -{ - /* must be consistent with KIBNAL_SERVICE_KEY_MASK */ - return (__u64 *)srv->service_data8; -} - - -static inline void -kibnal_set_service_keys(struct ib_common_attrib_service *srv, ptl_nid_t nid) -{ - LASSERT (strlen (IBNAL_SERVICE_NAME) < sizeof(srv->service_name)); - memset (srv->service_name, 0, sizeof(srv->service_name)); - strcpy (srv->service_name, IBNAL_SERVICE_NAME); - - *kibnal_service_nid_field(srv) = cpu_to_le64(nid); -} - #if 0 static inline void kibnal_show_rdma_attr (kib_conn_t *conn) @@ -486,6 +494,29 @@ kibnal_wreqid_is_rx (__u64 wreqid) return (wreqid & 1) != 0; } +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,0)) +# define sk_allocation allocation +# define sk_data_ready data_ready +# define sk_write_space write_space +# define sk_user_data user_data +# define sk_prot prot +# define sk_sndbuf sndbuf +# define sk_socket socket +# define sk_wmem_queued wmem_queued +# define sk_err err +# define sk_sleep sleep +#endif + +extern void kibnal_init_msg(kib_msg_t *msg, int type, int body_nob); +extern void kibnal_pack_msg(kib_msg_t *msg, int credits, + ptl_nid_t dstnid, __u64 dststamp); +extern int kibnal_unpack_msg(kib_msg_t *msg, int nob); +extern void kibnal_handle_svcqry (struct socket *sock); +extern int kibnal_make_svcqry (kib_conn_t *conn); +extern void kibnal_free_acceptsock (kib_acceptsock_t *as); +extern int kibnal_listener_procint(ctl_table *table, int write, + struct file *filp, void *buffer, + size_t *lenp); extern kib_peer_t *kibnal_create_peer (ptl_nid_t nid); extern void kibnal_put_peer (kib_peer_t *peer); extern int kibnal_del_peer (ptl_nid_t nid, int single_share); @@ -513,6 +544,7 @@ extern void kibnal_destroy_conn (kib_conn_t *conn); extern int kibnal_thread_start (int (*fn)(void *arg), void *arg); extern int kibnal_scheduler(void *arg); extern int kibnal_connd (void *arg); +extern int kibnal_reaper (void *arg); extern void kibnal_callback (struct ib_cq *cq, struct ib_cq_entry *e, void *arg); extern void kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob); extern int kibnal_close_conn (kib_conn_t *conn, int why); diff --git a/lnet/klnds/openiblnd/openiblnd_cb.c b/lnet/klnds/openiblnd/openiblnd_cb.c index f6f18ff..7226ff9 100644 --- a/lnet/klnds/openiblnd/openiblnd_cb.c +++ b/lnet/klnds/openiblnd/openiblnd_cb.c @@ -292,34 +292,15 @@ kibnal_post_rx (kib_rx_t *rx, int do_credits) kibnal_put_conn (conn); } -#if IBNAL_CKSUM -__u32 kibnal_cksum (void *ptr, int nob) -{ - char *c = ptr; - __u32 sum = 0; - - while (nob-- > 0) - sum = ((sum << 1) | (sum >> 31)) + *c++; - - return (sum); -} 
-#endif - void kibnal_rx_callback (struct ib_cq_entry *e) { kib_rx_t *rx = (kib_rx_t *)kibnal_wreqid2ptr(e->work_request_id); kib_msg_t *msg = rx->rx_msg; kib_conn_t *conn = rx->rx_conn; - int nob = e->bytes_transferred; - const int base_nob = offsetof(kib_msg_t, ibm_u); int credits; - int flipped; unsigned long flags; -#if IBNAL_CKSUM - __u32 msg_cksum; - __u32 computed_cksum; -#endif + int rc; CDEBUG (D_NET, "rx %p conn %p\n", rx, conn); LASSERT (rx->rx_posted); @@ -340,51 +321,21 @@ kibnal_rx_callback (struct ib_cq_entry *e) goto failed; } - if (nob < base_nob) { - CERROR ("Short rx from "LPX64": %d\n", - conn->ibc_peer->ibp_nid, nob); - goto failed; - } - - /* Receiver does any byte flipping if necessary... */ - - if (msg->ibm_magic == IBNAL_MSG_MAGIC) { - flipped = 0; - } else { - if (msg->ibm_magic != __swab32(IBNAL_MSG_MAGIC)) { - CERROR ("Unrecognised magic: %08x from "LPX64"\n", - msg->ibm_magic, conn->ibc_peer->ibp_nid); - goto failed; - } - flipped = 1; - __swab16s (&msg->ibm_version); - LASSERT (sizeof(msg->ibm_type) == 1); - LASSERT (sizeof(msg->ibm_credits) == 1); - } - - if (msg->ibm_version != IBNAL_MSG_VERSION) { - CERROR ("Incompatible msg version %d (%d expected)\n", - msg->ibm_version, IBNAL_MSG_VERSION); - goto failed; - } - -#if IBNAL_CKSUM - if (nob != msg->ibm_nob) { - CERROR ("Unexpected # bytes %d (%d expected)\n", nob, msg->ibm_nob); + rc = kibnal_unpack_msg(msg, e->bytes_transferred); + if (rc != 0) { + CERROR ("Error %d unpacking rx from "LPX64"\n", + rc, conn->ibc_peer->ibp_nid); goto failed; } - msg_cksum = le32_to_cpu(msg->ibm_cksum); - msg->ibm_cksum = 0; - computed_cksum = kibnal_cksum (msg, nob); - - if (msg_cksum != computed_cksum) { - CERROR ("Checksum failure %d: (%d expected)\n", - computed_cksum, msg_cksum); + if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid || + msg->ibm_srcstamp != conn->ibc_incarnation || + msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid || + msg->ibm_dststamp != kibnal_data.kib_incarnation) { + CERROR ("Stale rx from "LPX64"\n", + conn->ibc_peer->ibp_nid); goto failed; } - CDEBUG(D_NET, "cksum %x, nob %d\n", computed_cksum, nob); -#endif /* Have I received credits that will let me send? 
*/ credits = msg->ibm_credits; @@ -402,25 +353,10 @@ kibnal_rx_callback (struct ib_cq_entry *e) return; case IBNAL_MSG_IMMEDIATE: - if (nob < base_nob + sizeof (kib_immediate_msg_t)) { - CERROR ("Short IMMEDIATE from "LPX64": %d\n", - conn->ibc_peer->ibp_nid, nob); - goto failed; - } break; case IBNAL_MSG_PUT_RDMA: case IBNAL_MSG_GET_RDMA: - if (nob < base_nob + sizeof (kib_rdma_msg_t)) { - CERROR ("Short RDMA msg from "LPX64": %d\n", - conn->ibc_peer->ibp_nid, nob); - goto failed; - } - if (flipped) { - __swab32s(&msg->ibm_u.rdma.ibrm_desc.rd_key); - __swab32s(&msg->ibm_u.rdma.ibrm_desc.rd_nob); - __swab64s(&msg->ibm_u.rdma.ibrm_desc.rd_addr); - } CDEBUG(D_NET, "%d RDMA: cookie "LPX64", key %x, addr "LPX64", nob %d\n", msg->ibm_type, msg->ibm_u.rdma.ibrm_cookie, msg->ibm_u.rdma.ibrm_desc.rd_key, @@ -430,14 +366,6 @@ kibnal_rx_callback (struct ib_cq_entry *e) case IBNAL_MSG_PUT_DONE: case IBNAL_MSG_GET_DONE: - if (nob < base_nob + sizeof (kib_completion_msg_t)) { - CERROR ("Short COMPLETION msg from "LPX64": %d\n", - conn->ibc_peer->ibp_nid, nob); - goto failed; - } - if (flipped) - __swab32s(&msg->ibm_u.completion.ibcm_status); - CDEBUG(D_NET, "%d DONE: cookie "LPX64", status %d\n", msg->ibm_type, msg->ibm_u.completion.ibcm_cookie, msg->ibm_u.completion.ibcm_status); @@ -449,8 +377,8 @@ kibnal_rx_callback (struct ib_cq_entry *e) return; default: - CERROR ("Can't parse type from "LPX64": %d\n", - conn->ibc_peer->ibp_nid, msg->ibm_type); + CERROR ("Bad msg type %x from "LPX64"\n", + msg->ibm_type, conn->ibc_peer->ibp_nid); goto failed; } @@ -800,20 +728,17 @@ kibnal_check_sends (kib_conn_t *conn) continue; } - tx->tx_msg->ibm_credits = conn->ibc_outstanding_credits; - conn->ibc_outstanding_credits = 0; + kibnal_pack_msg(tx->tx_msg, conn->ibc_outstanding_credits, + conn->ibc_peer->ibp_nid, conn->ibc_incarnation); + conn->ibc_outstanding_credits = 0; conn->ibc_nsends_posted++; conn->ibc_credits--; tx->tx_sending = tx->tx_nsp; tx->tx_passive_rdma_wait = tx->tx_passive_rdma; list_add (&tx->tx_list, &conn->ibc_active_txs); -#if IBNAL_CKSUM - tx->tx_msg->ibm_cksum = 0; - tx->tx_msg->ibm_cksum = kibnal_cksum(tx->tx_msg, tx->tx_msg->ibm_nob); - CDEBUG(D_NET, "cksum %x, nob %d\n", tx->tx_msg->ibm_cksum, tx->tx_msg->ibm_nob); -#endif + spin_unlock_irqrestore (&conn->ibc_lock, flags); /* NB the gap between removing tx from the queue and sending it @@ -949,13 +874,9 @@ kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob) LASSERT (tx->tx_nsp >= 0 && tx->tx_nsp < sizeof(tx->tx_sp)/sizeof(tx->tx_sp[0])); LASSERT (nob <= IBNAL_MSG_SIZE); - - tx->tx_msg->ibm_magic = IBNAL_MSG_MAGIC; - tx->tx_msg->ibm_version = IBNAL_MSG_VERSION; - tx->tx_msg->ibm_type = type; -#if IBNAL_CKSUM - tx->tx_msg->ibm_nob = nob; -#endif + + kibnal_init_msg(tx->tx_msg, type, body_nob); + /* Fence the message if it's bundled with an RDMA read */ fence = (tx->tx_nsp > 0) && (type == IBNAL_MSG_PUT_DONE); @@ -1283,7 +1204,7 @@ kibnal_start_active_rdma (int type, int status, } else { LASSERT (tx->tx_nsp == 1); /* No RDMA: local completion happens now! */ - CDEBUG(D_WARNING,"No data: immediate completion\n"); + CDEBUG(D_NET, "No data: immediate completion\n"); lib_finalize (&kibnal_lib, NULL, libmsg, status == 0 ? PTL_OK : PTL_FAIL); } @@ -1538,7 +1459,7 @@ void kibnal_close_conn_locked (kib_conn_t *conn, int error) { /* This just does the immmediate housekeeping, and schedules the - * connection for the connd to finish off. + * connection for the reaper to finish off. 
* Caller holds kib_global_lock exclusively in irq context */ kib_peer_t *peer = conn->ibc_peer; @@ -1549,10 +1470,10 @@ kibnal_close_conn_locked (kib_conn_t *conn, int error) conn->ibc_state == IBNAL_CONN_CONNECTING); if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) { - /* kib_connd_conns takes ibc_list's ref */ + /* kib_reaper_conns takes ibc_list's ref */ list_del (&conn->ibc_list); } else { - /* new ref for kib_connd_conns */ + /* new ref for kib_reaper_conns */ CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n", conn, conn->ibc_state, conn->ibc_peer->ibp_nid, atomic_read (&conn->ibc_refcount)); @@ -1568,12 +1489,12 @@ kibnal_close_conn_locked (kib_conn_t *conn, int error) conn->ibc_state = IBNAL_CONN_DEATHROW; /* Schedule conn for closing/destruction */ - spin_lock (&kibnal_data.kib_connd_lock); + spin_lock (&kibnal_data.kib_reaper_lock); - list_add_tail (&conn->ibc_list, &kibnal_data.kib_connd_conns); - wake_up (&kibnal_data.kib_connd_waitq); + list_add_tail (&conn->ibc_list, &kibnal_data.kib_reaper_conns); + wake_up (&kibnal_data.kib_reaper_waitq); - spin_unlock (&kibnal_data.kib_connd_lock); + spin_unlock (&kibnal_data.kib_reaper_lock); } int @@ -1706,6 +1627,8 @@ kibnal_connreq_done (kib_conn_t *conn, int active, int status) if (status == 0) { /* Everything worked! */ +#warning "purge old conn incarnations" + peer->ibp_connecting--; /* +1 ref for ibc_list; caller(== CM)'s ref remains until @@ -1765,7 +1688,7 @@ kibnal_connreq_done (kib_conn_t *conn, int active, int status) /* connection failed */ if (state == IBNAL_CONN_CONNECTING) { - /* schedule for connd to close */ + /* schedule for reaper to close */ kibnal_close_conn_locked (conn, status); } else { /* Don't have a CM comm_id; just wait for refs to drain */ @@ -1785,24 +1708,41 @@ kibnal_connreq_done (kib_conn_t *conn, int active, int status) int kibnal_accept (kib_conn_t **connp, tTS_IB_CM_COMM_ID cid, - ptl_nid_t nid, __u64 incarnation, int queue_depth) + kib_msg_t *msg, int nob) { - kib_conn_t *conn = kibnal_create_conn(); + kib_conn_t *conn; kib_peer_t *peer; kib_peer_t *peer2; unsigned long flags; + int rc; - if (conn == NULL) - return (-ENOMEM); + rc = kibnal_unpack_msg(msg, nob); + if (rc != 0) { + CERROR("Can't unpack connreq msg: %d\n", rc); + return -EPROTO; + } + + CDEBUG(D_NET, "connreq from "LPX64"\n", msg->ibm_srcnid); - if (queue_depth != IBNAL_MSG_QUEUE_SIZE) { + if (msg->ibm_type != IBNAL_MSG_CONNREQ) { + CERROR("Unexpected connreq msg type: %x from "LPX64"\n", + msg->ibm_type, msg->ibm_srcnid); + return -EPROTO; + } + + if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) { CERROR("Can't accept "LPX64": bad queue depth %d (%d expected)\n", - nid, queue_depth, IBNAL_MSG_QUEUE_SIZE); + msg->ibm_srcnid, msg->ibm_u.connparams.ibcp_queue_depth, + IBNAL_MSG_QUEUE_SIZE); return (-EPROTO); } + conn = kibnal_create_conn(); + if (conn == NULL) + return (-ENOMEM); + /* assume 'nid' is a new peer */ - peer = kibnal_create_peer (nid); + peer = kibnal_create_peer (msg->ibm_srcnid); if (peer == NULL) { CDEBUG(D_NET, "--conn[%p] state %d -> "LPX64" (%d)\n", conn, conn->ibc_state, conn->ibc_peer->ibp_nid, @@ -1814,11 +1754,27 @@ kibnal_accept (kib_conn_t **connp, tTS_IB_CM_COMM_ID cid, write_lock_irqsave (&kibnal_data.kib_global_lock, flags); - peer2 = kibnal_find_peer_locked(nid); + /* Check I'm the same instance that gave the connection parameters. 
+ * NB If my incarnation changes after this, the peer will get nuked and + * we'll spot that when the connection is finally added into the peer's + * connlist */ + if (msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid || + msg->ibm_dststamp != kibnal_data.kib_incarnation) { + write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags); + + CERROR("Stale connection params from "LPX64"\n", + msg->ibm_srcnid); + atomic_dec(&conn->ibc_refcount); + kibnal_destroy_conn(conn); + kibnal_put_peer(peer); + return -ESTALE; + } + + peer2 = kibnal_find_peer_locked(msg->ibm_srcnid); if (peer2 == NULL) { /* peer table takes my ref on peer */ list_add_tail (&peer->ibp_list, - kibnal_nid2peerlist(nid)); + kibnal_nid2peerlist(msg->ibm_srcnid)); } else { kibnal_put_peer (peer); peer = peer2; @@ -1833,7 +1789,7 @@ kibnal_accept (kib_conn_t **connp, tTS_IB_CM_COMM_ID cid, conn->ibc_peer = peer; conn->ibc_state = IBNAL_CONN_CONNECTING; conn->ibc_comm_id = cid; - conn->ibc_incarnation = incarnation; + conn->ibc_incarnation = msg->ibm_srcstamp; conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE; *connp = conn; @@ -1951,7 +1907,7 @@ kibnal_passive_conn_callback (tTS_IB_CM_EVENT event, void *param, void *arg) { - kib_conn_t *conn = arg; + kib_conn_t *conn = arg; int rc; switch (event) { @@ -1969,56 +1925,37 @@ kibnal_passive_conn_callback (tTS_IB_CM_EVENT event, case TS_IB_CM_REQ_RECEIVED: { struct ib_cm_req_received_param *req = param; - kib_wire_connreq_t *wcr = req->remote_private_data; + kib_msg_t *msg = req->remote_private_data; LASSERT (conn == NULL); - CDEBUG(D_NET, "REQ from "LPX64"\n", le64_to_cpu(wcr->wcr_nid)); - - if (req->remote_private_data_len < sizeof (*wcr)) { - CERROR("Connect from remote LID %04x: too short %d\n", - req->dlid, req->remote_private_data_len); - return TS_IB_CM_CALLBACK_ABORT; - } + /* Don't really know srcnid until successful unpack */ + CDEBUG(D_NET, "REQ from ?"LPX64"?\n", msg->ibm_srcnid); - if (wcr->wcr_magic != cpu_to_le32(IBNAL_MSG_MAGIC)) { - CERROR ("Can't accept LID %04x: bad magic %08x\n", - req->dlid, le32_to_cpu(wcr->wcr_magic)); - return TS_IB_CM_CALLBACK_ABORT; - } - - if (wcr->wcr_version != cpu_to_le16(IBNAL_MSG_VERSION)) { - CERROR ("Can't accept LID %04x: bad version %d\n", - req->dlid, le16_to_cpu(wcr->wcr_magic)); - return TS_IB_CM_CALLBACK_ABORT; - } - - rc = kibnal_accept(&conn, - cid, - le64_to_cpu(wcr->wcr_nid), - le64_to_cpu(wcr->wcr_incarnation), - le16_to_cpu(wcr->wcr_queue_depth)); + rc = kibnal_accept(&conn, cid, msg, + req->remote_private_data_len); if (rc != 0) { - CERROR ("Can't accept "LPX64": %d\n", - le64_to_cpu(wcr->wcr_nid), rc); + CERROR ("Can't accept ?"LPX64"?: %d\n", + msg->ibm_srcnid, rc); return TS_IB_CM_CALLBACK_ABORT; } /* update 'arg' for next callback */ - rc = tsIbCmCallbackModify(cid, - kibnal_passive_conn_callback, conn); + rc = tsIbCmCallbackModify(cid, kibnal_passive_conn_callback, conn); LASSERT (rc == 0); + msg = req->accept_param.reply_private_data; + kibnal_init_msg(msg, IBNAL_MSG_CONNACK, + sizeof(msg->ibm_u.connparams)); + + msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE; + + kibnal_pack_msg(msg, 0, + conn->ibc_peer->ibp_nid, + conn->ibc_incarnation); + req->accept_param.qp = conn->ibc_qp; - *((kib_wire_connreq_t *)req->accept_param.reply_private_data) - = (kib_wire_connreq_t) { - .wcr_magic = cpu_to_le32(IBNAL_MSG_MAGIC), - .wcr_version = cpu_to_le16(IBNAL_MSG_VERSION), - .wcr_queue_depth = cpu_to_le32(IBNAL_MSG_QUEUE_SIZE), - .wcr_nid = cpu_to_le64(kibnal_data.kib_nid), - .wcr_incarnation = 
cpu_to_le64(kibnal_data.kib_incarnation), - }; - req->accept_param.reply_private_data_len = sizeof(kib_wire_connreq_t); + req->accept_param.reply_private_data_len = msg->ibm_nob; req->accept_param.responder_resources = IBNAL_RESPONDER_RESOURCES; req->accept_param.initiator_depth = IBNAL_RESPONDER_RESOURCES; req->accept_param.rnr_retry_count = IBNAL_RNR_RETRY; @@ -2052,48 +1989,46 @@ kibnal_active_conn_callback (tTS_IB_CM_EVENT event, switch (event) { case TS_IB_CM_REP_RECEIVED: { struct ib_cm_rep_received_param *rep = param; - kib_wire_connreq_t *wcr = rep->remote_private_data; + kib_msg_t *msg = rep->remote_private_data; + int nob = rep->remote_private_data_len; + int rc; - if (rep->remote_private_data_len < sizeof (*wcr)) { - CERROR ("Short reply from "LPX64": %d\n", - conn->ibc_peer->ibp_nid, - rep->remote_private_data_len); - kibnal_connreq_done (conn, 1, -EPROTO); + rc = kibnal_unpack_msg(msg, nob); + if (rc != 0) { + CERROR ("Error %d unpacking conn ack from "LPX64"\n", + rc, conn->ibc_peer->ibp_nid); + kibnal_connreq_done (conn, 1, rc); break; } - if (wcr->wcr_magic != cpu_to_le32(IBNAL_MSG_MAGIC)) { - CERROR ("Can't connect "LPX64": bad magic %08x\n", - conn->ibc_peer->ibp_nid, le32_to_cpu(wcr->wcr_magic)); + if (msg->ibm_type != IBNAL_MSG_CONNACK) { + CERROR ("Unexpected conn ack type %d from "LPX64"\n", + msg->ibm_type, conn->ibc_peer->ibp_nid); kibnal_connreq_done (conn, 1, -EPROTO); break; } - - if (wcr->wcr_version != cpu_to_le16(IBNAL_MSG_VERSION)) { - CERROR ("Can't connect "LPX64": bad version %d\n", - conn->ibc_peer->ibp_nid, le16_to_cpu(wcr->wcr_magic)); - kibnal_connreq_done (conn, 1, -EPROTO); + + if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid || + msg->ibm_srcstamp != conn->ibc_incarnation || + msg->ibm_dstnid != kibnal_lib.libnal_ni.ni_pid.nid || + msg->ibm_dststamp != kibnal_data.kib_incarnation) { + CERROR("Stale conn ack from "LPX64"\n", + conn->ibc_peer->ibp_nid); + kibnal_connreq_done (conn, 1, -ESTALE); break; } - - if (wcr->wcr_queue_depth != cpu_to_le16(IBNAL_MSG_QUEUE_SIZE)) { - CERROR ("Can't connect "LPX64": bad queue depth %d\n", - conn->ibc_peer->ibp_nid, le16_to_cpu(wcr->wcr_queue_depth)); + + if (msg->ibm_u.connparams.ibcp_queue_depth != IBNAL_MSG_QUEUE_SIZE) { + CERROR ("Bad queue depth %d from "LPX64"\n", + msg->ibm_u.connparams.ibcp_queue_depth, + conn->ibc_peer->ibp_nid); kibnal_connreq_done (conn, 1, -EPROTO); break; } - if (le64_to_cpu(wcr->wcr_nid) != conn->ibc_peer->ibp_nid) { - CERROR ("Unexpected NID "LPX64" from "LPX64"\n", - le64_to_cpu(wcr->wcr_nid), conn->ibc_peer->ibp_nid); - kibnal_connreq_done (conn, 1, -EPROTO); - break; - } - CDEBUG(D_NET, "Connection %p -> "LPX64" REP_RECEIVED.\n", conn, conn->ibc_peer->ibp_nid); - conn->ibc_incarnation = le64_to_cpu(wcr->wcr_incarnation); conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE; break; } @@ -2131,7 +2066,9 @@ kibnal_pathreq_callback (tTS_IB_CLIENT_QUERY_TID tid, int status, void *arg) { kib_conn_t *conn = arg; - + kib_peer_t *peer = conn->ibc_peer; + kib_msg_t *msg = &conn->ibc_connreq->cr_msg; + if (status != 0) { CERROR ("status %d\n", status); kibnal_connreq_done (conn, 1, status); @@ -2140,18 +2077,14 @@ kibnal_pathreq_callback (tTS_IB_CLIENT_QUERY_TID tid, int status, conn->ibc_connreq->cr_path = *resp; - conn->ibc_connreq->cr_wcr = (kib_wire_connreq_t) { - .wcr_magic = cpu_to_le32(IBNAL_MSG_MAGIC), - .wcr_version = cpu_to_le16(IBNAL_MSG_VERSION), - .wcr_queue_depth = cpu_to_le16(IBNAL_MSG_QUEUE_SIZE), - .wcr_nid = cpu_to_le64(kibnal_data.kib_nid), - .wcr_incarnation = 
cpu_to_le64(kibnal_data.kib_incarnation), - }; + kibnal_init_msg(msg, IBNAL_MSG_CONNREQ, sizeof(msg->ibm_u.connparams)); + msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE; + kibnal_pack_msg(msg, 0, peer->ibp_nid, conn->ibc_incarnation); conn->ibc_connreq->cr_connparam = (struct ib_cm_active_param) { .qp = conn->ibc_qp, - .req_private_data = &conn->ibc_connreq->cr_wcr, - .req_private_data_len = sizeof(conn->ibc_connreq->cr_wcr), + .req_private_data = msg, + .req_private_data_len = msg->ibm_nob, .responder_resources = IBNAL_RESPONDER_RESOURCES, .initiator_depth = IBNAL_RESPONDER_RESOURCES, .retry_count = IBNAL_RETRY, @@ -2167,14 +2100,13 @@ kibnal_pathreq_callback (tTS_IB_CLIENT_QUERY_TID tid, int status, /* Flag I'm getting involved with the CM... */ conn->ibc_state = IBNAL_CONN_CONNECTING; - CDEBUG(D_NET, "Connecting to, service id "LPX64", on "LPX64"\n", - conn->ibc_connreq->cr_service.service_id, - *kibnal_service_nid_field(&conn->ibc_connreq->cr_service)); + CDEBUG(D_WARNING, "Connecting to, service id "LPX64", on "LPX64"\n", + conn->ibc_connreq->cr_svcrsp.ibsr_svc_id, peer->ibp_nid); /* kibnal_connect_callback gets my conn ref */ status = ib_cm_connect (&conn->ibc_connreq->cr_connparam, &conn->ibc_connreq->cr_path, NULL, - conn->ibc_connreq->cr_service.service_id, 0, + conn->ibc_connreq->cr_svcrsp.ibsr_svc_id, 0, kibnal_active_conn_callback, conn, &conn->ibc_comm_id); if (status != 0) { @@ -2190,55 +2122,12 @@ kibnal_pathreq_callback (tTS_IB_CLIENT_QUERY_TID tid, int status, } void -kibnal_service_get_callback (tTS_IB_CLIENT_QUERY_TID tid, int status, - struct ib_common_attrib_service *resp, void *arg) -{ - kib_conn_t *conn = arg; - - if (status != 0) { - CERROR ("status %d\n", status); - kibnal_connreq_done (conn, 1, status); - return; - } - - CDEBUG(D_NET, "Got status %d, service id "LPX64", on "LPX64"\n", - status, resp->service_id, - *kibnal_service_nid_field(resp)); - - conn->ibc_connreq->cr_service = *resp; - - status = ib_cached_gid_get(kibnal_data.kib_device, - kibnal_data.kib_port, 0, - conn->ibc_connreq->cr_gid); - LASSERT (status == 0); - - /* kibnal_pathreq_callback gets my conn ref */ - status = tsIbPathRecordRequest (kibnal_data.kib_device, - kibnal_data.kib_port, - conn->ibc_connreq->cr_gid, - conn->ibc_connreq->cr_service.service_gid, - conn->ibc_connreq->cr_service.service_pkey, - 0, - kibnal_tunables.kib_io_timeout * HZ, - 0, - kibnal_pathreq_callback, conn, - &conn->ibc_connreq->cr_tid); - - if (status == 0) - return; - - CERROR ("Path record request: %d\n", status); - kibnal_connreq_done (conn, 1, status); -} - -void kibnal_connect_peer (kib_peer_t *peer) { - kib_conn_t *conn = kibnal_create_conn(); + kib_conn_t *conn; int rc; - LASSERT (peer->ibp_connecting != 0); - + conn = kibnal_create_conn(); if (conn == NULL) { CERROR ("Can't allocate conn\n"); kibnal_peer_connect_failed (peer, 1, -ENOMEM); @@ -2257,21 +2146,32 @@ kibnal_connect_peer (kib_peer_t *peer) memset(conn->ibc_connreq, 0, sizeof (*conn->ibc_connreq)); - kibnal_set_service_keys(&conn->ibc_connreq->cr_service, peer->ibp_nid); + rc = kibnal_make_svcqry(conn); + if (rc != 0) { + kibnal_connreq_done (conn, 1, rc); + return; + } + + rc = ib_cached_gid_get(kibnal_data.kib_device, + kibnal_data.kib_port, 0, + conn->ibc_connreq->cr_gid); + LASSERT (rc == 0); - /* kibnal_service_get_callback gets my conn ref */ - rc = ib_service_get (kibnal_data.kib_device, - kibnal_data.kib_port, - &conn->ibc_connreq->cr_service, - KIBNAL_SERVICE_KEY_MASK, - kibnal_tunables.kib_io_timeout * HZ, - 
kibnal_service_get_callback, conn, - &conn->ibc_connreq->cr_tid); - + /* kibnal_pathreq_callback gets my conn ref */ + rc = tsIbPathRecordRequest (kibnal_data.kib_device, + kibnal_data.kib_port, + conn->ibc_connreq->cr_gid, + conn->ibc_connreq->cr_svcrsp.ibsr_svc_gid, + conn->ibc_connreq->cr_svcrsp.ibsr_svc_pkey, + 0, + kibnal_tunables.kib_io_timeout * HZ, + 0, + kibnal_pathreq_callback, conn, + &conn->ibc_connreq->cr_tid); if (rc == 0) return; - CERROR ("ib_service_get: %d\n", rc); + CERROR ("Path record request: %d\n", rc); kibnal_connreq_done (conn, 1, rc); } @@ -2385,31 +2285,30 @@ kibnal_terminate_conn (kib_conn_t *conn) } int -kibnal_connd (void *arg) +kibnal_reaper (void *arg) { wait_queue_t wait; unsigned long flags; kib_conn_t *conn; - kib_peer_t *peer; int timeout; int i; int peer_index = 0; unsigned long deadline = jiffies; - kportal_daemonize ("kibnal_connd"); + kportal_daemonize ("kibnal_reaper"); kportal_blockallsigs (); init_waitqueue_entry (&wait, current); - spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags); + spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags); - for (;;) { - if (!list_empty (&kibnal_data.kib_connd_conns)) { - conn = list_entry (kibnal_data.kib_connd_conns.next, + while (!kibnal_data.kib_shutdown) { + if (!list_empty (&kibnal_data.kib_reaper_conns)) { + conn = list_entry (kibnal_data.kib_reaper_conns.next, kib_conn_t, ibc_list); list_del (&conn->ibc_list); - spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags); + spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags); switch (conn->ibc_state) { case IBNAL_CONN_DEATHROW: @@ -2431,29 +2330,11 @@ kibnal_connd (void *arg) LBUG(); } - spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags); + spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags); continue; } - if (!list_empty (&kibnal_data.kib_connd_peers)) { - peer = list_entry (kibnal_data.kib_connd_peers.next, - kib_peer_t, ibp_connd_list); - - list_del_init (&peer->ibp_connd_list); - spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags); - - kibnal_connect_peer (peer); - kibnal_put_peer (peer); - - spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags); - } - - /* shut down and nobody left to reap... */ - if (kibnal_data.kib_shutdown && - atomic_read(&kibnal_data.kib_nconns) == 0) - break; - - spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags); + spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags); /* careful with the jiffy wrap... 
*/ while ((timeout = (int)(deadline - jiffies)) <= 0) { @@ -2484,15 +2365,85 @@ kibnal_connd (void *arg) deadline += p * HZ; } - kibnal_data.kib_connd_waketime = jiffies + timeout; + kibnal_data.kib_reaper_waketime = jiffies + timeout; + + set_current_state (TASK_INTERRUPTIBLE); + add_wait_queue (&kibnal_data.kib_reaper_waitq, &wait); + + schedule_timeout (timeout); + + set_current_state (TASK_RUNNING); + remove_wait_queue (&kibnal_data.kib_reaper_waitq, &wait); + + spin_lock_irqsave (&kibnal_data.kib_reaper_lock, flags); + } + + spin_unlock_irqrestore (&kibnal_data.kib_reaper_lock, flags); + + kibnal_thread_fini (); + return (0); +} + +int +kibnal_connd (void *arg) +{ + long id = (long)arg; + char name[16]; + wait_queue_t wait; + unsigned long flags; + kib_peer_t *peer; + kib_acceptsock_t *as; + int did_something; + + snprintf(name, sizeof(name), "kibnal_connd_%02ld", id); + kportal_daemonize(name); + kportal_blockallsigs(); + + init_waitqueue_entry (&wait, current); + + spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags); + + while (!kibnal_data.kib_shutdown) { + did_something = 0; + + if (!list_empty (&kibnal_data.kib_connd_acceptq)) { + as = list_entry (kibnal_data.kib_connd_acceptq.next, + kib_acceptsock_t, ibas_list); + list_del (&as->ibas_list); + + spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags); + + kibnal_handle_svcqry(as->ibas_sock); + sock_release(as->ibas_sock); + PORTAL_FREE(as, sizeof(*as)); + + spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags); + did_something = 1; + } + + if (!list_empty (&kibnal_data.kib_connd_peers)) { + peer = list_entry (kibnal_data.kib_connd_peers.next, + kib_peer_t, ibp_connd_list); + + list_del_init (&peer->ibp_connd_list); + spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags); + + kibnal_connect_peer (peer); + kibnal_put_peer (peer); + + spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags); + did_something = 1; + } + + if (did_something) + continue; set_current_state (TASK_INTERRUPTIBLE); add_wait_queue (&kibnal_data.kib_connd_waitq, &wait); - if (!kibnal_data.kib_shutdown && - list_empty (&kibnal_data.kib_connd_conns) && - list_empty (&kibnal_data.kib_connd_peers)) - schedule_timeout (timeout); + spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags); + + schedule(); set_current_state (TASK_RUNNING); remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait); @@ -2524,7 +2475,7 @@ kibnal_scheduler(void *arg) spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags); - for (;;) { + while (!kibnal_data.kib_shutdown) { did_something = 0; while (!list_empty(&kibnal_data.kib_sched_txq)) { @@ -2553,11 +2504,6 @@ kibnal_scheduler(void *arg) flags); } - /* shut down and no receives to complete... 
*/ - if (kibnal_data.kib_shutdown && - atomic_read(&kibnal_data.kib_nconns) == 0) - break; - /* nothing to do or hogging CPU */ if (!did_something || counter++ == IBNAL_RESCHED) { spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, @@ -2569,8 +2515,7 @@ kibnal_scheduler(void *arg) kibnal_data.kib_sched_waitq, !list_empty(&kibnal_data.kib_sched_txq) || !list_empty(&kibnal_data.kib_sched_rxq) || - (kibnal_data.kib_shutdown && - atomic_read (&kibnal_data.kib_nconns) == 0)); + kibnal_data.kib_shutdown); } else { our_cond_resched(); } diff --git a/lnet/klnds/ralnd/ralnd.c b/lnet/klnds/ralnd/ralnd.c index 4c2ce9d..6333bcf 100644 --- a/lnet/klnds/ralnd/ralnd.c +++ b/lnet/klnds/ralnd/ralnd.c @@ -28,7 +28,6 @@ ptl_handle_ni_t kranal_ni; kra_data_t kranal_data; kra_tunables_t kranal_tunables; -#ifdef CONFIG_SYSCTL #define RANAL_SYSCTL_TIMEOUT 1 #define RANAL_SYSCTL_LISTENER_TIMEOUT 2 #define RANAL_SYSCTL_BACKLOG 3 @@ -60,7 +59,6 @@ static ctl_table kranal_top_ctl_table[] = { {RANAL_SYSCTL, "ranal", NULL, 0, 0555, kranal_ctl_table}, { 0 } }; -#endif int kranal_sock_write (struct socket *sock, void *buffer, int nob) @@ -89,6 +87,12 @@ kranal_sock_write (struct socket *sock, void *buffer, int nob) rc = sock_sendmsg(sock, &msg, iov.iov_len); set_fs(oldmm); + if (rc == nob) + return 0; + + if (rc >= 0) + return -EAGAIN; + return rc; } @@ -957,8 +961,15 @@ kranal_connect (kra_peer_t *peer) } while (!list_empty(&zombies)); } +void +kranal_free_acceptsock (kra_acceptsock_t *ras) +{ + sock_release(ras->ras_sock); + PORTAL_FREE(ras, sizeof(*ras)); +} + int -kranal_listener(void *arg) +kranal_listener (void *arg) { struct sockaddr_in addr; wait_queue_t wait; @@ -1031,6 +1042,10 @@ kranal_listener(void *arg) kranal_pause(HZ); continue; } + /* XXX this should add a ref to sock->ops->owner, if + * TCP could be a module */ + ras->ras_sock->type = sock->type; + ras->ras_sock->ops = sock->ops; } set_current_state(TASK_INTERRUPTIBLE); @@ -1112,8 +1127,12 @@ kranal_start_listener (void) } void -kranal_stop_listener(void) +kranal_stop_listener(int clear_acceptq) { + struct list_head zombie_accepts; + unsigned long flags; + kra_acceptsock_t *ras; + CDEBUG(D_WARNING, "Stopping listener\n"); /* Called holding kra_nid_mutex: listener running */ @@ -1127,6 +1146,24 @@ kranal_stop_listener(void) LASSERT (kranal_data.kra_listener_sock == NULL); CDEBUG(D_WARNING, "Listener stopped\n"); + + if (!clear_acceptq) + return; + + /* Close any unhandled accepts */ + spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); + + list_add(&zombie_accepts, &kranal_data.kra_connd_acceptq); + list_del_init(&kranal_data.kra_connd_acceptq); + + spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); + + while (!list_empty(&zombie_accepts)) { + ras = list_entry(zombie_accepts.next, + kra_acceptsock_t, ras_list); + list_del(&ras->ras_list); + kranal_free_acceptsock(ras); + } } int @@ -1154,7 +1191,7 @@ kranal_listener_procint(ctl_table *table, int write, struct file *filp, kranal_data.kra_listener_sock == NULL)) { if (kranal_data.kra_listener_sock != NULL) - kranal_stop_listener(); + kranal_stop_listener(0); rc = kranal_start_listener(); @@ -1175,9 +1212,9 @@ kranal_listener_procint(ctl_table *table, int write, struct file *filp, int kranal_set_mynid(ptl_nid_t nid) { - unsigned long flags; - lib_ni_t *ni = &kranal_lib.libnal_ni; - int rc = 0; + unsigned long flags; + lib_ni_t *ni = &kranal_lib.libnal_ni; + int rc = 0; CDEBUG(D_NET, "setting mynid to "LPX64" (old nid="LPX64")\n", nid, ni->ni_pid.nid); @@ -1191,14 +1228,13 @@ 
kranal_set_mynid(ptl_nid_t nid) } if (kranal_data.kra_listener_sock != NULL) - kranal_stop_listener(); + kranal_stop_listener(1); write_lock_irqsave(&kranal_data.kra_global_lock, flags); kranal_data.kra_peerstamp++; - write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); - ni->ni_pid.nid = nid; - + write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); + /* Delete all existing peers and their connections after new * NID/connstamp set to ensure no old connections in our brave * new world. */ @@ -2013,10 +2049,9 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid, void __exit kranal_module_fini (void) { -#ifdef CONFIG_SYSCTL if (kranal_tunables.kra_sysctl != NULL) unregister_sysctl_table(kranal_tunables.kra_sysctl); -#endif + PtlNIFini(kranal_ni); ptl_unregister_nal(RANAL); @@ -2040,6 +2075,10 @@ kranal_module_init (void) /* Initialise dynamic tunables to defaults once only */ kranal_tunables.kra_timeout = RANAL_TIMEOUT; + kranal_tunables.kra_listener_timeout = RANAL_LISTENER_TIMEOUT; + kranal_tunables.kra_backlog = RANAL_BACKLOG; + kranal_tunables.kra_port = RANAL_PORT; + kranal_tunables.kra_max_immediate = RANAL_MAX_IMMEDIATE; rc = ptl_register_nal(RANAL, &kranal_api); if (rc != PTL_OK) { @@ -2054,11 +2093,15 @@ kranal_module_init (void) return -ENODEV; } -#ifdef CONFIG_SYSCTL - /* Press on regardless even if registering sysctl doesn't work */ kranal_tunables.kra_sysctl = register_sysctl_table(kranal_top_ctl_table, 0); -#endif + if (kranal_tunables.kra_sysctl == NULL) { + CERROR("Can't register sysctl table\n"); + PtlNIFini(kranal_ni); + ptl_unregister_nal(RANAL); + return -ENOMEM; + } + return 0; } diff --git a/lnet/klnds/ralnd/ralnd.h b/lnet/klnds/ralnd/ralnd.h index 53762eb..d904c72 100644 --- a/lnet/klnds/ralnd/ralnd.h +++ b/lnet/klnds/ralnd/ralnd.h @@ -86,6 +86,8 @@ /* default vals for runtime tunables */ #define RANAL_TIMEOUT 30 /* comms timeout (seconds) */ #define RANAL_LISTENER_TIMEOUT 5 /* listener timeout (seconds) */ +#define RANAL_BACKLOG 127 /* listener's backlog */ +#define RANAL_PORT 988 /* listener's port */ #define RANAL_MAX_IMMEDIATE (2<<10) /* immediate payload breakpoint */ typedef struct @@ -465,18 +467,19 @@ kranal_page2phys (struct page *p) # error "no page->phys" #endif -extern int kranal_listener_procint(ctl_table *table, - int write, struct file *filp, - void *buffer, size_t *lenp); -extern void kranal_update_reaper_timeout(long timeout); +extern void kranal_free_acceptsock (kra_acceptsock_t *ras); +extern int kranal_listener_procint (ctl_table *table, + int write, struct file *filp, + void *buffer, size_t *lenp); +extern void kranal_update_reaper_timeout (long timeout); extern void kranal_tx_done (kra_tx_t *tx, int completion); extern void kranal_unlink_peer_locked (kra_peer_t *peer); -extern void kranal_schedule_conn(kra_conn_t *conn); +extern void kranal_schedule_conn (kra_conn_t *conn); extern kra_peer_t *kranal_create_peer (ptl_nid_t nid); extern kra_peer_t *kranal_find_peer_locked (ptl_nid_t nid); extern void kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx); extern int kranal_del_peer (ptl_nid_t nid, int single_share); -extern void kranal_device_callback(RAP_INT32 devid); +extern void kranal_device_callback (RAP_INT32 devid); extern int kranal_thread_start (int(*fn)(void *arg), void *arg); extern int kranal_connd (void *arg); extern int kranal_reaper (void *arg); diff --git a/lnet/klnds/ralnd/ralnd_cb.c b/lnet/klnds/ralnd/ralnd_cb.c index 541a15a..d4bacdf 100644 --- a/lnet/klnds/ralnd/ralnd_cb.c +++ b/lnet/klnds/ralnd/ralnd_cb.c @@ 
-1050,8 +1050,7 @@ kranal_connd (void *arg) spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); kranal_conn_handshake(ras->ras_sock, NULL); - sock_release(ras->ras_sock); - PORTAL_FREE(ras, sizeof(*ras)); + kranal_free_acceptsock(ras); spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); did_something = 1; diff --git a/lnet/utils/portals.c b/lnet/utils/portals.c index 268e2d6..9dc47da 100644 --- a/lnet/utils/portals.c +++ b/lnet/utils/portals.c @@ -693,7 +693,7 @@ jt_ptl_print_peers (int argc, char **argv) ptl_ipaddr_2_str (pcfg.pcfg_size, buffer[0], 1), ptl_ipaddr_2_str (pcfg.pcfg_id, buffer[1], 1), pcfg.pcfg_misc, pcfg.pcfg_count); - else if (g_nal_is_compatible(NULL, RANAL, 0)) + else if (g_nal_is_compatible(NULL, RANAL, OPENIBNAL, 0)) printf (LPX64"[%d]@%s:%d\n", pcfg.pcfg_nid, pcfg.pcfg_wait, ptl_ipaddr_2_str (pcfg.pcfg_id, buffer[1], 1), @@ -721,14 +721,14 @@ jt_ptl_add_peer (int argc, char **argv) OPENIBNAL, IIBNAL, VIBNAL, 0)) return -1; - if (g_nal_is_compatible(NULL, SOCKNAL, RANAL, 0)) { + if (g_nal_is_compatible(NULL, SOCKNAL, OPENIBNAL, RANAL, 0)) { if (argc != 4) { - fprintf (stderr, "usage(tcp,ra): %s nid ipaddr port\n", + fprintf (stderr, "usage(tcp,openib,ra): %s nid ipaddr port\n", argv[0]); return 0; } } else if (argc != 2) { - fprintf (stderr, "usage(openib,iib,vib): %s nid\n", argv[0]); + fprintf (stderr, "usage(iib,vib): %s nid\n", argv[0]); return 0; } @@ -738,7 +738,7 @@ jt_ptl_add_peer (int argc, char **argv) return -1; } - if (g_nal_is_compatible (NULL, SOCKNAL, RANAL, 0)) { + if (g_nal_is_compatible (NULL, SOCKNAL, OPENIBNAL, RANAL, 0)) { if (ptl_parse_ipaddr (&ip, argv[2]) != 0) { fprintf (stderr, "Can't parse ip addr: %s\n", argv[2]); return -1; -- 1.8.3.1
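
Note on the identity checks above: the validation added in kibnal_accept() and kibnal_active_conn_callback() reduces to comparing the four fields carried in every message header (ibm_srcnid, ibm_srcstamp, ibm_dstnid, ibm_dststamp) against the NIDs and incarnation stamps the receiver currently expects, failing with -ESTALE when either end has changed identity since the peer was learned. The stand-alone user-space sketch below shows only the shape of that comparison; the types and names (conn_msg, local_view, check_same_person) are illustrative and are not part of the patch.

/*
 * Minimal user-space sketch (illustrative types/names, NOT the kernel
 * NAL code) of the identity check performed when a connection request
 * or connection ack arrives: the receiver rejects the message as stale
 * if any of the four identity fields disagrees with its current view,
 * e.g. because either end has restarted since the peer was added.
 */
#include <stdint.h>
#include <errno.h>

typedef uint64_t nid_t;

struct conn_msg {
        nid_t    srcnid;        /* sender's NID */
        uint64_t srcstamp;      /* sender's incarnation when it sent this */
        nid_t    dstnid;        /* NID the sender thinks it is talking to */
        uint64_t dststamp;      /* receiver incarnation the sender last saw */
};

struct local_view {
        nid_t    peer_nid;      /* who I think the peer is */
        uint64_t peer_stamp;    /* the peer incarnation I connected to */
        nid_t    my_nid;        /* my own NID */
        uint64_t my_stamp;      /* my current incarnation */
};

/* Return 0 if the message matches the expected identities, -ESTALE if
 * either side has changed identity/incarnation since they were learned. */
static int check_same_person(const struct conn_msg *m,
                             const struct local_view *v)
{
        if (m->srcnid   != v->peer_nid   ||
            m->srcstamp != v->peer_stamp ||
            m->dstnid   != v->my_nid     ||
            m->dststamp != v->my_stamp)
                return -ESTALE;

        return 0;
}

int main(void)
{
        struct local_view v = { .peer_nid = 0x12345ULL, .peer_stamp = 1,
                                .my_nid   = 0x67890ULL, .my_stamp   = 7 };
        /* same peer NID, but the peer's incarnation stamp has changed */
        struct conn_msg   m = { .srcnid = 0x12345ULL, .srcstamp = 2,
                                .dstnid = 0x67890ULL, .dststamp = 7 };

        return check_same_person(&m, &v) == -ESTALE ? 0 : 1;
}

As in the hunks above, -ESTALE is used for these identity mismatches, while -EPROTO is reserved for malformed or unexpected messages (bad type, bad queue depth).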
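
Note on the reaper's deadline arithmetic: the loop above computes "timeout = (int)(deadline - jiffies)" and advances the deadline by whole periods, so the comparison stays correct even when the tick counter wraps. Below is a minimal user-space sketch of that wrap-safe idiom using an illustrative 32-bit counter and made-up names (ticks, time_remaining); it is not part of the patch.

/*
 * Sketch of wrap-safe deadline arithmetic: "deadline - now" is taken
 * as a signed difference, so the sign of the result is meaningful even
 * after the counter wraps past zero.
 */
#include <stdint.h>
#include <stdio.h>

static uint32_t ticks;                  /* stand-in for the jiffies counter */

static int time_remaining(uint32_t deadline)
{
        /* signed difference: correct across one wrap of the counter */
        return (int32_t)(deadline - ticks);
}

int main(void)
{
        uint32_t period = 100;          /* ticks between periodic checks */
        uint32_t deadline;

        ticks = UINT32_MAX - 50;        /* start close to the wrap point */
        deadline = ticks + period;      /* wraps past zero: deadline == 49 */

        /* deadline has numerically "gone backwards", but the signed
         * difference still reports 100 ticks remaining */
        printf("remaining = %d\n", time_remaining(deadline));

        ticks += 150;                   /* time passes the deadline */
        printf("remaining = %d\n", time_remaining(deadline));   /* -50 */

        return 0;
}

The same signed-difference idiom underlies the kernel's time_before()/time_after() helpers.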