Whamcloud - gitweb
Land b_release_1_4_4 onto HEAD (20050810_0253)
[fs/lustre-release.git] / lnet / ulnds / socklnd / connection.c
index 310e899..49cca96 100644 (file)
@@ -3,19 +3,19 @@
  *
  *  Copyright (c) 2002 Cray Inc.
  *
- *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
+ *   This file is part of Lustre, http://www.lustre.org.
  *
- *   Portals is free software; you can redistribute it and/or
- *   modify it under the terms of version 2.1 of the GNU Lesser General
- *   Public License as published by the Free Software Foundation.
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
  *
- *   Portals is distributed in the hope that it will be useful,
+ *   Lustre is distributed in the hope that it will be useful,
  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU Lesser General Public License for more details.
+ *   GNU General Public License for more details.
  *
- *   You should have received a copy of the GNU Lesser General Public
- *   License along with Portals; if not, write to the Free Software
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
 #include <stdarg.h>
 #include <string.h>
 #include <unistd.h>
-#include <syscall.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <portals/types.h>
+#include <portals/lib-types.h>
+#include <portals/socknal.h>
+#include <libcfs/kp30.h>
 #include <connection.h>
+#include <pthread.h>
 #include <errno.h>
-
+#ifndef __CYGWIN__
+#include <syscall.h>
+#endif
 
 /* global variable: acceptor port */
 unsigned short tcpnal_acceptor_port = 988;
@@ -55,9 +62,14 @@ unsigned short tcpnal_acceptor_port = 988;
  */
 static int compare_connection(void *arg1, void *arg2)
 {
-        connection c = arg1;
-        unsigned int * id = arg2;
-        return((c->ip==id[0]) && (c->port==id[1]));
+    connection c = arg1;
+    unsigned int * id = arg2;
+#if 0 /* disabled: match on both the IP (id[0]) and the port (id[1]) */
+    return((c->ip==id[0]) && (c->port==id[1]));
+#else
+    /* CFS-specific hack: match on the IP (id[0]) alone; the port is ignored */
+    return (c->ip == id[0]);
+#endif
 }
 
 
@@ -68,7 +80,12 @@ static int compare_connection(void *arg1, void *arg2)
  */
 static unsigned int connection_key(unsigned int *id)
 {
+#if 0 /* disabled: key built from both id[0] and id[1] */
     return(id[0]^id[1]);
+#else
+    /* CFS-specific hack: key is id[0] alone; id[1] no longer affects it */
+    return (unsigned int) id[0];
+#endif
 }
 
 
@@ -102,22 +119,27 @@ int read_connection(connection c,
                     unsigned char *dest,
                     int len)
 {
-    int offset=0,rc;
+    int offset = 0,rc;
 
-    if (len){
+    if (len) {
         do {
-            if((rc=syscall(SYS_read, c->fd, dest+offset, len-offset))<=0){
-                if (errno==EINTR) {
-                    rc=0;
+#ifndef __CYGWIN__
+            rc = syscall(SYS_read, c->fd, dest+offset, len-offset);
+#else
+            rc = recv(c->fd, dest+offset, len-offset, 0);
+#endif
+            if (rc <= 0) {
+                if (errno == EINTR) {
+                    rc = 0;
                 } else {
                     remove_connection(c);
-                    return(0);
+                    return (0);
                 }
             }
-            offset+=rc;
-        } while (offset<len);
+            offset += rc;
+        } while (offset < len);
     }
-    return(1);
+    return (1);
 }
 
 static int connection_input(void *d)
@@ -172,10 +194,128 @@ static int new_connection(void *z)
     unsigned int nid=*((unsigned int *)&s.sin_addr);
     /* cfs specific hack */
     //unsigned short pid=s.sin_port;
+    pthread_mutex_lock(&m->conn_lock);
     allocate_connection(m,htonl(nid),0/*pid*/,fd);
+    pthread_mutex_unlock(&m->conn_lock);
     return(1);
 }
 
+extern ptl_nid_t tcpnal_mynid;
+
+/* Read exactly 'nob' bytes from 'sockfd' into 'buf', retrying after EINTR and
+ * carrying on after short reads.
+ * Returns 0 on success, -errno on a read error, or -EPROTO if the peer closed
+ * the connection before 'nob' bytes arrived. */
+static int
+tcpnal_read_all (int sockfd, void *buf, int nob)
+{
+        char *ptr = buf;
+        int   rc;
+
+        while (nob > 0) {
+#ifndef __CYGWIN__
+                rc = syscall(SYS_read, sockfd, ptr, nob);
+#else
+                rc = recv(sockfd, ptr, nob, 0);
+#endif
+                if (rc < 0 && errno == EINTR)
+                        continue;
+                if (rc <= 0)
+                        return (rc < 0 ? -errno : -EPROTO);
+
+                ptr += rc;
+                nob -= rc;
+        }
+
+        return (0);
+}
+
+/* Exchange HELLO headers with the peer connected on 'sockfd'.
+ *
+ * Sends a HELLO header carrying tcpnal_mynid, 'type' and 'incarnation', then
+ * reads the peer's HELLO and checks its magic, protocol version, message type
+ * and source NID.  Any payload that follows the header is read and discarded.
+ *
+ * If *nid is PTL_NID_ANY on entry it is set to the peer's NID; otherwise the
+ * peer's NID must match *nid.
+ *
+ * Returns 0 on success and a negative value on any failure. */
+int
+tcpnal_hello (int sockfd, ptl_nid_t *nid, int type, __u64 incarnation)
+{
+        int                 rc;
+        int                 nob;
+        ptl_hdr_t           hdr;
+        ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
+
+        LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
+
+        memset (&hdr, 0, sizeof (hdr));
+        hmv->magic         = cpu_to_le32(PORTALS_PROTO_MAGIC);
+        /* 16-bit conversions: the same width the reply is checked with below */
+        hmv->version_major = cpu_to_le16(PORTALS_PROTO_VERSION_MAJOR);
+        hmv->version_minor = cpu_to_le16(PORTALS_PROTO_VERSION_MINOR);
+
+        hdr.src_nid = cpu_to_le64(tcpnal_mynid);
+        hdr.type    = cpu_to_le32(PTL_MSG_HELLO);
+
+        hdr.msg.hello.type = cpu_to_le32(type);
+        hdr.msg.hello.incarnation = cpu_to_le64(incarnation);
+
+        /* I don't send any interface info */
+
+        /* Assume sufficient socket buffering for this message */
+#ifndef __CYGWIN__
+        rc = syscall(SYS_write, sockfd, &hdr, sizeof(hdr));
+#else
+        rc = send(sockfd, &hdr, sizeof(hdr), 0);
+#endif
+        if (rc <= 0) {
+                /* never return 0 here: the caller reads 0 as success */
+                rc = (rc < 0) ? -errno : -EPROTO;
+                CERROR ("Error %d sending HELLO to "LPX64"\n", rc, *nid);
+                return (rc);
+        }
+
+        rc = tcpnal_read_all(sockfd, hmv, sizeof(*hmv));
+        if (rc != 0) {
+                CERROR ("Error %d reading HELLO from "LPX64"\n", rc, *nid);
+                return (rc);
+        }
+
+        if (hmv->magic != cpu_to_le32(PORTALS_PROTO_MAGIC)) {
+                CERROR ("Bad magic %#08x (%#08x expected) from "LPX64"\n",
+                        le32_to_cpu(hmv->magic), PORTALS_PROTO_MAGIC, *nid);
+                return (-EPROTO);
+        }
+
+        if (hmv->version_major != cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
+            hmv->version_minor != cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
+                CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
+                        " from "LPX64"\n",
+                        le16_to_cpu (hmv->version_major),
+                        le16_to_cpu (hmv->version_minor),
+                        PORTALS_PROTO_VERSION_MAJOR,
+                        PORTALS_PROTO_VERSION_MINOR,
+                        *nid);
+                return (-EPROTO);
+        }
+
+#if (PORTALS_PROTO_VERSION_MAJOR != 1)
+# error "This code only understands protocol version 1.x"
+#endif
+        /* version 1 sends magic/version as the dest_nid of a 'hello' header,
+         * so read the rest of it in now... */
+
+        rc = tcpnal_read_all(sockfd, hmv + 1, sizeof(hdr) - sizeof(*hmv));
+        if (rc != 0) {
+                CERROR ("Error %d reading rest of HELLO hdr from "LPX64"\n",
+                        rc, *nid);
+                return (rc);
+        }
+
+        /* ...and check we got what we expected */
+        if (hdr.type != cpu_to_le32 (PTL_MSG_HELLO)) {
+                CERROR ("Expecting a HELLO hdr "
+                        " but got type %d with %d payload from "LPX64"\n",
+                        le32_to_cpu (hdr.type),
+                        le32_to_cpu (hdr.payload_length), *nid);
+                return (-EPROTO);
+        }
+
+        if (le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
+                CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n");
+                return (-EPROTO);
+        }
+
+        if (*nid == PTL_NID_ANY) {              /* don't know peer's nid yet */
+                *nid = le64_to_cpu(hdr.src_nid);
+        } else if (*nid != le64_to_cpu (hdr.src_nid)) {
+                CERROR ("Connected to nid "LPX64", but expecting "LPX64"\n",
+                        le64_to_cpu (hdr.src_nid), *nid);
+                return (-EPROTO);
+        }
+
+        /* Ignore any interface info in the payload */
+        nob = le32_to_cpu(hdr.payload_length);
+        /* nob < 0: the wire length does not fit in an int; reject that too */
+        if (nob < 0 || nob > getpagesize()) {
+                CERROR("Unexpected HELLO payload %d from "LPX64"\n",
+                       nob, *nid);
+                return (-EPROTO);
+        }
+        if (nob > 0) {
+                char *space = (char *)malloc(nob);
+
+                if (space == NULL) {
+                        CERROR("Can't allocate scratch buffer %d\n", nob);
+                        return (-ENOMEM);
+                }
+
+                rc = tcpnal_read_all(sockfd, space, nob);
+                free(space);    /* contents are discarded whatever the outcome */
+                if (rc != 0) {
+                        CERROR("Error %d skipping HELLO payload from "
+                               LPX64"\n", rc, *nid);
+                        return (rc);
+                }
+        }
+
+        return (0);
+}
 
 /* Function:  force_tcp_connection
  * Arguments: t: tcpnal
@@ -185,39 +325,108 @@ static int new_connection(void *z)
  */
 connection force_tcp_connection(manager m,
                                 unsigned int ip,
-                                unsigned short port)
+                                unsigned short port,
+                                procbridge pb)
 {
-    connection c;
+    connection conn;
     struct sockaddr_in addr;
+    struct sockaddr_in locaddr; 
     unsigned int id[2];
+    struct timeval tv;
+    __u64 incarnation;
+
+    int fd;
+    int option;
+    int rc;
+    int rport;
+    ptl_nid_t peernid = PTL_NID_ANY;
 
     port = tcpnal_acceptor_port;
 
-    id[0]=ip;
-    id[1]=port;
+    id[0] = ip;
+    id[1] = port;
 
-    if (!(c=hash_table_find(m->connections,id))){
-        int fd;
+    pthread_mutex_lock(&m->conn_lock);      /* released at 'out' */
 
-        bzero((char *) &addr, sizeof(addr));
-        addr.sin_family      = AF_INET;
-        addr.sin_addr.s_addr = htonl(ip);
-        addr.sin_port        = htons(port);
+    conn = hash_table_find(m->connections, id);
+    if (conn)                               /* already connected: reuse it */
+            goto out;
 
-        if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { 
-            perror("tcpnal socket failed");
-            exit(-1);
-        }
-        if (connect(fd,
-                    (struct sockaddr *)&addr,
-                    sizeof(struct sockaddr_in)))
-            {
-                perror("tcpnal connect");
-                return(0);
+    memset(&addr, 0, sizeof(addr));
+    addr.sin_family      = AF_INET;
+    addr.sin_addr.s_addr = htonl(ip);
+    addr.sin_port        = htons(port);
+
+    memset(&locaddr, 0, sizeof(locaddr)); 
+    locaddr.sin_family = AF_INET; 
+    locaddr.sin_addr.s_addr = INADDR_ANY;
+    /* Try local ports from IPPORT_RESERVED - 1 down to IPPORT_RESERVED / 2 + 1 */
+    for (rport = IPPORT_RESERVED - 1; rport > IPPORT_RESERVED / 2; --rport) {
+            fd = socket(AF_INET, SOCK_STREAM, 0);
+            if (fd < 0) {
+                    perror("tcpnal socket failed");
+                    goto out;
+            } 
+            
+            option = 1;
+            rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 
+                            &option, sizeof(option));
+            if (rc != 0) {
+                    perror ("Can't set SO_REUSEADDR for socket"); 
+                    close(fd);
+                    goto out;
+            } 
+
+            locaddr.sin_port = htons(rport);
+            rc = bind(fd, (struct sockaddr *)&locaddr, sizeof(locaddr));
+            if (rc == 0 || errno == EACCES) {  /* EACCES: connect anyway */
+                    rc = connect(fd, (struct sockaddr *)&addr,
+                                 sizeof(struct sockaddr_in));
+                    if (rc == 0) {
+                            break;
+                    } else if (errno != EADDRINUSE && errno != EADDRNOTAVAIL) {
+                            perror("Error connecting to remote host");
+                            close(fd);
+                            goto out;
+                    }
+            } else if (errno != EADDRINUSE) {
+                    perror("Error binding to privileged port");
+                    close(fd);
+                    goto out;
             }
-        return(allocate_connection(m,ip,port,fd));
+            close(fd);      /* address in use/unavailable: try the next port */
     }
-    return(c);
+    /* rport reaches the lower bound only if no iteration hit the 'break' */
+    if (rport == IPPORT_RESERVED / 2) {
+            fprintf(stderr, "Out of ports trying to bind to a reserved port\n");
+            goto out;
+    }
+    /* Best effort: TCP_NODELAY and 1<<20 byte buffers; results are not checked */
+#if 1
+    option = 1;
+    setsockopt(fd, SOL_TCP, TCP_NODELAY, &option, sizeof(option));
+    option = 1<<20;
+    setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &option, sizeof(option));
+    option = 1<<20;
+    setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &option, sizeof(option));
+#endif
+    /* incarnation = current wall-clock time in microseconds */
+    gettimeofday(&tv, NULL);
+    incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
+
+    /* say hello; NOTE(review): a failed handshake exits the whole process */
+    if (tcpnal_hello(fd, &peernid, SOCKNAL_CONN_ANY, incarnation))
+            exit(-1);
+    
+    conn = allocate_connection(m, ip, port, fd);
+    
+    /* let nal thread know this event right away */
+    if (conn)
+            procbridge_wakeup_nal(pb);
+
+out:
+    pthread_mutex_unlock(&m->conn_lock);
+    return (conn);
 }
 
 
@@ -243,10 +452,10 @@ static int bind_socket(manager m,unsigned short port)
     bzero((char *) &addr, sizeof(addr));
     addr.sin_family      = AF_INET;
     addr.sin_addr.s_addr = 0;
-    addr.sin_port        = port; 
-    
+    addr.sin_port        = htons(port);
+
     if (bind(m->bound,(struct sockaddr *)&addr,alen)<0){
-        perror ("tcpnal bind"); 
+        fprintf(stderr, "tcpnal bind: %s port %u\n", strerror(errno), port); 
         return(0);
     }
     
@@ -284,11 +493,15 @@ manager init_connections(unsigned short pid,
                          int (*input)(void *, void *),
                          void *a)
 {
-    manager m=(manager)malloc(sizeof(struct manager));
-    m->connections=hash_create_table(compare_connection,connection_key);
-    m->handler=input;
-    m->handler_arg=a;
-    if (bind_socket(m,pid)) return(m);
+    manager m = (manager)malloc(sizeof(struct manager));
+    m->connections = hash_create_table(compare_connection,connection_key);
+    m->handler = input;
+    m->handler_arg = a;
+    pthread_mutex_init(&m->conn_lock, 0);
+
+    if (bind_socket(m,pid))
+        return(m);
+
     free(m);
     return(0);
 }