Whamcloud - gitweb
* Updated ranal prior to the Great Schism
authoreeb <eeb>
Fri, 17 Dec 2004 18:22:35 +0000 (18:22 +0000)
committereeb <eeb>
Fri, 17 Dec 2004 18:22:35 +0000 (18:22 +0000)
lnet/archdep.m4
lnet/klnds/ralnd/ralnd.c
lnet/klnds/ralnd/ralnd.h
lnet/klnds/ralnd/ralnd_cb.c
lnet/utils/portals.c
lustre/portals/archdep.m4
lustre/portals/knals/ranal/ranal.c
lustre/portals/knals/ranal/ranal.h
lustre/portals/knals/ranal/ranal_cb.c
lustre/portals/utils/portals.c

index 7d807da..2f1d563 100644 (file)
@@ -545,7 +545,7 @@ if test x$enable_modules != xno ; then
                        RAP_RETURN          rc;
                        RAP_PVOID           dev_handle;
 
-                       rc = RapkGetDeviceByIndex(0, NULL, NULL, &dev_handle);
+                       rc = RapkGetDeviceByIndex(0, NULL, &dev_handle);
 
                        return rc == RAP_SUCCESS ? 0 : 1;
                ],[
index a59757d..c924827 100644 (file)
@@ -122,7 +122,7 @@ kranal_sock_read (struct socket *sock, void *buffer, int nob, int timeout)
                /* Set receive timeout to remaining time */
                tv = (struct timeval) {
                        .tv_sec = ticks / HZ,
-                       .tv_usec = ((ticks % HZ) * 1000000) / HZ;
+                       .tv_usec = ((ticks % HZ) * 1000000) / HZ
                };
                set_fs(KERNEL_DS);
                rc = sock_setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO,
@@ -130,7 +130,7 @@ kranal_sock_read (struct socket *sock, void *buffer, int nob, int timeout)
                set_fs(oldmm);
                if (rc != 0) {
                        CERROR("Can't set socket recv timeout %d: %d\n",
-                              send_timeout, rc);
+                              timeout, rc);
                        return rc;
                }
 
@@ -211,6 +211,8 @@ kranal_pause(int ticks)
 void
 kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn)
 {
+        RAP_RETURN   rrc;
+
         memset(connreq, 0, sizeof(*connreq));
 
         connreq->racr_magic       = RANAL_MSG_MAGIC;
@@ -225,12 +227,12 @@ kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn)
 }
 
 int
-kranal_recv_connreq(struct sock *sock, kra_connreq_t *connreq, int timeout)
+kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout)
 {
         int         i;
        int         rc;
 
-       rc = kranal_sock_read(newsock, connreq, sizeof(*connreq), timeout);
+       rc = kranal_sock_read(sock, connreq, sizeof(*connreq), timeout);
        if (rc != 0) {
                CERROR("Read failed: %d\n", rc);
                return rc;
@@ -273,7 +275,7 @@ kranal_recv_connreq(struct sock *sock, kra_connreq_t *connreq, int timeout)
         
         for (i = 0; i < kranal_data.kra_ndevs; i++)
                 if (connreq->racr_devid == 
-                    kranal_data.kra_devices[i]->rad_id)
+                    kranal_data.kra_devices[i].rad_id)
                         break;
 
         if (i == kranal_data.kra_ndevs) {
@@ -285,7 +287,7 @@ kranal_recv_connreq(struct sock *sock, kra_connreq_t *connreq, int timeout)
 }
 
 int
-kranal_conn_isdup_locked(kranal_peer_t *peer, __u64 incarnation)
+kranal_conn_isdup_locked(kra_peer_t *peer, __u64 incarnation)
 {
        kra_conn_t       *conn;
        struct list_head *tmp;
@@ -294,13 +296,13 @@ kranal_conn_isdup_locked(kranal_peer_t *peer, __u64 incarnation)
        list_for_each(tmp, &peer->rap_conns) {
                conn = list_entry(tmp, kra_conn_t, rac_list);
 
-                if (conn->rac_incarnation < incarnation) {
+                if (conn->rac_peer_incarnation < incarnation) {
                         /* Conns with an older incarnation get culled later */
                         continue;
                 }
 
                 if (!loopback &&
-                    conn->rac_incarnation == incarnation &&
+                    conn->rac_peer_incarnation == incarnation &&
                     peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid) {
                         /* loopback creates 2 conns */
                         loopback = 1;
@@ -324,7 +326,7 @@ kranal_set_conn_uniqueness (kra_conn_t *conn)
 
         do {    /* allocate a unique cqid */
                 conn->rac_cqid = kranal_data.kra_next_cqid++;
-        } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL)
+        } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL);
         
 
         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
@@ -343,7 +345,6 @@ kranal_alloc_conn(kra_conn_t **connp, kra_device_t *dev)
                return -ENOMEM;
 
        memset(conn, 0, sizeof(*conn));
-        conn->rac_cqid = cqid;
        atomic_set(&conn->rac_refcount, 1);
        INIT_LIST_HEAD(&conn->rac_list);
        INIT_LIST_HEAD(&conn->rac_hashlist);
@@ -352,10 +353,13 @@ kranal_alloc_conn(kra_conn_t **connp, kra_device_t *dev)
        INIT_LIST_HEAD(&conn->rac_replyq);
        spin_lock_init(&conn->rac_lock);
 
+        kranal_set_conn_uniqueness(conn);
+
         conn->rac_timeout = MAX(kranal_tunables.kra_timeout, RANAL_MIN_TIMEOUT);
         kranal_update_reaper_timeout(conn->rac_timeout);
 
-        rrc = RapkCreateRi(dev->rad_handle, cqid, dev->rad_ptag,
+        rrc = RapkCreateRi(dev->rad_handle, conn->rac_cqid,
+                           dev->rad_ptag,
                            dev->rad_rdma_cq, dev->rad_fma_cq,
                            &conn->rac_rihandle);
         if (rrc != RAP_SUCCESS) {
@@ -412,7 +416,7 @@ __kranal_conn_decref(kra_conn_t *conn)
 void
 kranal_terminate_conn_locked (kra_conn_t *conn)
 {
-        kra_peer_t     *peer - conn->rac_peer;
+        kra_peer_t *peer = conn->rac_peer;
 
         LASSERT (!in_interrupt());
         LASSERT (conn->rac_closing);
@@ -469,7 +473,7 @@ kranal_close_conn (kra_conn_t *conn, int error)
 
 int
 kranal_passive_conn_handshake (struct socket *sock, 
-                               ptl_nid_t **peer_nidp, kra_conn_t **connp)
+                               ptl_nid_t *peer_nidp, kra_conn_t **connp)
 {
        struct sockaddr_in   addr;
        __u32                peer_ip;
@@ -480,16 +484,18 @@ kranal_passive_conn_handshake (struct socket *sock,
         kra_device_t        *dev;
        RAP_RETURN           rrc;
        int                  rc;
+        int                  len;
         int                  i;
 
-       rc = sock->ops->getname(newsock, (struct sockaddr *)addr, &len, 2);
+        len = sizeof(addr);
+       rc = sock->ops->getname(sock, (struct sockaddr *)&addr, &len, 2);
         if (rc != 0) {
                 CERROR("Can't get peer's IP: %d\n", rc);
                 return rc;
         }
 
-        peer_ip = ntohl(sin.sin_addr.s_addr);
-        peer_port = ntohs(sin.sin_port);
+        peer_ip = ntohl(addr.sin_addr.s_addr);
+        peer_port = ntohs(addr.sin_port);
 
         if (peer_port >= 1024) {
                 CERROR("Refusing unprivileged connection from %u.%u.%u.%u/%d\n",
@@ -498,7 +504,7 @@ kranal_passive_conn_handshake (struct socket *sock,
         }
 
         rc = kranal_recv_connreq(sock, &connreq, 
-                                 kranal_data.kra_listener_timeout);
+                                 kranal_tunables.kra_listener_timeout);
         if (rc != 0) {
                 CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", 
                        HIPQUAD(peer_ip), peer_port, rc);
@@ -511,11 +517,11 @@ kranal_passive_conn_handshake (struct socket *sock,
         for (i = 0;;i++) {
                 LASSERT(i < kranal_data.kra_ndevs);
                 dev = &kranal_data.kra_devices[i];
-                if (dev->rad_id == connreq->racr_devid)
+                if (dev->rad_id == connreq.racr_devid)
                         break;
         }
 
-        rc = kranal_alloc_conn(&conn, dev,(__u32)(peer_nid & 0xffffffff));
+        rc = kranal_alloc_conn(&conn, dev);
         if (rc != 0)
                 return rc;
 
@@ -523,7 +529,7 @@ kranal_passive_conn_handshake (struct socket *sock,
         conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout);
         kranal_update_reaper_timeout(conn->rac_keepalive);
         
-        rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
+        rrc = RapkSetRiParams(conn->rac_rihandle, &connreq.racr_riparams);
         if (rrc != RAP_SUCCESS) {
                 CERROR("Can't set riparams for "LPX64": %d\n", peer_nid, rrc);
                 kranal_conn_decref(conn);
@@ -534,7 +540,7 @@ kranal_passive_conn_handshake (struct socket *sock,
 
         rc = kranal_sock_write(sock, &connreq, sizeof(connreq));
         if (rc != 0) {
-                CERROR("Can't tx connreq to %u.%u.%u.%u/%p: %d\n", 
+                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n", 
                        HIPQUAD(peer_ip), peer_port, rc);
                 kranal_conn_decref(conn);
                 return rc;
@@ -606,36 +612,33 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
                 CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n", 
                        port, HIPQUAD(peer->rap_ip), peer->rap_port);
         }
+
+        /* all ports busy */
+        return -EHOSTUNREACH;
 }
 
 
 int
 kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
 {
+        struct sockaddr_in  dstaddr;
        kra_connreq_t       connreq;
         kra_conn_t         *conn;
         kra_device_t       *dev;
         struct socket      *sock;
-        __u32               id32;
        RAP_RETURN          rrc;
        int                 rc;
+        int                 idx;
+        
+        idx = peer->rap_nid & 0x7fffffff;
+        dev = &kranal_data.kra_devices[idx % kranal_data.kra_ndevs];
 
-        id32 = (peer_nid & 0xffffffff);
-        dev = &kranal_data.kra_devices[id32 % kranal_data.kra_ndevs];
-
-        rc = kranal_alloc_conn(&conn, dev, id32);
+        rc = kranal_alloc_conn(&conn, dev);
         if (rc != 0)
                 return rc;
 
         kranal_pack_connreq(&connreq, conn);
         
-        memset(&dstaddr, 0, sizeof(addr));
-        dstaddr.sin_family      = AF_INET;
-        dstaddr.sin_port        = htons(peer->rap_port);
-        dstaddr.sin_addr.s_addr = htonl(peer->rap_ip);
-
-        memset(&srcaddr, 0, sizeof(addr));
-
         rc = ranal_connect_sock(peer, &sock);
         if (rc != 0)
                 goto failed_0;
@@ -651,7 +654,7 @@ kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
                 goto failed_1;
         }
 
-        rc = kranal_recv_connreq(sock, &connreq, kranal_data.kra_timeout);
+        rc = kranal_recv_connreq(sock, &connreq, kranal_tunables.kra_timeout);
         if (rc != 0) {
                 CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", 
                        HIPQUAD(peer->rap_ip), peer->rap_port, rc);
@@ -682,11 +685,10 @@ kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
         kranal_update_reaper_timeout(conn->rac_keepalive);
 
         rc = -ENETDOWN;
-        rrc = RapkSetRiParams(conn->rac_rihandle,
-                              &connreq->racr_riparams);
+        rrc = RapkSetRiParams(conn->rac_rihandle, &connreq.racr_riparams);
         if (rrc != RAP_SUCCESS) {
                 CERROR("Can't set riparams for "LPX64": %d\n",
-                       peer_nid, rrc);
+                       peer->rap_nid, rrc);
                 goto failed_0;
         }
 
@@ -694,16 +696,17 @@ kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
        return 0;
 
  failed_1:
-        release_sock(sock);
+        sock_release(sock);
  failed_0:
         kranal_conn_decref(conn);
         return rc;
 }
 
 int
-kranal_conn_handshake (struct socket *sock, kranal_peer_t *peer)
+kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
 {
-        kranal_peer_t     *peer2;
+        kra_peer_t        *peer2;
+        kra_tx_t          *tx;
        ptl_nid_t          peer_nid;
        unsigned long      flags;
         unsigned long      timeout;
@@ -770,7 +773,7 @@ kranal_conn_handshake (struct socket *sock, kranal_peer_t *peer)
          * to connect at once).  NB we return success!  We _do_ have a
          * connection (so we don't need to remove the peer from the peer
          * table) and we _don't_ have any blocked txs to complete */
-       if (kranal_conn_isdup_locked(peer, conn->rac_incarnation)) {
+       if (kranal_conn_isdup_locked(peer, conn->rac_peer_incarnation)) {
                 LASSERT (!list_empty(&peer->rap_conns));
                 LASSERT (list_empty(&peer->rap_tx_queue));
                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
@@ -794,10 +797,10 @@ kranal_conn_handshake (struct socket *sock, kranal_peer_t *peer)
                                 kra_tx_t, tx_list);
 
                 list_del(&tx->tx_list);
-                kranal_queue_tx_locked(tx, conn);
+                kranal_post_fma(conn, tx);
         }
 
-       nstale = kranal_close_stale_conns_locked(peer, conn->rac_incarnation);
+       nstale = kranal_close_stale_conns_locked(peer, conn->rac_peer_incarnation);
 
        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
 
@@ -824,7 +827,7 @@ kranal_connect (kra_peer_t *peer)
 
         rc = kranal_conn_handshake(NULL, peer);
 
-        write_lock_irqqsave(&kranal_data.kra_global_lock, flags);
+        write_lock_irqsave(&kranal_data.kra_global_lock, flags);
 
         LASSERT (peer->rap_connecting);
         peer->rap_connecting = 0;
@@ -838,7 +841,7 @@ kranal_connect (kra_peer_t *peer)
                 peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL;
                 peer->rap_reconnect_time = CURRENT_TIME;
 
-                write_unlock_irqrestore(&kranal-data.kra_global_lock, flags);
+                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                 return;
         }
 
@@ -876,16 +879,15 @@ kranal_listener(void *arg)
        struct socket     *sock;
        struct socket     *newsock;
        int                port;
-       int                backlog;
-       int                timeout;
        kra_connreq_t     *connreqs;
        char               name[16];
+        int                rc;
 
        /* Parent thread holds kra_nid_mutex, and is, or is about to
         * block on kra_listener_signal */
 
-       port = kra_tunables.kra_port;
-       snprintf(name, "kranal_lstn%03d", port);
+       port = kranal_tunables.kra_port;
+       snprintf(name, sizeof(name), "kranal_lstn%03d", port);
        kportal_daemonize(name);
        kportal_blockallsigs();
 
@@ -896,24 +898,25 @@ kranal_listener(void *arg)
        if (connreqs == NULL)
                goto out_0;
 
-       rc = kranal_create_sock(&sock, port);
+       rc = kranal_create_sock(&sock);
        if (rc != 0)
                goto out_1;
 
         memset(&addr, 0, sizeof(addr));
         addr.sin_family      = AF_INET;
         addr.sin_port        = htons(port);
-        addr.sin_addr.s_addr = INADDR_ANY
+        addr.sin_addr.s_addr = INADDR_ANY;
 
-       rc = sock->ops->bind(sock, &addr, sizeof(addr));
+       rc = sock->ops->bind(sock, (struct sockaddr *)&addr, sizeof(addr));
        if (rc != 0) {
                CERROR("Can't bind to port %d\n", port);
                goto out_2;
        }
 
-       rc = sock->ops->listen(sock, kra_tunalbes.kra_backlog);
+       rc = sock->ops->listen(sock, kranal_tunables.kra_backlog);
        if (rc != 0) {
-               CERROR("Can't set listen backlog %d: %d\n", backlog, rc);
+               CERROR("Can't set listen backlog %d: %d\n", 
+                       kranal_tunables.kra_backlog, rc);
                goto out_2;
        }
 
@@ -976,7 +979,7 @@ kranal_listener(void *arg)
 }
 
 int
-kranal_start_listener ()
+kranal_start_listener (void)
 {
        long           pid;
        int            rc;
@@ -987,7 +990,7 @@ kranal_start_listener ()
        LASSERT (kranal_data.kra_listener_sock == NULL);
 
        kranal_data.kra_listener_shutdown == 0;
-       pid = kernel_thread(kranal_listener, sock, 0);
+       pid = kernel_thread(kranal_listener, NULL, 0);
        if (pid < 0) {
                CERROR("Can't spawn listener: %ld\n", pid);
                return (int)pid;
@@ -1004,7 +1007,7 @@ kranal_start_listener ()
 }
 
 void
-kranal_stop_listener()
+kranal_stop_listener(void)
 {
         CDEBUG(D_WARNING, "Stopping listener\n");
 
@@ -1012,7 +1015,7 @@ kranal_stop_listener()
        LASSERT (kranal_data.kra_listener_sock != NULL);
 
        kranal_data.kra_listener_shutdown = 1;
-       wake_up_all(kranal_data->kra_listener_sock->sk->sk_sleep);
+       wake_up_all(kranal_data.kra_listener_sock->sk->sk_sleep);
 
        /* Block until listener has torn down. */
        down(&kranal_data.kra_listener_signal);
@@ -1031,8 +1034,8 @@ kranal_listener_procint(ctl_table *table, int write, struct file *filp,
 
        down(&kranal_data.kra_nid_mutex);
 
-       LASSERT (tunable == &kranal_data.kra_port ||
-                tunable == &kranal_data.kra_backlog);
+       LASSERT (tunable == &kranal_tunables.kra_port ||
+                tunable == &kranal_tunables.kra_backlog);
        old_val = *tunable;
 
        rc = proc_dointvec(table, write, filp, buffer, lenp);
@@ -1060,7 +1063,7 @@ int
 kranal_set_mynid(ptl_nid_t nid)
 {
         lib_ni_t      *ni = &kranal_lib.libnal_ni;
-        int            rc;
+        int            rc = 0;
 
         CDEBUG(D_NET, "setting mynid to "LPX64" (old nid="LPX64")\n",
                nid, ni->ni_pid.nid);
@@ -1190,7 +1193,8 @@ kranal_unlink_peer_locked (kra_peer_t *peer)
 }
 
 int
-kranal_get_peer_info (int index, ptl_nid_t *nidp, int *portp, int *persistencep)
+kranal_get_peer_info (int index, ptl_nid_t *nidp, __u32 *ipp, int *portp, 
+                      int *persistencep)
 {
         kra_peer_t        *peer;
         struct list_head  *ptmp;
@@ -1210,6 +1214,7 @@ kranal_get_peer_info (int index, ptl_nid_t *nidp, int *portp, int *persistencep)
                                 continue;
 
                         *nidp = peer->rap_nid;
+                        *ipp = peer->rap_ip;
                         *portp = peer->rap_port;
                         *persistencep = peer->rap_persistence;
 
@@ -1240,7 +1245,7 @@ kranal_add_persistent_peer (ptl_nid_t nid, __u32 ip, int port)
 
         peer2 = kranal_find_peer_locked(nid);
         if (peer2 != NULL) {
-                kranal_put_peer(peer);
+                kranal_peer_decref(peer);
                 peer = peer2;
         } else {
                 /* peer table takes existing ref on peer */
@@ -1392,12 +1397,12 @@ kranal_close_stale_conns_locked (kra_peer_t *peer, __u64 incarnation)
         list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
                 conn = list_entry(ctmp, kra_conn_t, rac_list);
 
-                if (conn->rac_incarnation == incarnation)
+                if (conn->rac_peer_incarnation == incarnation)
                         continue;
 
                 CDEBUG(D_NET, "Closing stale conn nid:"LPX64" incarnation:"LPX64"("LPX64")\n",
-                       peer->rap_nid, conn->rac_incarnation, incarnation);
-                LASSERT (conn->rac_incarnation < incarnation);
+                       peer->rap_nid, conn->rac_peer_incarnation, incarnation);
+                LASSERT (conn->rac_peer_incarnation < incarnation);
 
                 count++;
                 kranal_close_conn_locked(conn, -ESTALE);
@@ -1497,7 +1502,7 @@ kranal_cmd(struct portals_cfg *pcfg, void * private)
                         pcfg->pcfg_id    = 0;
                         pcfg->pcfg_misc  = 0;
                         pcfg->pcfg_flags = 0;
-                        kranal_put_conn(conn);
+                        kranal_conn_decref(conn);
                 }
                 break;
         }
@@ -1546,24 +1551,24 @@ kranal_alloc_txdescs(struct list_head *freelist, int n)
 
                 PORTAL_ALLOC(tx, sizeof(*tx));
                 if (tx == NULL) {
-                        CERROR("Can't allocate %stx[%d]\n", 
-                               isnblk ? "nblk ", i);
-                        kranal_free_txdescs();
+                        CERROR("Can't allocate %stx[%d]\n",
+                               isnblk ? "nblk " : "", i);
+                        kranal_free_txdescs(freelist);
                         return -ENOMEM;
                 }
 
                 PORTAL_ALLOC(tx->tx_phys,
-                             PLT_MD_MAX_IOV * sizeof(*tx->tx_phys));
+                             PTL_MD_MAX_IOV * sizeof(*tx->tx_phys));
                 if (tx->tx_phys == NULL) {
                         CERROR("Can't allocate %stx[%d]->tx_phys\n", 
-                               isnblk ? "nblk ", i);
+                               isnblk ? "nblk " : "", i);
 
                         PORTAL_FREE(tx, sizeof(*tx));
                         kranal_free_txdescs(freelist);
                         return -ENOMEM;
                 }
 
-                tx->tx_isnblk = isnblk
+                tx->tx_isnblk = isnblk;
                 tx->tx_buftype = RANAL_BUF_NONE;
 
                 list_add(&tx->tx_list, freelist);
@@ -1579,17 +1584,17 @@ kranal_device_init(int id, kra_device_t *dev)
         RAP_RETURN        rrc;
 
         dev->rad_id = id;
-        rrc = RapkGetDeviceByIndex(id, NULL, kranal_device_callback,
+        rrc = RapkGetDeviceByIndex(id, kranal_device_callback,
                                    &dev->rad_handle);
         if (rrc != RAP_SUCCESS) {
-                CERROR("Can't get Rapidarray Device %d: %d\n", idx, rrc);
+                CERROR("Can't get Rapidarray Device %d: %d\n", id, rrc);
                 goto failed_0;
         }
 
         rrc = RapkReserveRdma(dev->rad_handle, total_ntx);
         if (rrc != RAP_SUCCESS) {
                 CERROR("Can't reserve %d RDMA descriptors"
-                       " for device[%d]: %d\n", total_ntx, i, rrc);
+                       " for device %d: %d\n", total_ntx, id, rrc);
                 goto failed_1;
         }
 
@@ -1597,7 +1602,7 @@ kranal_device_init(int id, kra_device_t *dev)
                              &dev->rad_ptag);
         if (rrc != RAP_SUCCESS) {
                 CERROR("Can't create ptag"
-                       " for device[%d]: %d\n", i, rrc);
+                       " for device %d: %d\n", id, rrc);
                 goto failed_1;
         }
 
@@ -1605,7 +1610,7 @@ kranal_device_init(int id, kra_device_t *dev)
                            &dev->rad_rdma_cq);
         if (rrc != RAP_SUCCESS) {
                 CERROR("Can't create rdma cq size %d"
-                       " for device[%d]: %d\n", total_ntx, i, rrc);
+                       " for device %d: %d\n", total_ntx, id, rrc);
                 goto failed_2;
         }
 
@@ -1613,7 +1618,7 @@ kranal_device_init(int id, kra_device_t *dev)
                            dev->rad_ptag, &dev->rad_fma_cq);
         if (rrc != RAP_SUCCESS) {
                 CERROR("Can't create fma cq size %d"
-                       " for device[%d]: %d\n", RANAL_RX_CQ_SIZE, i, rrc);
+                       " for device %d: %d\n", RANAL_FMA_CQ_SIZE, id, rrc);
                 goto failed_3;
         }
 
@@ -1632,7 +1637,7 @@ kranal_device_init(int id, kra_device_t *dev)
 void
 kranal_device_fini(kra_device_t *dev)
 {
-        RapkDestroyCQ(dev->rad_handle, dev->rad_rx_cq, dev->rad_ptag);
+        RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cq, dev->rad_ptag);
         RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag);
         RapkDestroyPtag(dev->rad_handle, dev->rad_ptag);
         RapkReleaseDevice(dev->rad_handle);
@@ -1663,7 +1668,7 @@ kranal_api_shutdown (nal_t *nal)
 
         case RANAL_INIT_ALL:
                 /* stop calls to nal_cmd */
-                libcfs_nal_cmd_unregister(OPENRANAL);
+                libcfs_nal_cmd_unregister(RANAL);
                 /* No new persistent peers */
 
                 /* resetting my NID to unadvertises me, removes my
@@ -1674,7 +1679,7 @@ kranal_api_shutdown (nal_t *nal)
                 /* Wait for all peer/conn state to clean up */
                 i = 2;
                 while (atomic_read(&kranal_data.kra_nconns) != 0 ||
-                       atomic_read(&kranal-data.kra_npeers) != 0) {
+                       atomic_read(&kranal_data.kra_npeers) != 0) {
                         i++;
                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                                "waiting for %d peers and %d conns to close down\n",
@@ -1710,9 +1715,9 @@ kranal_api_shutdown (nal_t *nal)
         spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
 
         LASSERT (list_empty(&kranal_data.kra_connd_peers));
-        spin_lock_irqsave(&kranal-data.kra_connd_lock, flags); 
+        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); 
         wake_up_all(&kranal_data.kra_connd_waitq);
-        spin_unlock_irqrestore(&kranal-data.kra_connd_lock, flags); 
+        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); 
 
         i = 2;
         while (atomic_read(&kranal_data.kra_nthreads) != 0) {
@@ -1897,7 +1902,7 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
         if (kranal_data.kra_ndevs == 0)
                 goto failed;
 
-        rc = libcfs_nal_cmd_register(OPENRANAL, &kranal_cmd, NULL);
+        rc = libcfs_nal_cmd_register(RANAL, &kranal_cmd, NULL);
         if (rc != 0) {
                 CERROR("Can't initialise command interface (rc = %d)\n", rc);
                 goto failed;
@@ -1927,7 +1932,7 @@ kranal_module_fini (void)
 #endif
         PtlNIFini(kranal_ni);
 
-        ptl_unregister_nal(OPENRANAL);
+        ptl_unregister_nal(RANAL);
 }
 
 int __init
@@ -1949,16 +1954,16 @@ kranal_module_init (void)
         /* Initialise dynamic tunables to defaults once only */
         kranal_tunables.kra_timeout = RANAL_TIMEOUT;
 
-        rc = ptl_register_nal(OPENRANAL, &kranal_api);
+        rc = ptl_register_nal(RANAL, &kranal_api);
         if (rc != PTL_OK) {
                 CERROR("Can't register RANAL: %d\n", rc);
                 return -ENOMEM;               /* or something... */
         }
 
         /* Pure gateways want the NAL started up at module load time... */
-        rc = PtlNIInit(OPENRANAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kranal_ni);
+        rc = PtlNIInit(RANAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kranal_ni);
         if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
-                ptl_unregister_nal(OPENRANAL);
+                ptl_unregister_nal(RANAL);
                 return -ENODEV;
         }
 
index c134179..fe130b7 100644 (file)
@@ -48,6 +48,8 @@
 #include <linux/kmod.h>
 #include <linux/sysctl.h>
 
+#include <net/sock.h>
+
 #define DEBUG_SUBSYSTEM S_NAL
 
 #include <linux/kp30.h>
@@ -79,7 +81,7 @@
 #define RANAL_NTX             64                /* # tx descs */
 #define RANAL_NTX_NBLK        256               /* # reserved tx descs */
 
-#define RANAL_RX_CQ_SIZE      1024              /* # entries in receive CQ 
+#define RANAL_FMA_CQ_SIZE     8192              /* # entries in receive CQ 
                                                  * (overflow is a performance hit) */
 
 #define RANAL_RESCHED         100               /* # scheduler loops before reschedule */
@@ -159,8 +161,8 @@ typedef struct
 
 #define RANAL_INIT_NOTHING         0
 #define RANAL_INIT_DATA            1
-
-#define RANAL_INIT_ALL             7
+#define RANAL_INIT_LIB             2
+#define RANAL_INIT_ALL             3
 
 /************************************************************************
  * Wire message structs.  These are sent in sender's byte order
@@ -339,11 +341,26 @@ typedef struct kra_peer
         unsigned long       rap_reconnect_interval; /* exponential backoff */
 } kra_peer_t;
 
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,0))
+# define sk_allocation  allocation
+# define sk_data_ready data_ready
+# define sk_write_space write_space
+# define sk_user_data   user_data
+# define sk_prot        prot
+# define sk_sndbuf      sndbuf
+# define sk_socket      socket
+# define sk_wmem_queued wmem_queued
+# define sk_err         err
+# define sk_sleep       sleep
+#endif
 
 extern lib_nal_t       kranal_lib;
 extern kra_data_t      kranal_data;
 extern kra_tunables_t  kranal_tunables;
 
+extern void __kranal_peer_decref(kra_peer_t *peer);
+extern void __kranal_conn_decref(kra_conn_t *conn);
+
 static inline void
 kranal_peer_addref(kra_peer_t *peer)
 {
@@ -404,8 +421,9 @@ kranal_cqid2connlist (__u32 cqid)
 static inline kra_conn_t *
 kranal_cqid2conn_locked (__u32 cqid) 
 {
-        struct list_head  conns = kranal_cqid2connlist(cqid);
+        struct list_head *conns = kranal_cqid2connlist(cqid);
         struct list_head *tmp;
+        kra_conn_t       *conn;
         
         list_for_each(tmp, conns) {
                 conn = list_entry(tmp, kra_conn_t, rac_hashlist);
@@ -436,3 +454,24 @@ kranal_page2phys (struct page *p)
 # error "no page->phys"
 #endif
 
+extern int kranal_listener_procint(ctl_table *table, 
+                                   int write, struct file *filp, 
+                                   void *buffer, size_t *lenp);
+extern int kranal_close_stale_conns_locked (kra_peer_t *peer, 
+                                            __u64 incarnation);
+extern void kranal_update_reaper_timeout(long timeout);
+extern void kranal_tx_done (kra_tx_t *tx, int completion);
+extern void kranal_unlink_peer_locked (kra_peer_t *peer);
+extern void kranal_schedule_conn(kra_conn_t *conn);
+extern kra_peer_t *kranal_create_peer (ptl_nid_t nid);
+extern kra_peer_t *kranal_find_peer_locked (ptl_nid_t nid);
+extern void kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx);
+extern int kranal_del_peer (ptl_nid_t nid, int single_share);
+extern void kranal_device_callback(RAP_INT32 devid);
+extern int kranal_thread_start (int(*fn)(void *arg), void *arg);
+extern int kranal_connd (void *arg);
+extern int kranal_reaper (void *arg);
+extern int kranal_scheduler (void *arg);
+extern void kranal_close_conn_locked (kra_conn_t *conn, int error);
+extern void kranal_terminate_conn_locked (kra_conn_t *conn);
+extern void kranal_connect (kra_peer_t *peer);
index b491d71..9490b56 100644 (file)
@@ -42,6 +42,7 @@ kranal_device_callback(RAP_INT32 devid)
 {
         kra_device_t *dev;
         int           i;
+        unsigned long flags;
         
         for (i = 0; i < kranal_data.kra_ndevs; i++) {
 
@@ -157,7 +158,7 @@ kranal_tx_done (kra_tx_t *tx, int completion)
 
         case RANAL_BUF_PHYS_MAPPED:
                 LASSERT (tx->tx_conn != NULL);
-                dev = tx->tx_con->rac_device;
+                dev = tx->tx_conn->rac_device;
                 rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
                                            dev->rad_ptag, &tx->tx_map_key);
                 LASSERT (rrc == RAP_SUCCESS);
@@ -165,8 +166,8 @@ kranal_tx_done (kra_tx_t *tx, int completion)
 
         case RANAL_BUF_VIRT_MAPPED:
                 LASSERT (tx->tx_conn != NULL);
-                dev = tx->tx_con->rac_device;
-                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer
+                dev = tx->tx_conn->rac_device;
+                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
                                            dev->rad_ptag, &tx->tx_map_key);
                 LASSERT (rrc == RAP_SUCCESS);
                 break;
@@ -261,7 +262,7 @@ kranal_init_msg(kra_msg_t *msg, int type)
         /* ram_incarnation gets set when FMA is sent */
 }
 
-kra_tx_t
+kra_tx_t *
 kranal_new_tx_msg (int may_block, int type)
 {
         kra_tx_t *tx = kranal_get_idle_tx(may_block);
@@ -294,7 +295,7 @@ kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                 return -EMSGSIZE;
         }
 
-        tx->tx_bufftype = RANAL_BUF_IMMEDIATE;
+        tx->tx_buftype = RANAL_BUF_IMMEDIATE;
         tx->tx_nob = nob;
         tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
         return 0;
@@ -321,7 +322,7 @@ kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                 return -EMSGSIZE;
         }
 
-        tx->tx_bufftype = RANAL_BUF_VIRT_UNMAPPED;
+        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
         tx->tx_nob = nob;
         tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
         return 0;
@@ -347,10 +348,9 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
                 LASSERT (nkiov > 0);
         }
 
-        tx->tx_bufftype = RANAL_BUF_PHYS_UNMAPPED;
+        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
         tx->tx_nob = nob;
-        tx->tx_buffer = NULL;
-        tx->tx_phys_offset = kiov->kiov_offset + offset;
+        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));
         
         phys->Address = kranal_page2phys(kiov->kiov_page);
         phys->Length  = PAGE_SIZE;
@@ -368,15 +368,9 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
                         int i;
                         /* Can't have gaps */
                         CERROR("Can't make payload contiguous in I/O VM:"
-                               "page %d, offset %d, len %d \n", nphys, 
-                               kiov->kiov_offset, kiov->kiov_len);
-
-                        for (i = -nphys; i < nkiov; i++) {
-                                CERROR("kiov[%d] %p +%d for %d\n",
-                                       i, kiov[i].kiov_page, 
-                                       kiov[i].kiov_offset, kiov[i].kiov_len);
-                        }
-                        
+                               "page %d, offset %d, len %d \n", 
+                               phys - tx->tx_phys, 
+                               kiov->kiov_offset, kiov->kiov_len);                        
                         return -EINVAL;
                 }
 
@@ -406,7 +400,7 @@ kranal_setup_buffer (kra_tx_t *tx, int niov,
         if (kiov != NULL)
                 return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);
         
-        return kranal_setup_virt_buffer(tx, niov, kiov, offset, nob);
+        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
 }
 
 void
@@ -414,6 +408,7 @@ kranal_map_buffer (kra_tx_t *tx)
 {
         kra_conn_t     *conn = tx->tx_conn;
         kra_device_t   *dev = conn->rac_device;
+        RAP_RETURN      rrc;
 
         switch (tx->tx_buftype) {
         default:
@@ -503,7 +498,7 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
         peer = kranal_find_peer_locked(nid);
         if (peer == NULL) {
                 write_unlock_irqrestore(g_lock, flags);
-                kranal_tx_done(tx -EHOSTUNREACH);
+                kranal_tx_done(tx, -EHOSTUNREACH);
                 return;
         }
 
@@ -547,8 +542,9 @@ static void
 kranal_rdma(kra_tx_t *tx, int type, 
             kra_rdma_desc_t *rard, int nob, __u64 cookie)
 {
-        kra_conn_t *conn = tx->tx_conn;
-        RAP_RETURN  rrc;
+        kra_conn_t   *conn = tx->tx_conn;
+        RAP_RETURN    rrc;
+        unsigned long flags;
 
         /* prep final completion message */
         kranal_init_msg(&tx->tx_msg, type);
@@ -559,7 +555,7 @@ kranal_rdma(kra_tx_t *tx, int type,
         LASSERT (nob <= rard->rard_nob);
 
         memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
-        tx->tx_rdma_desc.SrcPtr = tx->tx_buffer;
+        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
         tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
         tx->tx_rdma_desc.DstPtr = rard->rard_addr;
         tx->tx_rdma_desc.DstKey = rard->rard_key;
@@ -619,6 +615,7 @@ kranal_do_send (lib_nal_t    *nal,
 {
         kra_conn_t *conn;
         kra_tx_t   *tx;
+        int         rc;
 
         /* NB 'private' is different depending on what we're sending.... */
 
@@ -672,8 +669,8 @@ kranal_do_send (lib_nal_t    *nal,
 
                 kranal_map_buffer(tx);
                 kranal_rdma(tx, RANAL_MSG_GET_DONE,
-                            &conn->rac_rxmsg->ram_u.getreq.ragm_desc, nob,
-                            &conn->rac_rxmsg->ram_u.getreq.ragm_cookie);
+                            &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
+                            conn->rac_rxmsg->ram_u.get.ragm_cookie);
                 return PTL_OK;
         }
 
@@ -704,7 +701,7 @@ kranal_do_send (lib_nal_t    *nal,
                 tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
                 /* rest of tx_msg is setup just before it is sent */
                 kranal_launch_tx(tx, nid);
-                return PTL_OK
+                return PTL_OK;
 
         case PTL_MSG_ACK:
                 LASSERT (nob == 0);
@@ -716,7 +713,7 @@ kranal_do_send (lib_nal_t    *nal,
                     nob <= kranal_tunables.kra_max_immediate)
                         break;                  /* send IMMEDIATE */
                 
-                tx = kranal_new_tx_msg(!in_interrupt(), RANA_MSG_PUT_REQ);
+                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
                 if (tx == NULL)
                         return PTL_NO_SPACE;
 
@@ -786,6 +783,7 @@ kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
 {
         kra_conn_t  *conn = private;
         kra_msg_t   *rxmsg = conn->rac_rxmsg;
+        kra_tx_t    *tx;
         void        *buffer;
         int          rc;
         
@@ -834,7 +832,7 @@ kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
 
         case RANAL_MSG_PUT_REQ:
                 if (libmsg == NULL) {           /* PUT didn't match... */
-                        lib_finalize(null, NULL, libmsg, PTL_OK);
+                        lib_finalize(nal, NULL, libmsg, PTL_OK);
                         return PTL_OK;
                 }
                 
@@ -853,9 +851,10 @@ kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                 tx->tx_msg.ram_u.putack.rapam_src_cookie = 
                         conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                 tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
-                tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_key = tx->tx_map_key;
-                tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_addr = tx->tx_buffer;
-                tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_nob = mlen;
+                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
+                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits = 
+                        (__u64)((unsigned long)tx->tx_buffer);
+                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;
 
                 tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */
 
@@ -913,7 +912,7 @@ kranal_check_conn (kra_conn_t *conn)
         unsigned long      now = jiffies;
 
         if (!conn->rac_closing &&
-            time_after_eq(now, conn->rac_last_sent + conn->rac_keepalive * HZ)) {
+            time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                 /* not sent in a while; schedule conn so scheduler sends a keepalive */
                 kranal_schedule_conn(conn);
         }
@@ -923,7 +922,7 @@ kranal_check_conn (kra_conn_t *conn)
 
         if (!conn->rac_close_recvd &&
             time_after_eq(now, conn->rac_last_rx + timeout)) {
-                CERROR("Nothing received from "LPX64" within %d seconds\n",
+                CERROR("Nothing received from "LPX64" within %lu seconds\n",
                        conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
                 return -ETIMEDOUT;
         }
@@ -942,8 +941,8 @@ kranal_check_conn (kra_conn_t *conn)
                 
                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
                         spin_unlock_irqrestore(&conn->rac_lock, flags);
-                        CERROR("tx on fmaq for "LPX64" blocked %d seconds\n",
-                               conn->rac_perr->rap_nid, (now - tx->tx_qtime)/HZ);
+                        CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
+                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                         return -ETIMEDOUT;
                 }
         }
@@ -953,8 +952,8 @@ kranal_check_conn (kra_conn_t *conn)
                 
                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
                         spin_unlock_irqrestore(&conn->rac_lock, flags);
-                        CERROR("tx on rdmaq for "LPX64" blocked %d seconds\n",
-                               conn->rac_perr->rap_nid, (now - tx->tx_qtime)/HZ);
+                        CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
+                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                         return -ETIMEDOUT;
                 }
         }
@@ -964,8 +963,8 @@ kranal_check_conn (kra_conn_t *conn)
                 
                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
                         spin_unlock_irqrestore(&conn->rac_lock, flags);
-                        CERROR("tx on replyq for "LPX64" blocked %d seconds\n",
-                               conn->rac_perr->rap_nid, (now - tx->tx_qtime)/HZ);
+                        CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
+                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                         return -ETIMEDOUT;
                 }
         }
@@ -980,6 +979,8 @@ kranal_check_conns (int idx, unsigned long *min_timeoutp)
         struct list_head  *conns = &kranal_data.kra_conns[idx];
         struct list_head  *ctmp;
         kra_conn_t        *conn;
+        unsigned long      flags;
+        int                rc;
 
  again:
         /* NB. We expect to check all the conns and not find any problems, so
@@ -987,7 +988,7 @@ kranal_check_conns (int idx, unsigned long *min_timeoutp)
         read_lock(&kranal_data.kra_global_lock);
 
         list_for_each (ctmp, conns) {
-                conn = list_entry(ptmp, kra_conn_t, rac_hashlist);
+                conn = list_entry(ctmp, kra_conn_t, rac_hashlist);
 
                 if (conn->rac_timeout < *min_timeoutp )
                         *min_timeoutp = conn->rac_timeout;
@@ -1004,13 +1005,15 @@ kranal_check_conns (int idx, unsigned long *min_timeoutp)
                 CERROR("Check on conn to "LPX64"failed: %d\n",
                        conn->rac_peer->rap_nid, rc);
 
-                write_lock_irqsave(&kranal_data.kra_global_lock);
+                write_lock_irqsave(&kranal_data.kra_global_lock, flags);
 
                 if (!conn->rac_closing)
                         kranal_close_conn_locked(conn, -ETIMEDOUT);
                 else
                         kranal_terminate_conn_locked(conn);
                         
+                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+
                 kranal_conn_decref(conn);
 
                 /* start again now I've dropped the lock */
@@ -1048,7 +1051,7 @@ kranal_connd (void *arg)
                         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
 
                         kranal_connect(peer);
-                        kranal_put_peer(peer);
+                        kranal_peer_decref(peer);
 
                         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        continue;
@@ -1095,7 +1098,6 @@ kranal_reaper (void *arg)
         unsigned long      flags;
         kra_conn_t        *conn;
         kra_peer_t        *peer;
-        unsigned long      flags;
         long               timeout;
         int                i;
         int                conn_entries = kranal_data.kra_conn_hash_size;
@@ -1197,7 +1199,6 @@ kranal_reaper (void *arg)
 
                 spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
 
-                busy_loops = 0;
                 schedule_timeout(timeout);
 
                 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
@@ -1230,12 +1231,12 @@ kranal_process_rdmaq (__u32 cqid)
         spin_lock_irqsave(&conn->rac_lock, flags);
 
         LASSERT (!list_empty(&conn->rac_rdmaq));
-        tx = list_entry(con->rac_rdmaq.next, kra_tx_t, tx_list);
+        tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
         list_del(&tx->tx_list);
 
         LASSERT(desc->AppPtr == (void *)tx);
-        LASSERT(desc->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
-                desc->tx_msg.ram_type == RANAL_MSG_GET_DONE);
+        LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
+                tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);
 
         list_add_tail(&tx->tx_list, &conn->rac_fmaq);
         tx->tx_qtime = jiffies;
@@ -1252,26 +1253,30 @@ int
 kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
                void *immediate, int immediatenob)
 {
-        int   sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
-
+        int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
+        RAP_RETURN rrc;
+        
         LASSERT (sizeof(*msg) <= RANAL_FMA_PREFIX_LEN);
         LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
                  immediatenob <= RANAL_FMA_MAX_DATA_LEN :
                  immediatenob == 0);
 
-        msg->ram_incarnation = conn->rac_incarnation;
+        msg->ram_incarnation = conn->rac_my_incarnation;
         msg->ram_seq = conn->rac_tx_seq;
 
         if (sync)
-                rrc = RapkFmaSyncSend(conn->rac_device.rad_handle,
+                rrc = RapkFmaSyncSend(conn->rac_device->rad_handle,
                                       immediate, immediatenob,
                                       msg, sizeof(*msg));
         else
-                rrc = RapkFmaSend(conn->rac_device.rad_handle,
+                rrc = RapkFmaSend(conn->rac_device->rad_handle,
                                   immediate, immediatenob,
                                   msg, sizeof(*msg));
 
         switch (rrc) {
+        default:
+                LBUG();
+
         case RAP_SUCCESS:
                 conn->rac_last_tx = jiffies;
                 conn->rac_tx_seq++;
@@ -1279,9 +1284,6 @@ kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
                 
         case RAP_NOT_DONE:
                 return -EAGAIN;
-
-        default:
-                LBUG();
         }
 }
 
@@ -1323,7 +1325,8 @@ kranal_process_fmaq (kra_conn_t *conn)
 
                 spin_unlock_irqrestore(&conn->rac_lock, flags);
 
-                if (time_after_eq(conn->rac_last_tx + conn->rac_keepalive)) {
+                if (time_after_eq(jiffies, 
+                                  conn->rac_last_tx + conn->rac_keepalive)) {
                         kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                         kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                 }
@@ -1367,7 +1370,8 @@ kranal_process_fmaq (kra_conn_t *conn)
                 kranal_map_buffer(tx);
                 tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
                 tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
-                tx->tx_msg.ram_u.get.ragm_desc.rard_addr = tx->tx_buffer;
+                tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits = 
+                        (__u64)((unsigned long)tx->tx_buffer);
                 tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                 expect_reply = 1;
@@ -1404,7 +1408,7 @@ kranal_swab_rdma_desc (kra_rdma_desc_t *d)
         __swab16s(&d->rard_key.Cookie);
         __swab16s(&d->rard_key.MdHandle);
         __swab32s(&d->rard_key.Flags);
-        __swab64s(&d->rard_addr);
+        __swab64s(&d->rard_addr.AddressBits);
         __swab32s(&d->rard_nob);
 }
 
@@ -1440,8 +1444,10 @@ kranal_process_receives(kra_conn_t *conn)
         unsigned long flags;
         __u32         seq;
         __u32         nob;
+        kra_tx_t     *tx;
         kra_msg_t    *msg;
-        RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &msg);
+        void         *prefix;
+        RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
         kra_peer_t   *peer = conn->rac_peer;
 
         if (rrc == RAP_NOT_DONE)
@@ -1449,7 +1455,8 @@ kranal_process_receives(kra_conn_t *conn)
         
         LASSERT (rrc == RAP_SUCCESS);
         conn->rac_last_rx = jiffies;
-        seq = conn->rac_seq++;
+        seq = conn->rac_rx_seq++;
+        msg = (kra_msg_t *)prefix;
 
         if (msg->ram_magic != RANAL_MSG_MAGIC) {
                 if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
@@ -1492,10 +1499,10 @@ kranal_process_receives(kra_conn_t *conn)
                 goto out;
         }
         
-        if (msg->ram_incarnation != conn->rac_incarnation) {
+        if (msg->ram_incarnation != conn->rac_peer_incarnation) {
                 CERROR("Unexpected incarnation "LPX64"("LPX64
                        " expected) from "LPX64"\n",
-                       msg->ram_incarnation, conn->rac_incarnation,
+                       msg->ram_incarnation, conn->rac_peer_incarnation,
                        peer->rap_nid);
                 goto out;
         }
@@ -1514,13 +1521,14 @@ kranal_process_receives(kra_conn_t *conn)
  
         if (msg->ram_type == RANAL_MSG_CLOSE) {
                 conn->rac_close_recvd = 1;
-                write_lock_irqsave(&kranal_data.kra_global_lock);
+                write_lock_irqsave(&kranal_data.kra_global_lock, flags);
 
                 if (!conn->rac_closing)
                         kranal_close_conn_locked(conn, -ETIMEDOUT);
                 else if (conn->rac_close_sent)
                         kranal_terminate_conn_locked(conn);
-                
+
+                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                 goto out;
         }
 
@@ -1548,7 +1556,8 @@ kranal_process_receives(kra_conn_t *conn)
                 if (tx == NULL)
                         break;
                 
-                tx->tx_msg.ram_u.racm_cookie = msg->msg_u.putreq.raprm_cookie;
+                tx->tx_msg.ram_u.completion.racm_cookie = 
+                        msg->ram_u.putreq.raprm_cookie;
                 kranal_post_fma(conn, tx);
                 break;
 
@@ -1571,7 +1580,7 @@ kranal_process_receives(kra_conn_t *conn)
 
                 kranal_rdma(tx, RANAL_MSG_PUT_DONE,
                             &msg->ram_u.putack.rapam_desc, 
-                            msg->msg_u.putack.rapam_desc.rard_nob,
+                            msg->ram_u.putack.rapam_desc.rard_nob,
                             msg->ram_u.putack.rapam_dst_cookie);
                 break;
 
@@ -1587,7 +1596,7 @@ kranal_process_receives(kra_conn_t *conn)
                 break;
 
         case RANAL_MSG_GET_REQ:
-                lib_parse(&kranal_lib, &msg->ram_u.getreq.ragm_hdr, conn);
+                lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);
                 
                 if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
                         break;
@@ -1596,7 +1605,7 @@ kranal_process_receives(kra_conn_t *conn)
                 if (tx == NULL)
                         break;
 
-                tx->tx_msg.ram_u.racm_cookie = msg->msg_u.getreq.ragm_cookie;
+                tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
                 kranal_post_fma(conn, tx);
                 break;
                 
@@ -1624,7 +1633,7 @@ kranal_process_receives(kra_conn_t *conn)
         }
 
  out:
-        if (conn->rac_msg != NULL)
+        if (conn->rac_rxmsg != NULL)
                 kranal_consume_rxmsg(conn, NULL, 0);
 
         return 1;
@@ -1638,13 +1647,16 @@ kranal_scheduler (void *arg)
         char            name[16];
         kra_conn_t     *conn;
         unsigned long   flags;
+        RAP_RETURN      rrc;
         int             rc;
+        int             resched;
         int             i;
         __u32           cqid;
+        __u32           event_type;
         int             did_something;
         int             busy_loops = 0;
 
-        snprintf(name, sizeof(name), "kranal_sd_%02ld", dev->rad_idx);
+        snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
         kportal_daemonize(name);
         kportal_blockallsigs();
 
index 00a0c4b..9c1537b 100644 (file)
@@ -676,7 +676,7 @@ jt_ptl_print_peers (int argc, char **argv)
         int                      index;
         int                      rc;
 
-        if (!g_nal_is_compatible (argv[0], SOCKNAL, OPENIBNAL, IIBNAL, 0))
+        if (!g_nal_is_compatible (argv[0], SOCKNAL, OPENIBNAL, IIBNAL, RANAL, 0))
                 return -1;
 
         for (index = 0;;index++) {
@@ -832,7 +832,7 @@ jt_ptl_print_connections (int argc, char **argv)
         int                      index;
         int                      rc;
 
-        if (!g_nal_is_compatible (argv[0], SOCKNAL, OPENIBNAL, IIBNAL, 0))
+        if (!g_nal_is_compatible (argv[0], SOCKNAL, OPENIBNAL, IIBNAL, RANAL, 0))
                 return -1;
 
         for (index = 0;;index++) {
@@ -1023,7 +1023,7 @@ int jt_ptl_disconnect(int argc, char **argv)
                 return 0;
         }
 
-        if (!g_nal_is_compatible (NULL, SOCKNAL, OPENIBNAL, IIBNAL, 0))
+        if (!g_nal_is_compatible (NULL, SOCKNAL, OPENIBNAL, IIBNAL, RANAL, 0))
                 return 0;
 
         if (argc >= 2 &&
index 7d807da..2f1d563 100644 (file)
@@ -545,7 +545,7 @@ if test x$enable_modules != xno ; then
                        RAP_RETURN          rc;
                        RAP_PVOID           dev_handle;
 
-                       rc = RapkGetDeviceByIndex(0, NULL, NULL, &dev_handle);
+                       rc = RapkGetDeviceByIndex(0, NULL, &dev_handle);
 
                        return rc == RAP_SUCCESS ? 0 : 1;
                ],[
index a59757d..c924827 100644 (file)
@@ -122,7 +122,7 @@ kranal_sock_read (struct socket *sock, void *buffer, int nob, int timeout)
                /* Set receive timeout to remaining time */
                tv = (struct timeval) {
                        .tv_sec = ticks / HZ,
-                       .tv_usec = ((ticks % HZ) * 1000000) / HZ;
+                       .tv_usec = ((ticks % HZ) * 1000000) / HZ
                };
                set_fs(KERNEL_DS);
                rc = sock_setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO,
@@ -130,7 +130,7 @@ kranal_sock_read (struct socket *sock, void *buffer, int nob, int timeout)
                set_fs(oldmm);
                if (rc != 0) {
                        CERROR("Can't set socket recv timeout %d: %d\n",
-                              send_timeout, rc);
+                              timeout, rc);
                        return rc;
                }
 
@@ -211,6 +211,8 @@ kranal_pause(int ticks)
 void
 kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn)
 {
+        RAP_RETURN   rrc;
+
         memset(connreq, 0, sizeof(*connreq));
 
         connreq->racr_magic       = RANAL_MSG_MAGIC;
@@ -225,12 +227,12 @@ kranal_pack_connreq(kra_connreq_t *connreq, kra_conn_t *conn)
 }
 
 int
-kranal_recv_connreq(struct sock *sock, kra_connreq_t *connreq, int timeout)
+kranal_recv_connreq(struct socket *sock, kra_connreq_t *connreq, int timeout)
 {
         int         i;
        int         rc;
 
-       rc = kranal_sock_read(newsock, connreq, sizeof(*connreq), timeout);
+       rc = kranal_sock_read(sock, connreq, sizeof(*connreq), timeout);
        if (rc != 0) {
                CERROR("Read failed: %d\n", rc);
                return rc;
@@ -273,7 +275,7 @@ kranal_recv_connreq(struct sock *sock, kra_connreq_t *connreq, int timeout)
         
         for (i = 0; i < kranal_data.kra_ndevs; i++)
                 if (connreq->racr_devid == 
-                    kranal_data.kra_devices[i]->rad_id)
+                    kranal_data.kra_devices[i].rad_id)
                         break;
 
         if (i == kranal_data.kra_ndevs) {
@@ -285,7 +287,7 @@ kranal_recv_connreq(struct sock *sock, kra_connreq_t *connreq, int timeout)
 }
 
 int
-kranal_conn_isdup_locked(kranal_peer_t *peer, __u64 incarnation)
+kranal_conn_isdup_locked(kra_peer_t *peer, __u64 incarnation)
 {
        kra_conn_t       *conn;
        struct list_head *tmp;
@@ -294,13 +296,13 @@ kranal_conn_isdup_locked(kranal_peer_t *peer, __u64 incarnation)
        list_for_each(tmp, &peer->rap_conns) {
                conn = list_entry(tmp, kra_conn_t, rac_list);
 
-                if (conn->rac_incarnation < incarnation) {
+                if (conn->rac_peer_incarnation < incarnation) {
                         /* Conns with an older incarnation get culled later */
                         continue;
                 }
 
                 if (!loopback &&
-                    conn->rac_incarnation == incarnation &&
+                    conn->rac_peer_incarnation == incarnation &&
                     peer->rap_nid == kranal_lib.libnal_ni.ni_pid.nid) {
                         /* loopback creates 2 conns */
                         loopback = 1;
@@ -324,7 +326,7 @@ kranal_set_conn_uniqueness (kra_conn_t *conn)
 
         do {    /* allocate a unique cqid */
                 conn->rac_cqid = kranal_data.kra_next_cqid++;
-        } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL)
+        } while (kranal_cqid2conn_locked(conn->rac_cqid) != NULL);
         
 
         write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
@@ -343,7 +345,6 @@ kranal_alloc_conn(kra_conn_t **connp, kra_device_t *dev)
                return -ENOMEM;
 
        memset(conn, 0, sizeof(*conn));
-        conn->rac_cqid = cqid;
        atomic_set(&conn->rac_refcount, 1);
        INIT_LIST_HEAD(&conn->rac_list);
        INIT_LIST_HEAD(&conn->rac_hashlist);
@@ -352,10 +353,13 @@ kranal_alloc_conn(kra_conn_t **connp, kra_device_t *dev)
        INIT_LIST_HEAD(&conn->rac_replyq);
        spin_lock_init(&conn->rac_lock);
 
+        kranal_set_conn_uniqueness(conn);
+
         conn->rac_timeout = MAX(kranal_tunables.kra_timeout, RANAL_MIN_TIMEOUT);
         kranal_update_reaper_timeout(conn->rac_timeout);
 
-        rrc = RapkCreateRi(dev->rad_handle, cqid, dev->rad_ptag,
+        rrc = RapkCreateRi(dev->rad_handle, conn->rac_cqid,
+                           dev->rad_ptag,
                            dev->rad_rdma_cq, dev->rad_fma_cq,
                            &conn->rac_rihandle);
         if (rrc != RAP_SUCCESS) {
@@ -412,7 +416,7 @@ __kranal_conn_decref(kra_conn_t *conn)
 void
 kranal_terminate_conn_locked (kra_conn_t *conn)
 {
-        kra_peer_t     *peer - conn->rac_peer;
+        kra_peer_t *peer = conn->rac_peer;
 
         LASSERT (!in_interrupt());
         LASSERT (conn->rac_closing);
@@ -469,7 +473,7 @@ kranal_close_conn (kra_conn_t *conn, int error)
 
 int
 kranal_passive_conn_handshake (struct socket *sock, 
-                               ptl_nid_t **peer_nidp, kra_conn_t **connp)
+                               ptl_nid_t *peer_nidp, kra_conn_t **connp)
 {
        struct sockaddr_in   addr;
        __u32                peer_ip;
@@ -480,16 +484,18 @@ kranal_passive_conn_handshake (struct socket *sock,
         kra_device_t        *dev;
        RAP_RETURN           rrc;
        int                  rc;
+        int                  len;
         int                  i;
 
-       rc = sock->ops->getname(newsock, (struct sockaddr *)addr, &len, 2);
+        len = sizeof(addr);
+       rc = sock->ops->getname(sock, (struct sockaddr *)&addr, &len, 2);
         if (rc != 0) {
                 CERROR("Can't get peer's IP: %d\n", rc);
                 return rc;
         }
 
-        peer_ip = ntohl(sin.sin_addr.s_addr);
-        peer_port = ntohs(sin.sin_port);
+        peer_ip = ntohl(addr.sin_addr.s_addr);
+        peer_port = ntohs(addr.sin_port);
 
         if (peer_port >= 1024) {
                 CERROR("Refusing unprivileged connection from %u.%u.%u.%u/%d\n",
@@ -498,7 +504,7 @@ kranal_passive_conn_handshake (struct socket *sock,
         }
 
         rc = kranal_recv_connreq(sock, &connreq, 
-                                 kranal_data.kra_listener_timeout);
+                                 kranal_tunables.kra_listener_timeout);
         if (rc != 0) {
                 CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", 
                        HIPQUAD(peer_ip), peer_port, rc);
@@ -511,11 +517,11 @@ kranal_passive_conn_handshake (struct socket *sock,
         for (i = 0;;i++) {
                 LASSERT(i < kranal_data.kra_ndevs);
                 dev = &kranal_data.kra_devices[i];
-                if (dev->rad_id == connreq->racr_devid)
+                if (dev->rad_id == connreq.racr_devid)
                         break;
         }
 
-        rc = kranal_alloc_conn(&conn, dev,(__u32)(peer_nid & 0xffffffff));
+        rc = kranal_alloc_conn(&conn, dev);
         if (rc != 0)
                 return rc;
 
@@ -523,7 +529,7 @@ kranal_passive_conn_handshake (struct socket *sock,
         conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq.racr_timeout);
         kranal_update_reaper_timeout(conn->rac_keepalive);
         
-        rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
+        rrc = RapkSetRiParams(conn->rac_rihandle, &connreq.racr_riparams);
         if (rrc != RAP_SUCCESS) {
                 CERROR("Can't set riparams for "LPX64": %d\n", peer_nid, rrc);
                 kranal_conn_decref(conn);
@@ -534,7 +540,7 @@ kranal_passive_conn_handshake (struct socket *sock,
 
         rc = kranal_sock_write(sock, &connreq, sizeof(connreq));
         if (rc != 0) {
-                CERROR("Can't tx connreq to %u.%u.%u.%u/%p: %d\n", 
+                CERROR("Can't tx connreq to %u.%u.%u.%u/%d: %d\n", 
                        HIPQUAD(peer_ip), peer_port, rc);
                 kranal_conn_decref(conn);
                 return rc;
@@ -606,36 +612,33 @@ ranal_connect_sock(kra_peer_t *peer, struct socket **sockp)
                 CDEBUG(D_NET, "Port %d not available for %u.%u.%u.%u/%d\n", 
                        port, HIPQUAD(peer->rap_ip), peer->rap_port);
         }
+
+        /* all ports busy */
+        return -EHOSTUNREACH;
 }
 
 
 int
 kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
 {
+        struct sockaddr_in  dstaddr;
        kra_connreq_t       connreq;
         kra_conn_t         *conn;
         kra_device_t       *dev;
         struct socket      *sock;
-        __u32               id32;
        RAP_RETURN          rrc;
        int                 rc;
+        int                 idx;
+        
+        idx = peer->rap_nid & 0x7fffffff;
+        dev = &kranal_data.kra_devices[idx % kranal_data.kra_ndevs];
 
-        id32 = (peer_nid & 0xffffffff);
-        dev = &kranal_data.kra_devices[id32 % kranal_data.kra_ndevs];
-
-        rc = kranal_alloc_conn(&conn, dev, id32);
+        rc = kranal_alloc_conn(&conn, dev);
         if (rc != 0)
                 return rc;
 
         kranal_pack_connreq(&connreq, conn);
         
-        memset(&dstaddr, 0, sizeof(addr));
-        dstaddr.sin_family      = AF_INET;
-        dstaddr.sin_port        = htons(peer->rap_port);
-        dstaddr.sin_addr.s_addr = htonl(peer->rap_ip);
-
-        memset(&srcaddr, 0, sizeof(addr));
-
         rc = ranal_connect_sock(peer, &sock);
         if (rc != 0)
                 goto failed_0;
@@ -651,7 +654,7 @@ kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
                 goto failed_1;
         }
 
-        rc = kranal_recv_connreq(sock, &connreq, kranal_data.kra_timeout);
+        rc = kranal_recv_connreq(sock, &connreq, kranal_tunables.kra_timeout);
         if (rc != 0) {
                 CERROR("Can't rx connreq from %u.%u.%u.%u/%d: %d\n", 
                        HIPQUAD(peer->rap_ip), peer->rap_port, rc);
@@ -682,11 +685,10 @@ kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
         kranal_update_reaper_timeout(conn->rac_keepalive);
 
         rc = -ENETDOWN;
-        rrc = RapkSetRiParams(conn->rac_rihandle,
-                              &connreq->racr_riparams);
+        rrc = RapkSetRiParams(conn->rac_rihandle, &connreq.racr_riparams);
         if (rrc != RAP_SUCCESS) {
                 CERROR("Can't set riparams for "LPX64": %d\n",
-                       peer_nid, rrc);
+                       peer->rap_nid, rrc);
                 goto failed_0;
         }
 
@@ -694,16 +696,17 @@ kranal_active_conn_handshake(kra_peer_t *peer, kra_conn_t **connp)
        return 0;
 
  failed_1:
-        release_sock(sock);
+        sock_release(sock);
  failed_0:
         kranal_conn_decref(conn);
         return rc;
 }
 
 int
-kranal_conn_handshake (struct socket *sock, kranal_peer_t *peer)
+kranal_conn_handshake (struct socket *sock, kra_peer_t *peer)
 {
-        kranal_peer_t     *peer2;
+        kra_peer_t        *peer2;
+        kra_tx_t          *tx;
        ptl_nid_t          peer_nid;
        unsigned long      flags;
         unsigned long      timeout;
@@ -770,7 +773,7 @@ kranal_conn_handshake (struct socket *sock, kranal_peer_t *peer)
          * to connect at once).  NB we return success!  We _do_ have a
          * connection (so we don't need to remove the peer from the peer
          * table) and we _don't_ have any blocked txs to complete */
-       if (kranal_conn_isdup_locked(peer, conn->rac_incarnation)) {
+       if (kranal_conn_isdup_locked(peer, conn->rac_peer_incarnation)) {
                 LASSERT (!list_empty(&peer->rap_conns));
                 LASSERT (list_empty(&peer->rap_tx_queue));
                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
@@ -794,10 +797,10 @@ kranal_conn_handshake (struct socket *sock, kranal_peer_t *peer)
                                 kra_tx_t, tx_list);
 
                 list_del(&tx->tx_list);
-                kranal_queue_tx_locked(tx, conn);
+                kranal_post_fma(conn, tx);
         }
 
-       nstale = kranal_close_stale_conns_locked(peer, conn->rac_incarnation);
+       nstale = kranal_close_stale_conns_locked(peer, conn->rac_peer_incarnation);
 
        write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
 
@@ -824,7 +827,7 @@ kranal_connect (kra_peer_t *peer)
 
         rc = kranal_conn_handshake(NULL, peer);
 
-        write_lock_irqqsave(&kranal_data.kra_global_lock, flags);
+        write_lock_irqsave(&kranal_data.kra_global_lock, flags);
 
         LASSERT (peer->rap_connecting);
         peer->rap_connecting = 0;
@@ -838,7 +841,7 @@ kranal_connect (kra_peer_t *peer)
                 peer->rap_reconnect_interval = RANAL_MIN_RECONNECT_INTERVAL;
                 peer->rap_reconnect_time = CURRENT_TIME;
 
-                write_unlock_irqrestore(&kranal-data.kra_global_lock, flags);
+                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                 return;
         }
 
@@ -876,16 +879,15 @@ kranal_listener(void *arg)
        struct socket     *sock;
        struct socket     *newsock;
        int                port;
-       int                backlog;
-       int                timeout;
        kra_connreq_t     *connreqs;
        char               name[16];
+        int                rc;
 
        /* Parent thread holds kra_nid_mutex, and is, or is about to
         * block on kra_listener_signal */
 
-       port = kra_tunables.kra_port;
-       snprintf(name, "kranal_lstn%03d", port);
+       port = kranal_tunables.kra_port;
+       snprintf(name, sizeof(name), "kranal_lstn%03d", port);
        kportal_daemonize(name);
        kportal_blockallsigs();
 
@@ -896,24 +898,25 @@ kranal_listener(void *arg)
        if (connreqs == NULL)
                goto out_0;
 
-       rc = kranal_create_sock(&sock, port);
+       rc = kranal_create_sock(&sock);
        if (rc != 0)
                goto out_1;
 
         memset(&addr, 0, sizeof(addr));
         addr.sin_family      = AF_INET;
         addr.sin_port        = htons(port);
-        addr.sin_addr.s_addr = INADDR_ANY
+        addr.sin_addr.s_addr = INADDR_ANY;
 
-       rc = sock->ops->bind(sock, &addr, sizeof(addr));
+       rc = sock->ops->bind(sock, (struct sockaddr *)&addr, sizeof(addr));
        if (rc != 0) {
                CERROR("Can't bind to port %d\n", port);
                goto out_2;
        }
 
-       rc = sock->ops->listen(sock, kra_tunalbes.kra_backlog);
+       rc = sock->ops->listen(sock, kranal_tunables.kra_backlog);
        if (rc != 0) {
-               CERROR("Can't set listen backlog %d: %d\n", backlog, rc);
+               CERROR("Can't set listen backlog %d: %d\n", 
+                       kranal_tunables.kra_backlog, rc);
                goto out_2;
        }
 
@@ -976,7 +979,7 @@ kranal_listener(void *arg)
 }
 
 int
-kranal_start_listener ()
+kranal_start_listener (void)
 {
        long           pid;
        int            rc;
@@ -987,7 +990,7 @@ kranal_start_listener ()
        LASSERT (kranal_data.kra_listener_sock == NULL);
 
        kranal_data.kra_listener_shutdown == 0;
-       pid = kernel_thread(kranal_listener, sock, 0);
+       pid = kernel_thread(kranal_listener, NULL, 0);
        if (pid < 0) {
                CERROR("Can't spawn listener: %ld\n", pid);
                return (int)pid;
@@ -1004,7 +1007,7 @@ kranal_start_listener ()
 }
 
 void
-kranal_stop_listener()
+kranal_stop_listener(void)
 {
         CDEBUG(D_WARNING, "Stopping listener\n");
 
@@ -1012,7 +1015,7 @@ kranal_stop_listener()
        LASSERT (kranal_data.kra_listener_sock != NULL);
 
        kranal_data.kra_listener_shutdown = 1;
-       wake_up_all(kranal_data->kra_listener_sock->sk->sk_sleep);
+       wake_up_all(kranal_data.kra_listener_sock->sk->sk_sleep);
 
        /* Block until listener has torn down. */
        down(&kranal_data.kra_listener_signal);
@@ -1031,8 +1034,8 @@ kranal_listener_procint(ctl_table *table, int write, struct file *filp,
 
        down(&kranal_data.kra_nid_mutex);
 
-       LASSERT (tunable == &kranal_data.kra_port ||
-                tunable == &kranal_data.kra_backlog);
+       LASSERT (tunable == &kranal_tunables.kra_port ||
+                tunable == &kranal_tunables.kra_backlog);
        old_val = *tunable;
 
        rc = proc_dointvec(table, write, filp, buffer, lenp);
@@ -1060,7 +1063,7 @@ int
 kranal_set_mynid(ptl_nid_t nid)
 {
         lib_ni_t      *ni = &kranal_lib.libnal_ni;
-        int            rc;
+        int            rc = 0;
 
         CDEBUG(D_NET, "setting mynid to "LPX64" (old nid="LPX64")\n",
                nid, ni->ni_pid.nid);
@@ -1190,7 +1193,8 @@ kranal_unlink_peer_locked (kra_peer_t *peer)
 }
 
 int
-kranal_get_peer_info (int index, ptl_nid_t *nidp, int *portp, int *persistencep)
+kranal_get_peer_info (int index, ptl_nid_t *nidp, __u32 *ipp, int *portp, 
+                      int *persistencep)
 {
         kra_peer_t        *peer;
         struct list_head  *ptmp;
@@ -1210,6 +1214,7 @@ kranal_get_peer_info (int index, ptl_nid_t *nidp, int *portp, int *persistencep)
                                 continue;
 
                         *nidp = peer->rap_nid;
+                        *ipp = peer->rap_ip;
                         *portp = peer->rap_port;
                         *persistencep = peer->rap_persistence;
 
@@ -1240,7 +1245,7 @@ kranal_add_persistent_peer (ptl_nid_t nid, __u32 ip, int port)
 
         peer2 = kranal_find_peer_locked(nid);
         if (peer2 != NULL) {
-                kranal_put_peer(peer);
+                kranal_peer_decref(peer);
                 peer = peer2;
         } else {
                 /* peer table takes existing ref on peer */
@@ -1392,12 +1397,12 @@ kranal_close_stale_conns_locked (kra_peer_t *peer, __u64 incarnation)
         list_for_each_safe (ctmp, cnxt, &peer->rap_conns) {
                 conn = list_entry(ctmp, kra_conn_t, rac_list);
 
-                if (conn->rac_incarnation == incarnation)
+                if (conn->rac_peer_incarnation == incarnation)
                         continue;
 
                 CDEBUG(D_NET, "Closing stale conn nid:"LPX64" incarnation:"LPX64"("LPX64")\n",
-                       peer->rap_nid, conn->rac_incarnation, incarnation);
-                LASSERT (conn->rac_incarnation < incarnation);
+                       peer->rap_nid, conn->rac_peer_incarnation, incarnation);
+                LASSERT (conn->rac_peer_incarnation < incarnation);
 
                 count++;
                 kranal_close_conn_locked(conn, -ESTALE);
@@ -1497,7 +1502,7 @@ kranal_cmd(struct portals_cfg *pcfg, void * private)
                         pcfg->pcfg_id    = 0;
                         pcfg->pcfg_misc  = 0;
                         pcfg->pcfg_flags = 0;
-                        kranal_put_conn(conn);
+                        kranal_conn_decref(conn);
                 }
                 break;
         }
@@ -1546,24 +1551,24 @@ kranal_alloc_txdescs(struct list_head *freelist, int n)
 
                 PORTAL_ALLOC(tx, sizeof(*tx));
                 if (tx == NULL) {
-                        CERROR("Can't allocate %stx[%d]\n", 
-                               isnblk ? "nblk ", i);
-                        kranal_free_txdescs();
+                        CERROR("Can't allocate %stx[%d]\n",
+                               isnblk ? "nblk " : "", i);
+                        kranal_free_txdescs(freelist);
                         return -ENOMEM;
                 }
 
                 PORTAL_ALLOC(tx->tx_phys,
-                             PLT_MD_MAX_IOV * sizeof(*tx->tx_phys));
+                             PTL_MD_MAX_IOV * sizeof(*tx->tx_phys));
                 if (tx->tx_phys == NULL) {
                         CERROR("Can't allocate %stx[%d]->tx_phys\n", 
-                               isnblk ? "nblk ", i);
+                               isnblk ? "nblk " : "", i);
 
                         PORTAL_FREE(tx, sizeof(*tx));
                         kranal_free_txdescs(freelist);
                         return -ENOMEM;
                 }
 
-                tx->tx_isnblk = isnblk
+                tx->tx_isnblk = isnblk;
                 tx->tx_buftype = RANAL_BUF_NONE;
 
                 list_add(&tx->tx_list, freelist);
@@ -1579,17 +1584,17 @@ kranal_device_init(int id, kra_device_t *dev)
         RAP_RETURN        rrc;
 
         dev->rad_id = id;
-        rrc = RapkGetDeviceByIndex(id, NULL, kranal_device_callback,
+        rrc = RapkGetDeviceByIndex(id, kranal_device_callback,
                                    &dev->rad_handle);
         if (rrc != RAP_SUCCESS) {
-                CERROR("Can't get Rapidarray Device %d: %d\n", idx, rrc);
+                CERROR("Can't get Rapidarray Device %d: %d\n", id, rrc);
                 goto failed_0;
         }
 
         rrc = RapkReserveRdma(dev->rad_handle, total_ntx);
         if (rrc != RAP_SUCCESS) {
                 CERROR("Can't reserve %d RDMA descriptors"
-                       " for device[%d]: %d\n", total_ntx, i, rrc);
+                       " for device %d: %d\n", total_ntx, id, rrc);
                 goto failed_1;
         }
 
@@ -1597,7 +1602,7 @@ kranal_device_init(int id, kra_device_t *dev)
                              &dev->rad_ptag);
         if (rrc != RAP_SUCCESS) {
                 CERROR("Can't create ptag"
-                       " for device[%d]: %d\n", i, rrc);
+                       " for device %d: %d\n", id, rrc);
                 goto failed_1;
         }
 
@@ -1605,7 +1610,7 @@ kranal_device_init(int id, kra_device_t *dev)
                            &dev->rad_rdma_cq);
         if (rrc != RAP_SUCCESS) {
                 CERROR("Can't create rdma cq size %d"
-                       " for device[%d]: %d\n", total_ntx, i, rrc);
+                       " for device %d: %d\n", total_ntx, id, rrc);
                 goto failed_2;
         }
 
@@ -1613,7 +1618,7 @@ kranal_device_init(int id, kra_device_t *dev)
                            dev->rad_ptag, &dev->rad_fma_cq);
         if (rrc != RAP_SUCCESS) {
                 CERROR("Can't create fma cq size %d"
-                       " for device[%d]: %d\n", RANAL_RX_CQ_SIZE, i, rrc);
+                       " for device %d: %d\n", RANAL_FMA_CQ_SIZE, id, rrc);
                 goto failed_3;
         }
 
@@ -1632,7 +1637,7 @@ kranal_device_init(int id, kra_device_t *dev)
 void
 kranal_device_fini(kra_device_t *dev)
 {
-        RapkDestroyCQ(dev->rad_handle, dev->rad_rx_cq, dev->rad_ptag);
+        RapkDestroyCQ(dev->rad_handle, dev->rad_fma_cq, dev->rad_ptag);
         RapkDestroyCQ(dev->rad_handle, dev->rad_rdma_cq, dev->rad_ptag);
         RapkDestroyPtag(dev->rad_handle, dev->rad_ptag);
         RapkReleaseDevice(dev->rad_handle);
@@ -1663,7 +1668,7 @@ kranal_api_shutdown (nal_t *nal)
 
         case RANAL_INIT_ALL:
                 /* stop calls to nal_cmd */
-                libcfs_nal_cmd_unregister(OPENRANAL);
+                libcfs_nal_cmd_unregister(RANAL);
                 /* No new persistent peers */
 
                 /* resetting my NID to unadvertises me, removes my
@@ -1674,7 +1679,7 @@ kranal_api_shutdown (nal_t *nal)
                 /* Wait for all peer/conn state to clean up */
                 i = 2;
                 while (atomic_read(&kranal_data.kra_nconns) != 0 ||
-                       atomic_read(&kranal-data.kra_npeers) != 0) {
+                       atomic_read(&kranal_data.kra_npeers) != 0) {
                         i++;
                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                                "waiting for %d peers and %d conns to close down\n",
@@ -1710,9 +1715,9 @@ kranal_api_shutdown (nal_t *nal)
         spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
 
         LASSERT (list_empty(&kranal_data.kra_connd_peers));
-        spin_lock_irqsave(&kranal-data.kra_connd_lock, flags); 
+        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); 
         wake_up_all(&kranal_data.kra_connd_waitq);
-        spin_unlock_irqrestore(&kranal-data.kra_connd_lock, flags); 
+        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); 
 
         i = 2;
         while (atomic_read(&kranal_data.kra_nthreads) != 0) {
@@ -1897,7 +1902,7 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
         if (kranal_data.kra_ndevs == 0)
                 goto failed;
 
-        rc = libcfs_nal_cmd_register(OPENRANAL, &kranal_cmd, NULL);
+        rc = libcfs_nal_cmd_register(RANAL, &kranal_cmd, NULL);
         if (rc != 0) {
                 CERROR("Can't initialise command interface (rc = %d)\n", rc);
                 goto failed;
@@ -1927,7 +1932,7 @@ kranal_module_fini (void)
 #endif
         PtlNIFini(kranal_ni);
 
-        ptl_unregister_nal(OPENRANAL);
+        ptl_unregister_nal(RANAL);
 }
 
 int __init
@@ -1949,16 +1954,16 @@ kranal_module_init (void)
         /* Initialise dynamic tunables to defaults once only */
         kranal_tunables.kra_timeout = RANAL_TIMEOUT;
 
-        rc = ptl_register_nal(OPENRANAL, &kranal_api);
+        rc = ptl_register_nal(RANAL, &kranal_api);
         if (rc != PTL_OK) {
                 CERROR("Can't register RANAL: %d\n", rc);
                 return -ENOMEM;               /* or something... */
         }
 
         /* Pure gateways want the NAL started up at module load time... */
-        rc = PtlNIInit(OPENRANAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kranal_ni);
+        rc = PtlNIInit(RANAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kranal_ni);
         if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
-                ptl_unregister_nal(OPENRANAL);
+                ptl_unregister_nal(RANAL);
                 return -ENODEV;
         }
 
index c134179..fe130b7 100644 (file)
@@ -48,6 +48,8 @@
 #include <linux/kmod.h>
 #include <linux/sysctl.h>
 
+#include <net/sock.h>
+
 #define DEBUG_SUBSYSTEM S_NAL
 
 #include <linux/kp30.h>
@@ -79,7 +81,7 @@
 #define RANAL_NTX             64                /* # tx descs */
 #define RANAL_NTX_NBLK        256               /* # reserved tx descs */
 
-#define RANAL_RX_CQ_SIZE      1024              /* # entries in receive CQ 
+#define RANAL_FMA_CQ_SIZE     8192              /* # entries in receive CQ 
                                                  * (overflow is a performance hit) */
 
 #define RANAL_RESCHED         100               /* # scheduler loops before reschedule */
@@ -159,8 +161,8 @@ typedef struct
 
 #define RANAL_INIT_NOTHING         0
 #define RANAL_INIT_DATA            1
-
-#define RANAL_INIT_ALL             7
+#define RANAL_INIT_LIB             2
+#define RANAL_INIT_ALL             3
 
 /************************************************************************
  * Wire message structs.  These are sent in sender's byte order
@@ -339,11 +341,26 @@ typedef struct kra_peer
         unsigned long       rap_reconnect_interval; /* exponential backoff */
 } kra_peer_t;
 
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,6,0))
+# define sk_allocation  allocation
+# define sk_data_ready data_ready
+# define sk_write_space write_space
+# define sk_user_data   user_data
+# define sk_prot        prot
+# define sk_sndbuf      sndbuf
+# define sk_socket      socket
+# define sk_wmem_queued wmem_queued
+# define sk_err         err
+# define sk_sleep       sleep
+#endif
 
 extern lib_nal_t       kranal_lib;
 extern kra_data_t      kranal_data;
 extern kra_tunables_t  kranal_tunables;
 
+extern void __kranal_peer_decref(kra_peer_t *peer);
+extern void __kranal_conn_decref(kra_conn_t *conn);
+
 static inline void
 kranal_peer_addref(kra_peer_t *peer)
 {
@@ -404,8 +421,9 @@ kranal_cqid2connlist (__u32 cqid)
 static inline kra_conn_t *
 kranal_cqid2conn_locked (__u32 cqid) 
 {
-        struct list_head  conns = kranal_cqid2connlist(cqid);
+        struct list_head *conns = kranal_cqid2connlist(cqid);
         struct list_head *tmp;
+        kra_conn_t       *conn;
         
         list_for_each(tmp, conns) {
                 conn = list_entry(tmp, kra_conn_t, rac_hashlist);
@@ -436,3 +454,24 @@ kranal_page2phys (struct page *p)
 # error "no page->phys"
 #endif
 
+extern int kranal_listener_procint(ctl_table *table, 
+                                   int write, struct file *filp, 
+                                   void *buffer, size_t *lenp);
+extern int kranal_close_stale_conns_locked (kra_peer_t *peer, 
+                                            __u64 incarnation);
+extern void kranal_update_reaper_timeout(long timeout);
+extern void kranal_tx_done (kra_tx_t *tx, int completion);
+extern void kranal_unlink_peer_locked (kra_peer_t *peer);
+extern void kranal_schedule_conn(kra_conn_t *conn);
+extern kra_peer_t *kranal_create_peer (ptl_nid_t nid);
+extern kra_peer_t *kranal_find_peer_locked (ptl_nid_t nid);
+extern void kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx);
+extern int kranal_del_peer (ptl_nid_t nid, int single_share);
+extern void kranal_device_callback(RAP_INT32 devid);
+extern int kranal_thread_start (int(*fn)(void *arg), void *arg);
+extern int kranal_connd (void *arg);
+extern int kranal_reaper (void *arg);
+extern int kranal_scheduler (void *arg);
+extern void kranal_close_conn_locked (kra_conn_t *conn, int error);
+extern void kranal_terminate_conn_locked (kra_conn_t *conn);
+extern void kranal_connect (kra_peer_t *peer);
index b491d71..9490b56 100644 (file)
@@ -42,6 +42,7 @@ kranal_device_callback(RAP_INT32 devid)
 {
         kra_device_t *dev;
         int           i;
+        unsigned long flags;
         
         for (i = 0; i < kranal_data.kra_ndevs; i++) {
 
@@ -157,7 +158,7 @@ kranal_tx_done (kra_tx_t *tx, int completion)
 
         case RANAL_BUF_PHYS_MAPPED:
                 LASSERT (tx->tx_conn != NULL);
-                dev = tx->tx_con->rac_device;
+                dev = tx->tx_conn->rac_device;
                 rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
                                            dev->rad_ptag, &tx->tx_map_key);
                 LASSERT (rrc == RAP_SUCCESS);
@@ -165,8 +166,8 @@ kranal_tx_done (kra_tx_t *tx, int completion)
 
         case RANAL_BUF_VIRT_MAPPED:
                 LASSERT (tx->tx_conn != NULL);
-                dev = tx->tx_con->rac_device;
-                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer
+                dev = tx->tx_conn->rac_device;
+                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
                                            dev->rad_ptag, &tx->tx_map_key);
                 LASSERT (rrc == RAP_SUCCESS);
                 break;
@@ -261,7 +262,7 @@ kranal_init_msg(kra_msg_t *msg, int type)
         /* ram_incarnation gets set when FMA is sent */
 }
 
-kra_tx_t
+kra_tx_t *
 kranal_new_tx_msg (int may_block, int type)
 {
         kra_tx_t *tx = kranal_get_idle_tx(may_block);
@@ -294,7 +295,7 @@ kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                 return -EMSGSIZE;
         }
 
-        tx->tx_bufftype = RANAL_BUF_IMMEDIATE;
+        tx->tx_buftype = RANAL_BUF_IMMEDIATE;
         tx->tx_nob = nob;
         tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
         return 0;
@@ -321,7 +322,7 @@ kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                 return -EMSGSIZE;
         }
 
-        tx->tx_bufftype = RANAL_BUF_VIRT_UNMAPPED;
+        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
         tx->tx_nob = nob;
         tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
         return 0;
@@ -347,10 +348,9 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
                 LASSERT (nkiov > 0);
         }
 
-        tx->tx_bufftype = RANAL_BUF_PHYS_UNMAPPED;
+        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
         tx->tx_nob = nob;
-        tx->tx_buffer = NULL;
-        tx->tx_phys_offset = kiov->kiov_offset + offset;
+        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));
         
         phys->Address = kranal_page2phys(kiov->kiov_page);
         phys->Length  = PAGE_SIZE;
@@ -368,15 +368,9 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
                         int i;
                         /* Can't have gaps */
                         CERROR("Can't make payload contiguous in I/O VM:"
-                               "page %d, offset %d, len %d \n", nphys, 
-                               kiov->kiov_offset, kiov->kiov_len);
-
-                        for (i = -nphys; i < nkiov; i++) {
-                                CERROR("kiov[%d] %p +%d for %d\n",
-                                       i, kiov[i].kiov_page, 
-                                       kiov[i].kiov_offset, kiov[i].kiov_len);
-                        }
-                        
+                               "page %d, offset %d, len %d \n", 
+                               phys - tx->tx_phys, 
+                               kiov->kiov_offset, kiov->kiov_len);                        
                         return -EINVAL;
                 }
 
@@ -406,7 +400,7 @@ kranal_setup_buffer (kra_tx_t *tx, int niov,
         if (kiov != NULL)
                 return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);
         
-        return kranal_setup_virt_buffer(tx, niov, kiov, offset, nob);
+        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
 }
 
 void
@@ -414,6 +408,7 @@ kranal_map_buffer (kra_tx_t *tx)
 {
         kra_conn_t     *conn = tx->tx_conn;
         kra_device_t   *dev = conn->rac_device;
+        RAP_RETURN      rrc;
 
         switch (tx->tx_buftype) {
         default:
@@ -503,7 +498,7 @@ kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
         peer = kranal_find_peer_locked(nid);
         if (peer == NULL) {
                 write_unlock_irqrestore(g_lock, flags);
-                kranal_tx_done(tx -EHOSTUNREACH);
+                kranal_tx_done(tx, -EHOSTUNREACH);
                 return;
         }
 
@@ -547,8 +542,9 @@ static void
 kranal_rdma(kra_tx_t *tx, int type, 
             kra_rdma_desc_t *rard, int nob, __u64 cookie)
 {
-        kra_conn_t *conn = tx->tx_conn;
-        RAP_RETURN  rrc;
+        kra_conn_t   *conn = tx->tx_conn;
+        RAP_RETURN    rrc;
+        unsigned long flags;
 
         /* prep final completion message */
         kranal_init_msg(&tx->tx_msg, type);
@@ -559,7 +555,7 @@ kranal_rdma(kra_tx_t *tx, int type,
         LASSERT (nob <= rard->rard_nob);
 
         memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
-        tx->tx_rdma_desc.SrcPtr = tx->tx_buffer;
+        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
         tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
         tx->tx_rdma_desc.DstPtr = rard->rard_addr;
         tx->tx_rdma_desc.DstKey = rard->rard_key;
@@ -619,6 +615,7 @@ kranal_do_send (lib_nal_t    *nal,
 {
         kra_conn_t *conn;
         kra_tx_t   *tx;
+        int         rc;
 
         /* NB 'private' is different depending on what we're sending.... */
 
@@ -672,8 +669,8 @@ kranal_do_send (lib_nal_t    *nal,
 
                 kranal_map_buffer(tx);
                 kranal_rdma(tx, RANAL_MSG_GET_DONE,
-                            &conn->rac_rxmsg->ram_u.getreq.ragm_desc, nob,
-                            &conn->rac_rxmsg->ram_u.getreq.ragm_cookie);
+                            &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
+                            conn->rac_rxmsg->ram_u.get.ragm_cookie);
                 return PTL_OK;
         }
 
@@ -704,7 +701,7 @@ kranal_do_send (lib_nal_t    *nal,
                 tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
                 /* rest of tx_msg is setup just before it is sent */
                 kranal_launch_tx(tx, nid);
-                return PTL_OK
+                return PTL_OK;
 
         case PTL_MSG_ACK:
                 LASSERT (nob == 0);
@@ -716,7 +713,7 @@ kranal_do_send (lib_nal_t    *nal,
                     nob <= kranal_tunables.kra_max_immediate)
                         break;                  /* send IMMEDIATE */
                 
-                tx = kranal_new_tx_msg(!in_interrupt(), RANA_MSG_PUT_REQ);
+                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
                 if (tx == NULL)
                         return PTL_NO_SPACE;
 
@@ -786,6 +783,7 @@ kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
 {
         kra_conn_t  *conn = private;
         kra_msg_t   *rxmsg = conn->rac_rxmsg;
+        kra_tx_t    *tx;
         void        *buffer;
         int          rc;
         
@@ -834,7 +832,7 @@ kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
 
         case RANAL_MSG_PUT_REQ:
                 if (libmsg == NULL) {           /* PUT didn't match... */
-                        lib_finalize(null, NULL, libmsg, PTL_OK);
+                        lib_finalize(nal, NULL, libmsg, PTL_OK);
                         return PTL_OK;
                 }
                 
@@ -853,9 +851,10 @@ kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                 tx->tx_msg.ram_u.putack.rapam_src_cookie = 
                         conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                 tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
-                tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_key = tx->tx_map_key;
-                tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_addr = tx->tx_buffer;
-                tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_nob = mlen;
+                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
+                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits = 
+                        (__u64)((unsigned long)tx->tx_buffer);
+                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;
 
                 tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */
 
@@ -913,7 +912,7 @@ kranal_check_conn (kra_conn_t *conn)
         unsigned long      now = jiffies;
 
         if (!conn->rac_closing &&
-            time_after_eq(now, conn->rac_last_sent + conn->rac_keepalive * HZ)) {
+            time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                 /* not sent in a while; schedule conn so scheduler sends a keepalive */
                 kranal_schedule_conn(conn);
         }
@@ -923,7 +922,7 @@ kranal_check_conn (kra_conn_t *conn)
 
         if (!conn->rac_close_recvd &&
             time_after_eq(now, conn->rac_last_rx + timeout)) {
-                CERROR("Nothing received from "LPX64" within %d seconds\n",
+                CERROR("Nothing received from "LPX64" within %lu seconds\n",
                        conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
                 return -ETIMEDOUT;
         }
@@ -942,8 +941,8 @@ kranal_check_conn (kra_conn_t *conn)
                 
                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
                         spin_unlock_irqrestore(&conn->rac_lock, flags);
-                        CERROR("tx on fmaq for "LPX64" blocked %d seconds\n",
-                               conn->rac_perr->rap_nid, (now - tx->tx_qtime)/HZ);
+                        CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
+                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                         return -ETIMEDOUT;
                 }
         }
@@ -953,8 +952,8 @@ kranal_check_conn (kra_conn_t *conn)
                 
                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
                         spin_unlock_irqrestore(&conn->rac_lock, flags);
-                        CERROR("tx on rdmaq for "LPX64" blocked %d seconds\n",
-                               conn->rac_perr->rap_nid, (now - tx->tx_qtime)/HZ);
+                        CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
+                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                         return -ETIMEDOUT;
                 }
         }
@@ -964,8 +963,8 @@ kranal_check_conn (kra_conn_t *conn)
                 
                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
                         spin_unlock_irqrestore(&conn->rac_lock, flags);
-                        CERROR("tx on replyq for "LPX64" blocked %d seconds\n",
-                               conn->rac_perr->rap_nid, (now - tx->tx_qtime)/HZ);
+                        CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
+                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                         return -ETIMEDOUT;
                 }
         }
@@ -980,6 +979,8 @@ kranal_check_conns (int idx, unsigned long *min_timeoutp)
         struct list_head  *conns = &kranal_data.kra_conns[idx];
         struct list_head  *ctmp;
         kra_conn_t        *conn;
+        unsigned long      flags;
+        int                rc;
 
  again:
         /* NB. We expect to check all the conns and not find any problems, so
@@ -987,7 +988,7 @@ kranal_check_conns (int idx, unsigned long *min_timeoutp)
         read_lock(&kranal_data.kra_global_lock);
 
         list_for_each (ctmp, conns) {
-                conn = list_entry(ptmp, kra_conn_t, rac_hashlist);
+                conn = list_entry(ctmp, kra_conn_t, rac_hashlist);
 
                 if (conn->rac_timeout < *min_timeoutp )
                         *min_timeoutp = conn->rac_timeout;
@@ -1004,13 +1005,15 @@ kranal_check_conns (int idx, unsigned long *min_timeoutp)
                 CERROR("Check on conn to "LPX64"failed: %d\n",
                        conn->rac_peer->rap_nid, rc);
 
-                write_lock_irqsave(&kranal_data.kra_global_lock);
+                write_lock_irqsave(&kranal_data.kra_global_lock, flags);
 
                 if (!conn->rac_closing)
                         kranal_close_conn_locked(conn, -ETIMEDOUT);
                 else
                         kranal_terminate_conn_locked(conn);
                         
+                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+
                 kranal_conn_decref(conn);
 
                 /* start again now I've dropped the lock */
@@ -1048,7 +1051,7 @@ kranal_connd (void *arg)
                         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
 
                         kranal_connect(peer);
-                        kranal_put_peer(peer);
+                        kranal_peer_decref(peer);
 
                         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        continue;
@@ -1095,7 +1098,6 @@ kranal_reaper (void *arg)
         unsigned long      flags;
         kra_conn_t        *conn;
         kra_peer_t        *peer;
-        unsigned long      flags;
         long               timeout;
         int                i;
         int                conn_entries = kranal_data.kra_conn_hash_size;
@@ -1197,7 +1199,6 @@ kranal_reaper (void *arg)
 
                 spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
 
-                busy_loops = 0;
                 schedule_timeout(timeout);
 
                 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
@@ -1230,12 +1231,12 @@ kranal_process_rdmaq (__u32 cqid)
         spin_lock_irqsave(&conn->rac_lock, flags);
 
         LASSERT (!list_empty(&conn->rac_rdmaq));
-        tx = list_entry(con->rac_rdmaq.next, kra_tx_t, tx_list);
+        tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
         list_del(&tx->tx_list);
 
         LASSERT(desc->AppPtr == (void *)tx);
-        LASSERT(desc->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
-                desc->tx_msg.ram_type == RANAL_MSG_GET_DONE);
+        LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
+                tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);
 
         list_add_tail(&tx->tx_list, &conn->rac_fmaq);
         tx->tx_qtime = jiffies;
@@ -1252,26 +1253,30 @@ int
 kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
                void *immediate, int immediatenob)
 {
-        int   sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
-
+        int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
+        RAP_RETURN rrc;
+        
         LASSERT (sizeof(*msg) <= RANAL_FMA_PREFIX_LEN);
         LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
                  immediatenob <= RANAL_FMA_MAX_DATA_LEN :
                  immediatenob == 0);
 
-        msg->ram_incarnation = conn->rac_incarnation;
+        msg->ram_incarnation = conn->rac_my_incarnation;
         msg->ram_seq = conn->rac_tx_seq;
 
         if (sync)
-                rrc = RapkFmaSyncSend(conn->rac_device.rad_handle,
+                rrc = RapkFmaSyncSend(conn->rac_device->rad_handle,
                                       immediate, immediatenob,
                                       msg, sizeof(*msg));
         else
-                rrc = RapkFmaSend(conn->rac_device.rad_handle,
+                rrc = RapkFmaSend(conn->rac_device->rad_handle,
                                   immediate, immediatenob,
                                   msg, sizeof(*msg));
 
         switch (rrc) {
+        default:
+                LBUG();
+
         case RAP_SUCCESS:
                 conn->rac_last_tx = jiffies;
                 conn->rac_tx_seq++;
@@ -1279,9 +1284,6 @@ kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
                 
         case RAP_NOT_DONE:
                 return -EAGAIN;
-
-        default:
-                LBUG();
         }
 }
 
@@ -1323,7 +1325,8 @@ kranal_process_fmaq (kra_conn_t *conn)
 
                 spin_unlock_irqrestore(&conn->rac_lock, flags);
 
-                if (time_after_eq(conn->rac_last_tx + conn->rac_keepalive)) {
+                if (time_after_eq(jiffies, 
+                                  conn->rac_last_tx + conn->rac_keepalive)) {
                         kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                         kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                 }
@@ -1367,7 +1370,8 @@ kranal_process_fmaq (kra_conn_t *conn)
                 kranal_map_buffer(tx);
                 tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
                 tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
-                tx->tx_msg.ram_u.get.ragm_desc.rard_addr = tx->tx_buffer;
+                tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits = 
+                        (__u64)((unsigned long)tx->tx_buffer);
                 tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                 expect_reply = 1;
@@ -1404,7 +1408,7 @@ kranal_swab_rdma_desc (kra_rdma_desc_t *d)
         __swab16s(&d->rard_key.Cookie);
         __swab16s(&d->rard_key.MdHandle);
         __swab32s(&d->rard_key.Flags);
-        __swab64s(&d->rard_addr);
+        __swab64s(&d->rard_addr.AddressBits);
         __swab32s(&d->rard_nob);
 }
 
@@ -1440,8 +1444,10 @@ kranal_process_receives(kra_conn_t *conn)
         unsigned long flags;
         __u32         seq;
         __u32         nob;
+        kra_tx_t     *tx;
         kra_msg_t    *msg;
-        RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &msg);
+        void         *prefix;
+        RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
         kra_peer_t   *peer = conn->rac_peer;
 
         if (rrc == RAP_NOT_DONE)
@@ -1449,7 +1455,8 @@ kranal_process_receives(kra_conn_t *conn)
         
         LASSERT (rrc == RAP_SUCCESS);
         conn->rac_last_rx = jiffies;
-        seq = conn->rac_seq++;
+        seq = conn->rac_rx_seq++;
+        msg = (kra_msg_t *)prefix;
 
         if (msg->ram_magic != RANAL_MSG_MAGIC) {
                 if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
@@ -1492,10 +1499,10 @@ kranal_process_receives(kra_conn_t *conn)
                 goto out;
         }
         
-        if (msg->ram_incarnation != conn->rac_incarnation) {
+        if (msg->ram_incarnation != conn->rac_peer_incarnation) {
                 CERROR("Unexpected incarnation "LPX64"("LPX64
                        " expected) from "LPX64"\n",
-                       msg->ram_incarnation, conn->rac_incarnation,
+                       msg->ram_incarnation, conn->rac_peer_incarnation,
                        peer->rap_nid);
                 goto out;
         }
@@ -1514,13 +1521,14 @@ kranal_process_receives(kra_conn_t *conn)
  
         if (msg->ram_type == RANAL_MSG_CLOSE) {
                 conn->rac_close_recvd = 1;
-                write_lock_irqsave(&kranal_data.kra_global_lock);
+                write_lock_irqsave(&kranal_data.kra_global_lock, flags);
 
                 if (!conn->rac_closing)
                         kranal_close_conn_locked(conn, -ETIMEDOUT);
                 else if (conn->rac_close_sent)
                         kranal_terminate_conn_locked(conn);
-                
+
+                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                 goto out;
         }
 
@@ -1548,7 +1556,8 @@ kranal_process_receives(kra_conn_t *conn)
                 if (tx == NULL)
                         break;
                 
-                tx->tx_msg.ram_u.racm_cookie = msg->msg_u.putreq.raprm_cookie;
+                tx->tx_msg.ram_u.completion.racm_cookie = 
+                        msg->ram_u.putreq.raprm_cookie;
                 kranal_post_fma(conn, tx);
                 break;
 
@@ -1571,7 +1580,7 @@ kranal_process_receives(kra_conn_t *conn)
 
                 kranal_rdma(tx, RANAL_MSG_PUT_DONE,
                             &msg->ram_u.putack.rapam_desc, 
-                            msg->msg_u.putack.rapam_desc.rard_nob,
+                            msg->ram_u.putack.rapam_desc.rard_nob,
                             msg->ram_u.putack.rapam_dst_cookie);
                 break;
 
@@ -1587,7 +1596,7 @@ kranal_process_receives(kra_conn_t *conn)
                 break;
 
         case RANAL_MSG_GET_REQ:
-                lib_parse(&kranal_lib, &msg->ram_u.getreq.ragm_hdr, conn);
+                lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);
                 
                 if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
                         break;
@@ -1596,7 +1605,7 @@ kranal_process_receives(kra_conn_t *conn)
                 if (tx == NULL)
                         break;
 
-                tx->tx_msg.ram_u.racm_cookie = msg->msg_u.getreq.ragm_cookie;
+                tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
                 kranal_post_fma(conn, tx);
                 break;
                 
@@ -1624,7 +1633,7 @@ kranal_process_receives(kra_conn_t *conn)
         }
 
  out:
-        if (conn->rac_msg != NULL)
+        if (conn->rac_rxmsg != NULL)
                 kranal_consume_rxmsg(conn, NULL, 0);
 
         return 1;
@@ -1638,13 +1647,16 @@ kranal_scheduler (void *arg)
         char            name[16];
         kra_conn_t     *conn;
         unsigned long   flags;
+        RAP_RETURN      rrc;
         int             rc;
+        int             resched;
         int             i;
         __u32           cqid;
+        __u32           event_type;
         int             did_something;
         int             busy_loops = 0;
 
-        snprintf(name, sizeof(name), "kranal_sd_%02ld", dev->rad_idx);
+        snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
         kportal_daemonize(name);
         kportal_blockallsigs();
 
index 00a0c4b..9c1537b 100644 (file)
@@ -676,7 +676,7 @@ jt_ptl_print_peers (int argc, char **argv)
         int                      index;
         int                      rc;
 
-        if (!g_nal_is_compatible (argv[0], SOCKNAL, OPENIBNAL, IIBNAL, 0))
+        if (!g_nal_is_compatible (argv[0], SOCKNAL, OPENIBNAL, IIBNAL, RANAL, 0))
                 return -1;
 
         for (index = 0;;index++) {
@@ -832,7 +832,7 @@ jt_ptl_print_connections (int argc, char **argv)
         int                      index;
         int                      rc;
 
-        if (!g_nal_is_compatible (argv[0], SOCKNAL, OPENIBNAL, IIBNAL, 0))
+        if (!g_nal_is_compatible (argv[0], SOCKNAL, OPENIBNAL, IIBNAL, RANAL, 0))
                 return -1;
 
         for (index = 0;;index++) {
@@ -1023,7 +1023,7 @@ int jt_ptl_disconnect(int argc, char **argv)
                 return 0;
         }
 
-        if (!g_nal_is_compatible (NULL, SOCKNAL, OPENIBNAL, IIBNAL, 0))
+        if (!g_nal_is_compatible (NULL, SOCKNAL, OPENIBNAL, IIBNAL, RANAL, 0))
                 return 0;
 
         if (argc >= 2 &&