*
* Copyright (c) 2002 Cray Inc.
*
- * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
+ * This file is part of Lustre, http://www.lustre.org.
*
- * Portals is free software; you can redistribute it and/or
- * modify it under the terms of version 2.1 of the GNU Lesser General
- * Public License as published by the Free Software Foundation.
+ * Lustre is free software; you can redistribute it and/or
+ * modify it under the terms of version 2 of the GNU General Public
+ * License as published by the Free Software Foundation.
*
- * Portals is distributed in the hope that it will be useful,
+ * Lustre is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Lesser General Public License for more details.
+ * GNU General Public License for more details.
*
- * You should have received a copy of the GNU Lesser General Public
- * License along with Portals; if not, write to the Free Software
+ * You should have received a copy of the GNU General Public License
+ * along with Lustre; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
#include <stdarg.h>
#include <string.h>
#include <unistd.h>
-#include <syscall.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <portals/types.h>
+#include <portals/lib-types.h>
+#include <portals/socknal.h>
+#include <libcfs/kp30.h>
#include <connection.h>
+#include <pthread.h>
#include <errno.h>
-
+#ifndef __CYGWIN__
+#include <syscall.h>
+#endif
/* global variable: acceptor port */
unsigned short tcpnal_acceptor_port = 988;
*/
static int compare_connection(void *arg1, void *arg2)
{
- connection c = arg1;
- unsigned int * id = arg2;
- return((c->ip==id[0]) && (c->port==id[1]));
+ connection c = arg1;
+ unsigned int * id = arg2;
+#if 0
+ return((c->ip==id[0]) && (c->port==id[1]));
+#else
+ /* CFS specific hacking */
+ return (c->ip == id[0]);
+#endif
}
*/
static unsigned int connection_key(unsigned int *id)
{
+#if 0
return(id[0]^id[1]);
+#else
+ /* CFS specific hacking */
+ return (unsigned int) id[0];
+#endif
}
unsigned char *dest,
int len)
{
- int offset=0,rc;
+ int offset = 0,rc;
- if (len){
+ if (len) {
do {
- if((rc=syscall(SYS_read, c->fd, dest+offset, len-offset))<=0){
- if (errno==EINTR) {
- rc=0;
+#ifndef __CYGWIN__
+ rc = syscall(SYS_read, c->fd, dest+offset, len-offset);
+#else
+ rc = recv(c->fd, dest+offset, len-offset, 0);
+#endif
+ if (rc <= 0) {
+ if (errno == EINTR) {
+ rc = 0;
} else {
remove_connection(c);
- return(0);
+ return (0);
}
}
- offset+=rc;
- } while (offset<len);
+ offset += rc;
+ } while (offset < len);
}
- return(1);
+ return (1);
}
static int connection_input(void *d)
unsigned int nid=*((unsigned int *)&s.sin_addr);
/* cfs specific hack */
//unsigned short pid=s.sin_port;
+ pthread_mutex_lock(&m->conn_lock);
allocate_connection(m,htonl(nid),0/*pid*/,fd);
+ pthread_mutex_unlock(&m->conn_lock);
return(1);
}
+extern ptl_nid_t tcpnal_mynid;
+
+int
+tcpnal_hello (int sockfd, ptl_nid_t *nid, int type, __u64 incarnation)
+{
+ int rc;
+ int nob;
+ ptl_hdr_t hdr;
+ ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
+
+ LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
+
+ memset (&hdr, 0, sizeof (hdr));
+ hmv->magic = cpu_to_le32(PORTALS_PROTO_MAGIC);
+ hmv->version_major = cpu_to_le32(PORTALS_PROTO_VERSION_MAJOR);
+ hmv->version_minor = cpu_to_le32(PORTALS_PROTO_VERSION_MINOR);
+
+ hdr.src_nid = cpu_to_le64(tcpnal_mynid);
+ hdr.type = cpu_to_le32(PTL_MSG_HELLO);
+
+ hdr.msg.hello.type = cpu_to_le32(type);
+ hdr.msg.hello.incarnation = cpu_to_le64(incarnation);
+
+ /* I don't send any interface info */
+
+ /* Assume sufficient socket buffering for this message */
+ rc = syscall(SYS_write, sockfd, &hdr, sizeof(hdr));
+ if (rc <= 0) {
+ CERROR ("Error %d sending HELLO to "LPX64"\n", rc, *nid);
+ return (rc);
+ }
+
+ rc = syscall(SYS_read, sockfd, hmv, sizeof(*hmv));
+ if (rc <= 0) {
+ CERROR ("Error %d reading HELLO from "LPX64"\n", rc, *nid);
+ return (rc);
+ }
+
+ if (hmv->magic != le32_to_cpu(PORTALS_PROTO_MAGIC)) {
+ CERROR ("Bad magic %#08x (%#08x expected) from "LPX64"\n",
+ cpu_to_le32(hmv->magic), PORTALS_PROTO_MAGIC, *nid);
+ return (-EPROTO);
+ }
+
+ if (hmv->version_major != cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
+ hmv->version_minor != cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
+ CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
+ " from "LPX64"\n",
+ le16_to_cpu (hmv->version_major),
+ le16_to_cpu (hmv->version_minor),
+ PORTALS_PROTO_VERSION_MAJOR,
+ PORTALS_PROTO_VERSION_MINOR,
+ *nid);
+ return (-EPROTO);
+ }
+
+#if (PORTALS_PROTO_VERSION_MAJOR != 1)
+# error "This code only understands protocol version 1.x"
+#endif
+ /* version 1 sends magic/version as the dest_nid of a 'hello' header,
+ * so read the rest of it in now... */
+
+ rc = syscall(SYS_read, sockfd, hmv + 1, sizeof(hdr) - sizeof(*hmv));
+ if (rc <= 0) {
+ CERROR ("Error %d reading rest of HELLO hdr from "LPX64"\n",
+ rc, *nid);
+ return (rc);
+ }
+
+ /* ...and check we got what we expected */
+ if (hdr.type != cpu_to_le32 (PTL_MSG_HELLO)) {
+ CERROR ("Expecting a HELLO hdr "
+ " but got type %d with %d payload from "LPX64"\n",
+ le32_to_cpu (hdr.type),
+ le32_to_cpu (hdr.payload_length), *nid);
+ return (-EPROTO);
+ }
+
+ if (le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
+ CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n");
+ return (-EPROTO);
+ }
+
+ if (*nid == PTL_NID_ANY) { /* don't know peer's nid yet */
+ *nid = le64_to_cpu(hdr.src_nid);
+ } else if (*nid != le64_to_cpu (hdr.src_nid)) {
+ CERROR ("Connected to nid "LPX64", but expecting "LPX64"\n",
+ le64_to_cpu (hdr.src_nid), *nid);
+ return (-EPROTO);
+ }
+
+ /* Ignore any interface info in the payload */
+ nob = le32_to_cpu(hdr.payload_length);
+ if (nob > getpagesize()) {
+ CERROR("Unexpected HELLO payload %d from "LPX64"\n",
+ nob, *nid);
+ return (-EPROTO);
+ }
+ if (nob > 0) {
+ char *space = (char *)malloc(nob);
+
+ if (space == NULL) {
+ CERROR("Can't allocate scratch buffer %d\n", nob);
+ return (-ENOMEM);
+ }
+
+ rc = syscall(SYS_read, sockfd, space, nob);
+ if (rc <= 0) {
+ CERROR("Error %d skipping HELLO payload from "
+ LPX64"\n", rc, *nid);
+ return (rc);
+ }
+ }
+
+ return (0);
+}
/* Function: force_tcp_connection
* Arguments: t: tcpnal
*/
connection force_tcp_connection(manager m,
unsigned int ip,
- unsigned short port)
+ unsigned short port,
+ procbridge pb)
{
- connection c;
+ connection conn;
struct sockaddr_in addr;
+ struct sockaddr_in locaddr;
unsigned int id[2];
+ struct timeval tv;
+ __u64 incarnation;
+
+ int fd;
+ int option;
+ int rc;
+ int rport;
+ ptl_nid_t peernid = PTL_NID_ANY;
port = tcpnal_acceptor_port;
- id[0]=ip;
- id[1]=port;
+ id[0] = ip;
+ id[1] = port;
- if (!(c=hash_table_find(m->connections,id))){
- int fd;
+ pthread_mutex_lock(&m->conn_lock);
- bzero((char *) &addr, sizeof(addr));
- addr.sin_family = AF_INET;
- addr.sin_addr.s_addr = htonl(ip);
- addr.sin_port = htons(port);
+ conn = hash_table_find(m->connections, id);
+ if (conn)
+ goto out;
- if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- perror("tcpnal socket failed");
- exit(-1);
- }
- if (connect(fd,
- (struct sockaddr *)&addr,
- sizeof(struct sockaddr_in)))
- {
- perror("tcpnal connect");
- return(0);
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = AF_INET;
+ addr.sin_addr.s_addr = htonl(ip);
+ addr.sin_port = htons(port);
+
+ memset(&locaddr, 0, sizeof(locaddr));
+ locaddr.sin_family = AF_INET;
+ locaddr.sin_addr.s_addr = INADDR_ANY;
+
+ for (rport = IPPORT_RESERVED - 1; rport > IPPORT_RESERVED / 2; --rport) {
+ fd = socket(AF_INET, SOCK_STREAM, 0);
+ if (fd < 0) {
+ perror("tcpnal socket failed");
+ goto out;
+ }
+
+ option = 1;
+ rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
+ &option, sizeof(option));
+ if (rc != 0) {
+ perror ("Can't set SO_REUSEADDR for socket");
+ close(fd);
+ goto out;
+ }
+
+ locaddr.sin_port = htons(rport);
+ rc = bind(fd, (struct sockaddr *)&locaddr, sizeof(locaddr));
+ if (rc == 0 || errno == EACCES) {
+ rc = connect(fd, (struct sockaddr *)&addr,
+ sizeof(struct sockaddr_in));
+ if (rc == 0) {
+ break;
+ } else if (errno != EADDRINUSE && errno != EADDRNOTAVAIL) {
+ perror("Error connecting to remote host");
+ close(fd);
+ goto out;
+ }
+ } else if (errno != EADDRINUSE) {
+ perror("Error binding to privileged port");
+ close(fd);
+ goto out;
}
- return(allocate_connection(m,ip,port,fd));
+ close(fd);
}
- return(c);
+
+ if (rport == IPPORT_RESERVED / 2) {
+ fprintf(stderr, "Out of ports trying to bind to a reserved port\n");
+ goto out;
+ }
+
+#if 1
+ option = 1;
+ setsockopt(fd, SOL_TCP, TCP_NODELAY, &option, sizeof(option));
+ option = 1<<20;
+ setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &option, sizeof(option));
+ option = 1<<20;
+ setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &option, sizeof(option));
+#endif
+
+ gettimeofday(&tv, NULL);
+ incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
+
+ /* say hello */
+ if (tcpnal_hello(fd, &peernid, SOCKNAL_CONN_ANY, incarnation))
+ exit(-1);
+
+ conn = allocate_connection(m, ip, port, fd);
+
+ /* let nal thread know this event right away */
+ if (conn)
+ procbridge_wakeup_nal(pb);
+
+out:
+ pthread_mutex_unlock(&m->conn_lock);
+ return (conn);
}
bzero((char *) &addr, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = 0;
- addr.sin_port = port;
-
+ addr.sin_port = htons(port);
+
if (bind(m->bound,(struct sockaddr *)&addr,alen)<0){
- perror ("tcpnal bind");
+ fprintf(stderr, "tcpnal bind: %s port %u\n", strerror(errno), port);
return(0);
}
int (*input)(void *, void *),
void *a)
{
- manager m=(manager)malloc(sizeof(struct manager));
- m->connections=hash_create_table(compare_connection,connection_key);
- m->handler=input;
- m->handler_arg=a;
- if (bind_socket(m,pid)) return(m);
+ manager m = (manager)malloc(sizeof(struct manager));
+ m->connections = hash_create_table(compare_connection,connection_key);
+ m->handler = input;
+ m->handler_arg = a;
+ pthread_mutex_init(&m->conn_lock, 0);
+
+ if (bind_socket(m,pid))
+ return(m);
+
free(m);
return(0);
}