Whamcloud - gitweb
Branch HEAD
authorliangzhen <liangzhen>
Mon, 19 Jan 2009 08:57:06 +0000 (08:57 +0000)
committerliangzhen <liangzhen>
Mon, 19 Jan 2009 08:57:06 +0000 (08:57 +0000)
Socklnd protocol V3:
. dedicated connection for emergency message (ZC-ACK)
. keepalive ping

b=14634
i=isaac
i=maxim

14 files changed:
lnet/ChangeLog
lnet/include/lnet/socklnd.h
lnet/klnds/socklnd/Makefile.in
lnet/klnds/socklnd/autoMakefile.am
lnet/klnds/socklnd/socklnd.c
lnet/klnds/socklnd/socklnd.h
lnet/klnds/socklnd/socklnd_cb.c
lnet/klnds/socklnd/socklnd_lib-linux.c
lnet/klnds/socklnd/socklnd_lib-winnt.c
lnet/klnds/socklnd/socklnd_modparams.c
lnet/klnds/socklnd/socklnd_proto.c [new file with mode: 0644]
lnet/ulnds/socklnd/conn.c
lnet/ulnds/socklnd/handlers.c
lnet/ulnds/socklnd/usocklnd.h

index dd608fd..6a05cf6 100644 (file)
@@ -18,6 +18,18 @@ Description:
 Details    : 
 
 Severity   : major
 Details    : 
 
 Severity   : major
+Bugzilla   : 14634
+Description: socklnd protocol version 3
+Details    : With current protocol V2, connections on router can be
+             blocked and can't receive any incoming messages when there is no
+             more router buffer, so ZC-ACK can't be handled (LNet message
+             can't be finalized) and will cause deadlock on router.
+             Protocol V3 has a dedicated connection for emergency messages
+             like ZC-ACK to router, messages on this dedicated connection
+             don't need any credit so will never be blocked. Also, V3 can send
+             keepalive pings at a specified interval for router health checking.
+
+Severity   : major
 Bugzilla   : 15983
 Description: workaround for OOM from o2iblnd
 Details    : OFED needs allocate big chunk of memory for QP while creating
 Bugzilla   : 15983
 Description: workaround for OOM from o2iblnd
 Details    : OFED needs allocate big chunk of memory for QP while creating
index b697da0..4510ae7 100644 (file)
@@ -50,6 +50,8 @@
 #define SOCKLND_CONN_BULK_OUT   3
 #define SOCKLND_CONN_NTYPES     4
 
 #define SOCKLND_CONN_BULK_OUT   3
 #define SOCKLND_CONN_NTYPES     4
 
+#define SOCKLND_CONN_ACK        SOCKLND_CONN_BULK_IN
+
 #include <libcfs/libcfs_pack.h>
 typedef struct {
         __u32                   kshm_magic;     /* magic number of socklnd message */
 #include <libcfs/libcfs_pack.h>
 typedef struct {
         __u32                   kshm_magic;     /* magic number of socklnd message */
@@ -79,20 +81,28 @@ typedef struct {
 typedef struct {
         __u32                   ksm_type;       /* type of socklnd message */
         __u32                   ksm_csum;       /* checksum if != 0 */
 typedef struct {
         __u32                   ksm_type;       /* type of socklnd message */
         __u32                   ksm_csum;       /* checksum if != 0 */
-        __u64                   ksm_zc_req_cookie; /* ack required if != 0 */
-        __u64                   ksm_zc_ack_cookie; /* ack if != 0 */
+        __u64                   ksm_zc_cookies[2]; /* Zero-Copy request/ACK cookie */
         union {
                 ksock_lnet_msg_t lnetmsg;       /* lnet message, it's empty if it's NOOP */
         } WIRE_ATTR ksm_u;
 } WIRE_ATTR ksock_msg_t;
 
         union {
                 ksock_lnet_msg_t lnetmsg;       /* lnet message, it's empty if it's NOOP */
         } WIRE_ATTR ksm_u;
 } WIRE_ATTR ksock_msg_t;
 
+static inline void
+socklnd_init_msg(ksock_msg_t *msg, int type)
+{
+        msg->ksm_csum           = 0;
+        msg->ksm_type           = type;
+        msg->ksm_zc_cookies[0]  = msg->ksm_zc_cookies[1]  = 0;
+}
+
 #include <libcfs/libcfs_unpack.h>
 
 #include <libcfs/libcfs_unpack.h>
 
-#define KSOCK_MSG_NOOP          0xc0            /* ksm_u empty */ 
+#define KSOCK_MSG_NOOP          0xc0            /* ksm_u empty */
 #define KSOCK_MSG_LNET          0xc1            /* lnet msg */
 
 /* We need to know this number to parse hello msg from ksocklnd in
 #define KSOCK_MSG_LNET          0xc1            /* lnet msg */
 
 /* We need to know this number to parse hello msg from ksocklnd in
- * other LND (usocklnd, for example) */ 
+ * other LND (usocklnd, for example) */
 #define KSOCK_PROTO_V2          2
 #define KSOCK_PROTO_V2          2
+#define KSOCK_PROTO_V3          3
 
 #endif
 
 #endif
index 3a6c3f7..e517d75 100644 (file)
@@ -1,5 +1,5 @@
 MODULES := ksocklnd
 
 MODULES := ksocklnd
 
-ksocklnd-objs := socklnd.o socklnd_cb.o socklnd_modparams.o socklnd_lib-linux.o
+ksocklnd-objs := socklnd.o socklnd_cb.o socklnd_proto.o socklnd_modparams.o socklnd_lib-linux.o
 
 @INCLUDE_RULES@
 
 @INCLUDE_RULES@
index 0dbe697..d2799c9 100644 (file)
@@ -12,7 +12,7 @@ if DARWIN
 
   macos_PROGRAMS := ksocklnd
 
 
   macos_PROGRAMS := ksocklnd
 
-  nodist_ksocklnd_SOURCES := socklnd.c socklnd_cb.c socklnd_modparams.c socklnd_lib-darwin.c
+  nodist_ksocklnd_SOURCES := socklnd.c socklnd_cb.c socklnd_proto.c socklnd_modparams.c socklnd_lib-darwin.c
   DIST_SOURCES += socklnd_lib-darwin.c socklnd_lib-darwin.h
 
   ksocklnd_CFLAGS := $(EXTRA_KCFLAGS)
   DIST_SOURCES += socklnd_lib-darwin.c socklnd_lib-darwin.h
 
   ksocklnd_CFLAGS := $(EXTRA_KCFLAGS)
index 5201846..47f28ce 100644 (file)
@@ -120,8 +120,9 @@ ksocknal_create_peer (ksock_peer_t **peerp, lnet_ni_t *ni, lnet_process_id_t id)
         cfs_atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
         peer->ksnp_closing = 0;
         peer->ksnp_accepting = 0;
         cfs_atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
         peer->ksnp_closing = 0;
         peer->ksnp_accepting = 0;
-        peer->ksnp_zc_next_cookie = 1;
         peer->ksnp_proto = NULL;
         peer->ksnp_proto = NULL;
+        peer->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1;
+
         CFS_INIT_LIST_HEAD (&peer->ksnp_conns);
         CFS_INIT_LIST_HEAD (&peer->ksnp_routes);
         CFS_INIT_LIST_HEAD (&peer->ksnp_tx_queue);
         CFS_INIT_LIST_HEAD (&peer->ksnp_conns);
         CFS_INIT_LIST_HEAD (&peer->ksnp_routes);
         CFS_INIT_LIST_HEAD (&peer->ksnp_tx_queue);
@@ -1034,6 +1035,7 @@ ksocknal_create_conn (lnet_ni_t *ni, ksock_route_t *route,
         ksock_hello_msg_t *hello;
         unsigned int       irq;
         ksock_tx_t        *tx;
         ksock_hello_msg_t *hello;
         unsigned int       irq;
         ksock_tx_t        *tx;
+        ksock_tx_t        *txtmp;
         int                rc;
         int                active;
         char              *warn = NULL;
         int                rc;
         int                active;
         char              *warn = NULL;
@@ -1062,14 +1064,13 @@ ksocknal_create_conn (lnet_ni_t *ni, ksock_route_t *route,
         ksocknal_lib_save_callback(sock, conn);
         cfs_atomic_set (&conn->ksnc_conn_refcount, 1); /* 1 ref for me */
 
         ksocknal_lib_save_callback(sock, conn);
         cfs_atomic_set (&conn->ksnc_conn_refcount, 1); /* 1 ref for me */
 
-        conn->ksnc_zc_capable = ksocknal_lib_zc_capable(sock);
         conn->ksnc_rx_ready = 0;
         conn->ksnc_rx_scheduled = 0;
 
         CFS_INIT_LIST_HEAD (&conn->ksnc_tx_queue);
         conn->ksnc_tx_ready = 0;
         conn->ksnc_tx_scheduled = 0;
         conn->ksnc_rx_ready = 0;
         conn->ksnc_rx_scheduled = 0;
 
         CFS_INIT_LIST_HEAD (&conn->ksnc_tx_queue);
         conn->ksnc_tx_ready = 0;
         conn->ksnc_tx_scheduled = 0;
-        conn->ksnc_tx_mono = NULL;
+        conn->ksnc_tx_carrier = NULL;
         cfs_atomic_set (&conn->ksnc_tx_nob, 0);
 
         LIBCFS_ALLOC(hello, offsetof(ksock_hello_msg_t,
         cfs_atomic_set (&conn->ksnc_tx_nob, 0);
 
         LIBCFS_ALLOC(hello, offsetof(ksock_hello_msg_t,
@@ -1102,10 +1103,12 @@ ksocknal_create_conn (lnet_ni_t *ni, ksock_route_t *route,
                 cfs_write_unlock_bh(global_lock);
 
                 if (conn->ksnc_proto == NULL) {
                 cfs_write_unlock_bh(global_lock);
 
                 if (conn->ksnc_proto == NULL) {
-                        conn->ksnc_proto = &ksocknal_protocol_v2x;
+                         conn->ksnc_proto = &ksocknal_protocol_v3x;
 #if SOCKNAL_VERSION_DEBUG
 #if SOCKNAL_VERSION_DEBUG
-                        if (*ksocknal_tunables.ksnd_protocol != 2)
-                                conn->ksnc_proto = &ksocknal_protocol_v1x;
+                         if (*ksocknal_tunables.ksnd_protocol == 2)
+                                 conn->ksnc_proto = &ksocknal_protocol_v2x;
+                         else if (*ksocknal_tunables.ksnd_protocol == 1)
+                                 conn->ksnc_proto = &ksocknal_protocol_v1x;
 #endif
                 }
 
 #endif
                 }
 
@@ -1260,12 +1263,14 @@ ksocknal_create_conn (lnet_ni_t *ni, ksock_route_t *route,
 
         conn->ksnc_peer = peer;                 /* conn takes my ref on peer */
         peer->ksnp_last_alive = cfs_time_current();
 
         conn->ksnc_peer = peer;                 /* conn takes my ref on peer */
         peer->ksnp_last_alive = cfs_time_current();
+        peer->ksnp_send_keepalive = 0;
         peer->ksnp_error = 0;
 
         sched = ksocknal_choose_scheduler_locked (irq);
         sched->kss_nconns++;
         conn->ksnc_scheduler = sched;
 
         peer->ksnp_error = 0;
 
         sched = ksocknal_choose_scheduler_locked (irq);
         sched->kss_nconns++;
         conn->ksnc_scheduler = sched;
 
+        conn->ksnc_tx_last_post = cfs_time_current();
         /* Set the deadline for the outgoing HELLO to drain */
         conn->ksnc_tx_bufnob = libcfs_sock_wmem_queued(sock);
         conn->ksnc_tx_deadline = cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
         /* Set the deadline for the outgoing HELLO to drain */
         conn->ksnc_tx_bufnob = libcfs_sock_wmem_queued(sock);
         conn->ksnc_tx_deadline = cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
@@ -1276,12 +1281,12 @@ ksocknal_create_conn (lnet_ni_t *ni, ksock_route_t *route,
 
         ksocknal_new_packet(conn, 0);
 
 
         ksocknal_new_packet(conn, 0);
 
-        /* Take all the packets blocking for a connection.
-         * NB, it might be nicer to share these blocked packets among any
-         * other connections that are becoming established. */
-        while (!list_empty (&peer->ksnp_tx_queue)) {
-                tx = list_entry (peer->ksnp_tx_queue.next,
-                                 ksock_tx_t, tx_list);
+        conn->ksnc_zc_capable = ksocknal_lib_zc_capable(conn);
+
+        /* Take packets blocking for this connection. */
+        list_for_each_entry_safe(tx, txtmp, &peer->ksnp_tx_queue, tx_list) {
+                if (conn->ksnc_proto->pro_match_tx(conn, tx, tx->tx_nonblk) == SOCKNAL_MATCH_NO)
+                                continue;
 
                 list_del (&tx->tx_list);
                 ksocknal_queue_tx_locked (tx, conn);
 
                 list_del (&tx->tx_list);
                 ksocknal_queue_tx_locked (tx, conn);
@@ -1450,6 +1455,21 @@ ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
         if (list_empty (&peer->ksnp_conns)) {
                 /* No more connections to this peer */
 
         if (list_empty (&peer->ksnp_conns)) {
                 /* No more connections to this peer */
 
+                if (!list_empty(&peer->ksnp_tx_queue)) {
+                        ksock_tx_t *tx;
+
+                        LASSERT (conn->ksnc_proto == &ksocknal_protocol_v3x);
+
+                        /* throw them to the last connection...,
+                         * these TXs will be sent to /dev/null by scheduler */
+                        list_for_each_entry(tx, &peer->ksnp_tx_queue, tx_list)
+                                ksocknal_tx_prep(conn, tx);
+
+                        spin_lock_bh(&conn->ksnc_scheduler->kss_lock);
+                        list_splice_init(&peer->ksnp_tx_queue, &conn->ksnc_tx_queue);
+                        spin_unlock_bh(&conn->ksnc_scheduler->kss_lock);
+                }
+
                 peer->ksnp_proto = NULL;        /* renegotiate protocol version */
                 peer->ksnp_error = error;       /* stash last conn close reason */
 
                 peer->ksnp_proto = NULL;        /* renegotiate protocol version */
                 peer->ksnp_error = error;       /* stash last conn close reason */
 
@@ -1516,9 +1536,9 @@ ksocknal_finalize_zcreq(ksock_conn_t *conn)
                 if (tx->tx_conn != conn)
                         continue;
 
                 if (tx->tx_conn != conn)
                         continue;
 
-                LASSERT (tx->tx_msg.ksm_zc_req_cookie != 0);
+                LASSERT (tx->tx_msg.ksm_zc_cookies[0] != 0);
 
 
-                tx->tx_msg.ksm_zc_req_cookie = 0;
+                tx->tx_msg.ksm_zc_cookies[0] = 0;
                 list_del(&tx->tx_zc_list);
                 list_add(&tx->tx_zc_list, &zlist);
         }
                 list_del(&tx->tx_zc_list);
                 list_add(&tx->tx_zc_list, &zlist);
         }
@@ -2574,7 +2594,8 @@ ksocknal_module_init (void)
         int    rc;
 
         /* check ksnr_connected/connecting field large enough */
         int    rc;
 
         /* check ksnr_connected/connecting field large enough */
-        CLASSERT(SOCKLND_CONN_NTYPES <= 4);
+        CLASSERT (SOCKLND_CONN_NTYPES <= 4);
+        CLASSERT (SOCKLND_CONN_ACK == SOCKLND_CONN_BULK_IN);
 
         /* initialize the_ksocklnd */
         the_ksocklnd.lnd_type     = SOCKLND;
 
         /* initialize the_ksocklnd */
         the_ksocklnd.lnd_type     = SOCKLND;
@@ -2596,7 +2617,7 @@ ksocknal_module_init (void)
 }
 
 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
 }
 
 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
-MODULE_DESCRIPTION("Kernel TCP Socket LND v2.0.0");
+MODULE_DESCRIPTION("Kernel TCP Socket LND v3.0.0");
 MODULE_LICENSE("GPL");
 
 MODULE_LICENSE("GPL");
 
-cfs_module(ksocknal, "2.0.0", ksocknal_module_init, ksocknal_module_fini);
+cfs_module(ksocknal, "3.0.0", ksocknal_module_init, ksocknal_module_fini);
index b4ee146..6354f62 100644 (file)
@@ -52,8 +52,6 @@
 #define SOCKNAL_RESCHED         100             /* # scheduler loops before reschedule */
 #define SOCKNAL_ENOMEM_RETRY    CFS_TICK        /* jiffies between retries */
 
 #define SOCKNAL_RESCHED         100             /* # scheduler loops before reschedule */
 #define SOCKNAL_ENOMEM_RETRY    CFS_TICK        /* jiffies between retries */
 
-#define SOCKNAL_ROUND_ROBIN     0               /* round robin / load balance */
-
 #define SOCKNAL_SINGLE_FRAG_TX      0           /* disable multi-fragment sends */
 #define SOCKNAL_SINGLE_FRAG_RX      0           /* disable multi-fragment receives */
 
 #define SOCKNAL_SINGLE_FRAG_TX      0           /* disable multi-fragment sends */
 #define SOCKNAL_SINGLE_FRAG_RX      0           /* disable multi-fragment receives */
 
@@ -81,7 +79,6 @@ typedef struct                                  /* per scheduler state */
 #if !SOCKNAL_SINGLE_FRAG_TX || !SOCKNAL_SINGLE_FRAG_RX
         struct iovec      kss_scratch_iov[LNET_MAX_IOV];
 #endif
 #if !SOCKNAL_SINGLE_FRAG_TX || !SOCKNAL_SINGLE_FRAG_RX
         struct iovec      kss_scratch_iov[LNET_MAX_IOV];
 #endif
-
 } ksock_sched_t;
 
 typedef struct
 } ksock_sched_t;
 
 typedef struct
@@ -112,6 +109,8 @@ typedef struct
         int              *ksnd_tx_buffer_size;  /* socket tx buffer size */
         int              *ksnd_rx_buffer_size;  /* socket rx buffer size */
         int              *ksnd_nagle;           /* enable NAGLE? */
         int              *ksnd_tx_buffer_size;  /* socket tx buffer size */
         int              *ksnd_rx_buffer_size;  /* socket rx buffer size */
         int              *ksnd_nagle;           /* enable NAGLE? */
+        int              *ksnd_round_robin;     /* round robin for multiple interfaces */
+        int              *ksnd_keepalive;       /* # secs for sending keepalive NOOP */
         int              *ksnd_keepalive_idle;  /* # idle secs before 1st probe */
         int              *ksnd_keepalive_count; /* # probes */
         int              *ksnd_keepalive_intvl; /* time between probes */
         int              *ksnd_keepalive_idle;  /* # idle secs before 1st probe */
         int              *ksnd_keepalive_count; /* # probes */
         int              *ksnd_keepalive_intvl; /* time between probes */
@@ -119,7 +118,7 @@ typedef struct
         int              *ksnd_peercredits;     /* # concurrent sends to 1 peer */
         int              *ksnd_enable_csum;     /* enable check sum */
         int              *ksnd_inject_csum_error; /* set non-zero to inject checksum error */
         int              *ksnd_peercredits;     /* # concurrent sends to 1 peer */
         int              *ksnd_enable_csum;     /* enable check sum */
         int              *ksnd_inject_csum_error; /* set non-zero to inject checksum error */
-        unsigned int     *ksnd_zc_min_frag;     /* minimum zero copy frag size */
+        unsigned int     *ksnd_zc_min_payload;  /* minimum zero copy payload size */
         int              *ksnd_zc_recv;         /* enable ZC receive (for Chelsio TOE) */
         int              *ksnd_zc_recv_min_nfrags; /* minimum # of fragments to enable ZC receive */
 #ifdef CPU_AFFINITY
         int              *ksnd_zc_recv;         /* enable ZC receive (for Chelsio TOE) */
         int              *ksnd_zc_recv_min_nfrags; /* minimum # of fragments to enable ZC receive */
 #ifdef CPU_AFFINITY
@@ -215,7 +214,9 @@ typedef struct                                  /* transmit packet */
         int                     tx_niov;        /* # packet iovec frags */
         struct iovec           *tx_iov;         /* packet iovec frags */
         int                     tx_nkiov;       /* # packet page frags */
         int                     tx_niov;        /* # packet iovec frags */
         struct iovec           *tx_iov;         /* packet iovec frags */
         int                     tx_nkiov;       /* # packet page frags */
-        unsigned int            tx_checked_zc;  /* Have I checked if I should ZC? */
+        unsigned int            tx_zc_capable:1; /* payload is large enough for ZC */
+        unsigned int            tx_zc_checked:1; /* Have I checked if I should ZC? */
+        unsigned int            tx_nonblk:1;    /* it's a non-blocking ACK */
         lnet_kiov_t            *tx_kiov;        /* packet page frags */
         struct ksock_conn      *tx_conn;        /* owning conn */
         lnet_msg_t             *tx_lnetmsg;     /* lnet message for lnet_finalize() */
         lnet_kiov_t            *tx_kiov;        /* packet page frags */
         struct ksock_conn      *tx_conn;        /* owning conn */
         lnet_msg_t             *tx_lnetmsg;     /* lnet message for lnet_finalize() */
@@ -295,14 +296,13 @@ typedef struct ksock_conn
         /* WRITER */
         struct list_head    ksnc_tx_list;       /* where I enq waiting for output space */
         struct list_head    ksnc_tx_queue;      /* packets waiting to be sent */
         /* WRITER */
         struct list_head    ksnc_tx_list;       /* where I enq waiting for output space */
         struct list_head    ksnc_tx_queue;      /* packets waiting to be sent */
-        ksock_tx_t         *ksnc_tx_mono;       /* V2.x only, next mono-packet, mono-packet is :
-                                                 * a. lnet packet without piggyback
-                                                 * b. noop ZC-ACK packet */
+        ksock_tx_t         *ksnc_tx_carrier;    /* next TX that can carry a LNet message or ZC-ACK */
         cfs_time_t          ksnc_tx_deadline;   /* when (in jiffies) tx times out */
         int                 ksnc_tx_bufnob;     /* send buffer marker */
         cfs_atomic_t        ksnc_tx_nob;        /* # bytes queued */
         int                 ksnc_tx_ready;      /* write space */
         int                 ksnc_tx_scheduled;  /* being progressed */
         cfs_time_t          ksnc_tx_deadline;   /* when (in jiffies) tx times out */
         int                 ksnc_tx_bufnob;     /* send buffer marker */
         cfs_atomic_t        ksnc_tx_nob;        /* # bytes queued */
         int                 ksnc_tx_ready;      /* write space */
         int                 ksnc_tx_scheduled;  /* being progressed */
+        cfs_time_t          ksnc_tx_last_post;  /* time stamp of the last posted TX */
 } ksock_conn_t;
 
 typedef struct ksock_route
 } ksock_conn_t;
 
 typedef struct ksock_route
@@ -324,6 +324,8 @@ typedef struct ksock_route
         int                 ksnr_conn_count;    /* # conns established by this route */
 } ksock_route_t;
 
         int                 ksnr_conn_count;    /* # conns established by this route */
 } ksock_route_t;
 
+#define SOCKNAL_KEEPALIVE_PING          1       /* cookie for keepalive ping */
+
 typedef struct ksock_peer
 {
         struct list_head    ksnp_list;          /* stash on global peer list */
 typedef struct ksock_peer
 {
         struct list_head    ksnp_list;          /* stash on global peer list */
@@ -342,6 +344,7 @@ typedef struct ksock_peer
         cfs_spinlock_t      ksnp_lock;          /* serialize, NOT safe in g_lock */
         struct list_head    ksnp_zc_req_list;   /* zero copy requests wait for ACK  */
         cfs_time_t          ksnp_last_alive;    /* when (in jiffies) I was last alive */
         cfs_spinlock_t      ksnp_lock;          /* serialize, NOT safe in g_lock */
         struct list_head    ksnp_zc_req_list;   /* zero copy requests wait for ACK  */
         cfs_time_t          ksnp_last_alive;    /* when (in jiffies) I was last alive */
+        cfs_time_t          ksnp_send_keepalive; /* time to send keepalive */
         lnet_ni_t          *ksnp_ni;            /* which network */
         int                 ksnp_n_passive_ips; /* # of... */
         __u32               ksnp_passive_ips[LNET_MAX_INTERFACES]; /* preferred local interfaces */
         lnet_ni_t          *ksnp_ni;            /* which network */
         int                 ksnp_n_passive_ips; /* # of... */
         __u32               ksnp_passive_ips[LNET_MAX_INTERFACES]; /* preferred local interfaces */
@@ -357,17 +360,31 @@ typedef struct ksock_connreq
 extern ksock_nal_data_t ksocknal_data;
 extern ksock_tunables_t ksocknal_tunables;
 
 extern ksock_nal_data_t ksocknal_data;
 extern ksock_tunables_t ksocknal_tunables;
 
+#define SOCKNAL_MATCH_NO        0               /* TX can't match type of connection */
+#define SOCKNAL_MATCH_YES       1               /* TX matches type of connection */
+#define SOCKNAL_MATCH_MAY       2               /* TX can be sent on the connection, but not preferred */
+
 typedef struct ksock_proto
 {
 typedef struct ksock_proto
 {
-        int     pro_version;                                                /* version number of protocol */
-        int     (*pro_send_hello)(ksock_conn_t *, ksock_hello_msg_t *);     /* handshake function */
-        int     (*pro_recv_hello)(ksock_conn_t *, ksock_hello_msg_t *, int);/* handshake function */
-        void    (*pro_pack)(ksock_tx_t *);                                  /* message pack */
-        void    (*pro_unpack)(ksock_msg_t *);                               /* message unpack */
+        int           pro_version;                                              /* version number of protocol */
+        int         (*pro_send_hello)(ksock_conn_t *, ksock_hello_msg_t *);     /* handshake function */
+        int         (*pro_recv_hello)(ksock_conn_t *, ksock_hello_msg_t *, int);/* handshake function */
+        void        (*pro_pack)(ksock_tx_t *);                                  /* message pack */
+        void        (*pro_unpack)(ksock_msg_t *);                               /* message unpack */
+        ksock_tx_t *(*pro_queue_tx_msg)(ksock_conn_t *, ksock_tx_t *);          /* queue tx on the connection */
+        int         (*pro_queue_tx_zcack)(ksock_conn_t *, ksock_tx_t *, __u64); /* queue ZC ack on the connection */
+        int         (*pro_handle_zcreq)(ksock_conn_t *, __u64, int);            /* handle ZC request */
+        int         (*pro_handle_zcack)(ksock_conn_t *, __u64, __u64);          /* handle ZC ACK */
+        int         (*pro_match_tx)(ksock_conn_t *, ksock_tx_t *, int);         /* msg type matches the connection type:
+                                                                                 * return value:
+                                                                                 *   return MATCH_NO  : no
+                                                                                 *   return MATCH_YES : matching type
+                                                                                 *   return MATCH_MAY : can be backup */
 } ksock_proto_t;
 
 extern ksock_proto_t ksocknal_protocol_v1x;
 extern ksock_proto_t ksocknal_protocol_v2x;
 } ksock_proto_t;
 
 extern ksock_proto_t ksocknal_protocol_v1x;
 extern ksock_proto_t ksocknal_protocol_v2x;
+extern ksock_proto_t ksocknal_protocol_v3x;
 
 #define KSOCK_PROTO_V1_MAJOR    LNET_PROTO_TCP_VERSION_MAJOR
 #define KSOCK_PROTO_V1_MINOR    LNET_PROTO_TCP_VERSION_MINOR
 
 #define KSOCK_PROTO_V1_MAJOR    LNET_PROTO_TCP_VERSION_MAJOR
 #define KSOCK_PROTO_V1_MINOR    LNET_PROTO_TCP_VERSION_MINOR
@@ -449,6 +466,7 @@ ksocknal_tx_addref (ksock_tx_t *tx)
         cfs_atomic_inc(&tx->tx_refcount);
 }
 
         cfs_atomic_inc(&tx->tx_refcount);
 }
 
+extern void ksocknal_tx_prep (ksock_conn_t *, ksock_tx_t *tx);
 extern void ksocknal_tx_done (lnet_ni_t *ni, ksock_tx_t *tx);
 
 static inline void
 extern void ksocknal_tx_done (lnet_ni_t *ni, ksock_tx_t *tx);
 
 static inline void
@@ -516,7 +534,15 @@ extern int  ksocknal_close_peer_conns_locked (ksock_peer_t *peer,
                                               __u32 ipaddr, int why);
 extern int ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why);
 extern int ksocknal_close_matching_conns (lnet_process_id_t id, __u32 ipaddr);
                                               __u32 ipaddr, int why);
 extern int ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why);
 extern int ksocknal_close_matching_conns (lnet_process_id_t id, __u32 ipaddr);
-
+extern ksock_conn_t *ksocknal_find_conn_locked(ksock_peer_t *peer,
+                                               ksock_tx_t *tx, int nonblk);
+
+extern int  ksocknal_launch_packet(lnet_ni_t *ni, ksock_tx_t *tx,
+                                   lnet_process_id_t id);
+extern ksock_tx_t *ksocknal_alloc_tx(int type, int size);
+extern void ksocknal_free_tx (ksock_tx_t *tx);
+extern ksock_tx_t *ksocknal_alloc_tx_noop(__u64 cookie, int nonblk);
+extern void ksocknal_next_tx_carrier(ksock_conn_t *conn);
 extern void ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn);
 extern void ksocknal_txlist_done (lnet_ni_t *ni, struct list_head *txlist, int error);
 extern void ksocknal_notify (lnet_ni_t *ni, lnet_nid_t gw_nid, int alive);
 extern void ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn);
 extern void ksocknal_txlist_done (lnet_ni_t *ni, struct list_head *txlist, int error);
 extern void ksocknal_notify (lnet_ni_t *ni, lnet_nid_t gw_nid, int alive);
@@ -535,7 +561,7 @@ extern int ksocknal_recv_hello (lnet_ni_t *ni, ksock_conn_t *conn,
 extern void ksocknal_read_callback(ksock_conn_t *conn);
 extern void ksocknal_write_callback(ksock_conn_t *conn);
 
 extern void ksocknal_read_callback(ksock_conn_t *conn);
 extern void ksocknal_write_callback(ksock_conn_t *conn);
 
-extern int ksocknal_lib_zc_capable(cfs_socket_t *sock);
+extern int ksocknal_lib_zc_capable(ksock_conn_t *conn);
 extern void ksocknal_lib_save_callback(cfs_socket_t *sock, ksock_conn_t *conn);
 extern void ksocknal_lib_set_callback(cfs_socket_t *sock,  ksock_conn_t *conn);
 extern void ksocknal_lib_reset_callback(cfs_socket_t *sock, ksock_conn_t *conn);
 extern void ksocknal_lib_save_callback(cfs_socket_t *sock, ksock_conn_t *conn);
 extern void ksocknal_lib_set_callback(cfs_socket_t *sock,  ksock_conn_t *conn);
 extern void ksocknal_lib_reset_callback(cfs_socket_t *sock, ksock_conn_t *conn);
index 024c306..d73314c 100644 (file)
 #include "socklnd.h"
 
 ksock_tx_t *
 #include "socklnd.h"
 
 ksock_tx_t *
-ksocknal_alloc_tx (int size)
+ksocknal_alloc_tx(int type, int size)
 {
         ksock_tx_t *tx = NULL;
 
 {
         ksock_tx_t *tx = NULL;
 
-        if (size == KSOCK_NOOP_TX_SIZE) {
+        if (type == KSOCK_MSG_NOOP) {
+                LASSERT (size == KSOCK_NOOP_TX_SIZE);
+
                 /* searching for a noop tx in free list */
                 cfs_spin_lock(&ksocknal_data.ksnd_tx_lock);
 
                 /* searching for a noop tx in free list */
                 cfs_spin_lock(&ksocknal_data.ksnd_tx_lock);
 
@@ -52,18 +54,47 @@ ksocknal_alloc_tx (int size)
                 return NULL;
 
         cfs_atomic_set(&tx->tx_refcount, 1);
                 return NULL;
 
         cfs_atomic_set(&tx->tx_refcount, 1);
-        tx->tx_desc_size = size;
+        tx->tx_zc_capable = 0;
+        tx->tx_zc_checked = 0;
+        tx->tx_desc_size  = size;
+
         cfs_atomic_inc(&ksocknal_data.ksnd_nactive_txs);
 
         return tx;
 }
 
         cfs_atomic_inc(&ksocknal_data.ksnd_nactive_txs);
 
         return tx;
 }
 
+ksock_tx_t *
+ksocknal_alloc_tx_noop(__u64 cookie, int nonblk)
+{
+        ksock_tx_t *tx;
+
+        tx = ksocknal_alloc_tx(KSOCK_MSG_NOOP, KSOCK_NOOP_TX_SIZE);
+        if (tx == NULL) {
+                CERROR("Can't allocate noop tx desc\n");
+                return NULL;
+        }
+
+        tx->tx_conn     = NULL;
+        tx->tx_lnetmsg  = NULL;
+        tx->tx_kiov     = NULL;
+        tx->tx_nkiov    = 0;
+        tx->tx_iov      = tx->tx_frags.virt.iov;
+        tx->tx_niov     = 1;
+        tx->tx_nonblk   = nonblk;
+
+        socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
+        tx->tx_msg.ksm_zc_cookies[1] = cookie;
+
+        return tx;
+}
+
+
 void
 ksocknal_free_tx (ksock_tx_t *tx)
 {
         cfs_atomic_dec(&ksocknal_data.ksnd_nactive_txs);
 
 void
 ksocknal_free_tx (ksock_tx_t *tx)
 {
         cfs_atomic_dec(&ksocknal_data.ksnd_nactive_txs);
 
-        if (tx->tx_desc_size == KSOCK_NOOP_TX_SIZE) {
+        if (tx->tx_lnetmsg == NULL && tx->tx_desc_size == KSOCK_NOOP_TX_SIZE) {
                 /* it's a noop tx */
                 cfs_spin_lock(&ksocknal_data.ksnd_tx_lock);
 
                 /* it's a noop tx */
                 cfs_spin_lock(&ksocknal_data.ksnd_tx_lock);
 
@@ -75,15 +106,6 @@ ksocknal_free_tx (ksock_tx_t *tx)
         }
 }
 
         }
 }
 
-void
-ksocknal_init_msg(ksock_msg_t *msg, int type)
-{
-        msg->ksm_type           = type;
-        msg->ksm_csum           = 0;
-        msg->ksm_zc_req_cookie  = 0;
-        msg->ksm_zc_ack_cookie  = 0;
-}
-
 int
 ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
 {
 int
 ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
 {
@@ -396,7 +418,7 @@ ksocknal_txlist_done (lnet_ni_t *ni, struct list_head *txlist, int error)
                                 le32_to_cpu (tx->tx_lnetmsg->msg_hdr.type),
                                 le32_to_cpu (tx->tx_lnetmsg->msg_hdr.payload_length),
                                 libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.src_nid)),
                                 le32_to_cpu (tx->tx_lnetmsg->msg_hdr.type),
                                 le32_to_cpu (tx->tx_lnetmsg->msg_hdr.payload_length),
                                 libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.src_nid)),
-                                libcfs_nid2str(le64_to_cpu (tx->tx_lnetmsg->msg_hdr.dest_nid)));
+                                libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.dest_nid)));
                 } else if (error) {
                         CDEBUG (D_NETERROR, "Deleting noop packet\n");
                 }
                 } else if (error) {
                         CDEBUG (D_NETERROR, "Deleting noop packet\n");
                 }
@@ -413,32 +435,24 @@ ksocknal_check_zc_req(ksock_tx_t *tx)
 {
         ksock_conn_t   *conn = tx->tx_conn;
         ksock_peer_t   *peer = conn->ksnc_peer;
 {
         ksock_conn_t   *conn = tx->tx_conn;
         ksock_peer_t   *peer = conn->ksnc_peer;
-        lnet_kiov_t    *kiov = tx->tx_kiov;
-        int             nkiov = tx->tx_nkiov;
 
 
-        /* Set tx_msg.ksm_zc_req_cookie to a unique non-zero cookie and add tx
+        /* Set tx_msg.ksm_zc_cookies[0] to a unique non-zero cookie and add tx
          * to ksnp_zc_req_list if some fragment of this message should be sent
          * zero-copy.  Our peer will send an ACK containing this cookie when
          * she has received this message to tell us we can signal completion.
          * to ksnp_zc_req_list if some fragment of this message should be sent
          * zero-copy.  Our peer will send an ACK containing this cookie when
          * she has received this message to tell us we can signal completion.
-         * tx_msg.ksm_zc_req_cookie remains non-zero while tx is on
+         * tx_msg.ksm_zc_cookies[0] remains non-zero while tx is on
          * ksnp_zc_req_list. */
          * ksnp_zc_req_list. */
+        LASSERT (tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);
+        LASSERT (tx->tx_zc_capable);
 
 
-        if (conn->ksnc_proto != &ksocknal_protocol_v2x ||
-            !conn->ksnc_zc_capable)
-                return;
+        tx->tx_zc_checked = 1;
 
 
-        while (nkiov > 0) {
-                if (kiov->kiov_len >= *ksocknal_tunables.ksnd_zc_min_frag)
-                        break;
-                --nkiov;
-                ++kiov;
-        }
-
-        if (nkiov == 0)
+        if (conn->ksnc_proto == &ksocknal_protocol_v1x ||
+            !conn->ksnc_zc_capable)
                 return;
 
         /* assign cookie and queue tx to pending list, it will be released when
                 return;
 
         /* assign cookie and queue tx to pending list, it will be released when
-         * a matching ack is received. See ksocknal_handle_zc_ack() */
+         * a matching ack is received. See ksocknal_handle_zcack() */
 
         ksocknal_tx_addref(tx);
 
 
         ksocknal_tx_addref(tx);
 
@@ -448,27 +462,37 @@ ksocknal_check_zc_req(ksock_tx_t *tx)
         tx->tx_deadline =
                 cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
 
         tx->tx_deadline =
                 cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
 
-        LASSERT (tx->tx_msg.ksm_zc_req_cookie == 0);
-        tx->tx_msg.ksm_zc_req_cookie = peer->ksnp_zc_next_cookie++;
+        LASSERT (tx->tx_msg.ksm_zc_cookies[0] == 0);
+
+        tx->tx_msg.ksm_zc_cookies[0] = peer->ksnp_zc_next_cookie++;
+
+        if (peer->ksnp_zc_next_cookie == 0)
+                peer->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1;
+
         list_add_tail(&tx->tx_zc_list, &peer->ksnp_zc_req_list);
 
         cfs_spin_unlock(&peer->ksnp_lock);
 }
 
 static void
         list_add_tail(&tx->tx_zc_list, &peer->ksnp_zc_req_list);
 
         cfs_spin_unlock(&peer->ksnp_lock);
 }
 
 static void
-ksocknal_unzc_req(ksock_tx_t *tx)
+ksocknal_uncheck_zc_req(ksock_tx_t *tx)
 {
         ksock_peer_t   *peer = tx->tx_conn->ksnc_peer;
 
 {
         ksock_peer_t   *peer = tx->tx_conn->ksnc_peer;
 
+        LASSERT (tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);
+        LASSERT (tx->tx_zc_capable);
+
+        tx->tx_zc_checked = 0;
+
         cfs_spin_lock(&peer->ksnp_lock);
 
         cfs_spin_lock(&peer->ksnp_lock);
 
-        if (tx->tx_msg.ksm_zc_req_cookie == 0) {
+        if (tx->tx_msg.ksm_zc_cookies[0] == 0) {
                 /* Not waiting for an ACK */
                 cfs_spin_unlock(&peer->ksnp_lock);
                 return;
         }
 
                 /* Not waiting for an ACK */
                 cfs_spin_unlock(&peer->ksnp_lock);
                 return;
         }
 
-        tx->tx_msg.ksm_zc_req_cookie = 0;
+        tx->tx_msg.ksm_zc_cookies[0] = 0;
         list_del(&tx->tx_zc_list);
 
         cfs_spin_unlock(&peer->ksnp_lock);
         list_del(&tx->tx_zc_list);
 
         cfs_spin_unlock(&peer->ksnp_lock);
@@ -481,10 +505,8 @@ ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
 {
         int            rc;
 
 {
         int            rc;
 
-        if (!tx->tx_checked_zc) {
-                tx->tx_checked_zc = 1;
+        if (tx->tx_zc_capable && !tx->tx_zc_checked)
                 ksocknal_check_zc_req(tx);
                 ksocknal_check_zc_req(tx);
-        }
 
         rc = ksocknal_transmit (conn, tx);
 
 
         rc = ksocknal_transmit (conn, tx);
 
@@ -548,7 +570,8 @@ ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
                        conn->ksnc_port);
         }
 
                        conn->ksnc_port);
         }
 
-        ksocknal_unzc_req(tx);
+        if (tx->tx_zc_checked)
+                ksocknal_uncheck_zc_req(tx);
 
         /* it's not an error if conn is being closed */
         ksocknal_close_conn_and_siblings (conn,
 
         /* it's not an error if conn is being closed */
         ksocknal_close_conn_and_siblings (conn,
@@ -580,123 +603,70 @@ ksocknal_launch_connection_locked (ksock_route_t *route)
 }
 
 ksock_conn_t *
 }
 
 ksock_conn_t *
-ksocknal_find_conn_locked (int payload_nob, ksock_peer_t *peer)
+ksocknal_find_conn_locked(ksock_peer_t *peer, ksock_tx_t *tx, int nonblk)
 {
         struct list_head *tmp;
 {
         struct list_head *tmp;
+        ksock_conn_t     *conn;
         ksock_conn_t     *typed = NULL;
         ksock_conn_t     *typed = NULL;
-        int               tnob  = 0;
         ksock_conn_t     *fallback = NULL;
         ksock_conn_t     *fallback = NULL;
+        int               tnob     = 0;
         int               fnob     = 0;
         int               fnob     = 0;
-        ksock_conn_t     *conn;
 
         list_for_each (tmp, &peer->ksnp_conns) {
 
         list_for_each (tmp, &peer->ksnp_conns) {
-                ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
-                int           hdr_nob = 0;
-#if SOCKNAL_ROUND_ROBIN
-                const int     nob = 0;
-#else
+                ksock_conn_t *c   = list_entry(tmp, ksock_conn_t, ksnc_list);
                 int           nob = cfs_atomic_read(&c->ksnc_tx_nob) +
                 int           nob = cfs_atomic_read(&c->ksnc_tx_nob) +
-                                        libcfs_sock_wmem_queued(c->ksnc_sock);
-#endif
-                LASSERT (!c->ksnc_closing);
-                LASSERT (c->ksnc_proto != NULL);
+                                    libcfs_sock_wmem_queued(c->ksnc_sock);
+                int           rc;
 
 
-                if (fallback == NULL || nob < fnob) {
-                        fallback = c;
-                        fnob     = nob;
-                }
-
-                if (!*ksocknal_tunables.ksnd_typed_conns)
-                        continue;
+                LASSERT (!c->ksnc_closing);
+                LASSERT (c->ksnc_proto != NULL &&
+                         c->ksnc_proto->pro_match_tx != NULL);
 
 
-                if (payload_nob == 0) {
-                        /* noop packet */
-                        hdr_nob = offsetof(ksock_msg_t, ksm_u);
-                } else {
-                        /* lnet packet */
-                        hdr_nob = (c->ksnc_proto == &ksocknal_protocol_v2x)?
-                                  sizeof(ksock_msg_t) : sizeof(lnet_hdr_t);
-                }
+                rc = c->ksnc_proto->pro_match_tx(c, tx, nonblk);
 
 
-                switch (c->ksnc_type) {
+                switch (rc) {
                 default:
                 default:
-                        CERROR("ksnc_type bad: %u\n", c->ksnc_type);
                         LBUG();
                         LBUG();
-                case SOCKLND_CONN_ANY:
-                        break;
-                case SOCKLND_CONN_BULK_IN:
+                case SOCKNAL_MATCH_NO: /* protocol rejected the tx */
                         continue;
                         continue;
-                case SOCKLND_CONN_BULK_OUT:
-                        if ((hdr_nob + payload_nob) < *ksocknal_tunables.ksnd_min_bulk)
-                                continue;
-                        break;
-                case SOCKLND_CONN_CONTROL:
-                        if ((hdr_nob + payload_nob) >= *ksocknal_tunables.ksnd_min_bulk)
-                                continue;
+
+                case SOCKNAL_MATCH_YES: /* typed connection */
+                        if (typed == NULL || tnob > nob ||
+                            (tnob == nob && *ksocknal_tunables.ksnd_round_robin &&
+                             cfs_time_after(typed->ksnc_tx_last_post, c->ksnc_tx_last_post))) {
+                                typed = c;
+                                tnob  = nob;
+                        }
                         break;
                         break;
-                }
 
 
-                if (typed == NULL || nob < tnob) {
-                        typed = c;
-                        tnob  = nob;
+                case SOCKNAL_MATCH_MAY: /* fallback connection */
+                        if (fallback == NULL || fnob > nob ||
+                            (fnob == nob && *ksocknal_tunables.ksnd_round_robin &&
+                             cfs_time_after(fallback->ksnc_tx_last_post, c->ksnc_tx_last_post))) {
+                                fallback = c;
+                                fnob     = nob;
+                        }
+                        break;
                 }
         }
 
         /* prefer the typed selection */
         conn = (typed != NULL) ? typed : fallback;
 
                 }
         }
 
         /* prefer the typed selection */
         conn = (typed != NULL) ? typed : fallback;
 
-#if SOCKNAL_ROUND_ROBIN
-        if (conn != NULL) {
-                /* round-robin all else being equal */
-                list_del (&conn->ksnc_list);
-                list_add_tail (&conn->ksnc_list, &peer->ksnp_conns);
-        }
-#endif
+        if (conn != NULL)
+                conn->ksnc_tx_last_post = cfs_time_current();
+
         return conn;
 }
 
 void
         return conn;
 }
 
 void
-ksocknal_next_mono_tx(ksock_conn_t *conn)
-{
-        ksock_tx_t     *tx = conn->ksnc_tx_mono;
-
-        /* Called holding BH lock: conn->ksnc_scheduler->kss_lock */
-        LASSERT(conn->ksnc_proto == &ksocknal_protocol_v2x);
-        LASSERT(!list_empty(&conn->ksnc_tx_queue));
-        LASSERT(tx != NULL);
-
-        if (tx->tx_list.next == &conn->ksnc_tx_queue) {
-                /* no more packets queued */
-                conn->ksnc_tx_mono = NULL;
-        } else {
-                conn->ksnc_tx_mono = list_entry(tx->tx_list.next, ksock_tx_t, tx_list);
-                LASSERT(conn->ksnc_tx_mono->tx_msg.ksm_type == tx->tx_msg.ksm_type);
-        }
-}
-
-int
-ksocknal_piggyback_zcack(ksock_conn_t *conn, __u64 cookie)
+ksocknal_tx_prep(ksock_conn_t *conn, ksock_tx_t *tx)
 {
 {
-        ksock_tx_t     *tx = conn->ksnc_tx_mono;
-
-        /* Called holding BH lock: conn->ksnc_scheduler->kss_lock */
-
-        if (tx == NULL)
-                return 0;
-
-        if (tx->tx_msg.ksm_type == KSOCK_MSG_NOOP) {
-                /* tx is noop zc-ack, can't piggyback zc-ack cookie */
-                return 0;
-        }
-
-        LASSERT(tx->tx_msg.ksm_type == KSOCK_MSG_LNET);
-        LASSERT(tx->tx_msg.ksm_zc_ack_cookie == 0);
-
-        /* piggyback the zc-ack cookie */
-        tx->tx_msg.ksm_zc_ack_cookie = cookie;
-        ksocknal_next_mono_tx(conn);
+        conn->ksnc_proto->pro_pack(tx);
 
 
-        return 1;
+        cfs_atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
+        ksocknal_conn_addref(conn); /* +1 ref for tx */
+        tx->tx_conn = conn;
 }
 
 void
 }
 
 void
@@ -704,7 +674,7 @@ ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
 {
         ksock_sched_t *sched = conn->ksnc_scheduler;
         ksock_msg_t   *msg = &tx->tx_msg;
 {
         ksock_sched_t *sched = conn->ksnc_scheduler;
         ksock_msg_t   *msg = &tx->tx_msg;
-        ksock_tx_t    *ztx;
+        ksock_tx_t    *ztx = NULL;
         int            bufnob = 0;
 
         /* called holding global lock (read or irq-write) and caller may
         int            bufnob = 0;
 
         /* called holding global lock (read or irq-write) and caller may
@@ -713,13 +683,12 @@ ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
          * ksnc_sock... */
         LASSERT(!conn->ksnc_closing);
 
          * ksnc_sock... */
         LASSERT(!conn->ksnc_closing);
 
-        CDEBUG (D_NET, "Sending to %s ip %d.%d.%d.%d:%d\n",
+        CDEBUG (D_NET, "Sending to %s ip %d.%d.%d.%d:%d\n", 
                 libcfs_id2str(conn->ksnc_peer->ksnp_id),
                 HIPQUAD(conn->ksnc_ipaddr),
                 conn->ksnc_port);
 
                 libcfs_id2str(conn->ksnc_peer->ksnp_id),
                 HIPQUAD(conn->ksnc_ipaddr),
                 conn->ksnc_port);
 
-        tx->tx_checked_zc = 0;
-        conn->ksnc_proto->pro_pack(tx);
+        ksocknal_tx_prep(conn, tx);
 
         /* Ensure the frags we've been given EXACTLY match the number of
          * bytes we want to send.  Many TCP/IP stacks disregard any total
 
         /* Ensure the frags we've been given EXACTLY match the number of
          * bytes we want to send.  Many TCP/IP stacks disregard any total
@@ -734,14 +703,10 @@ ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
         LASSERT (tx->tx_resid == tx->tx_nob);
 
         CDEBUG (D_NET, "Packet %p type %d, nob %d niov %d nkiov %d\n",
         LASSERT (tx->tx_resid == tx->tx_nob);
 
         CDEBUG (D_NET, "Packet %p type %d, nob %d niov %d nkiov %d\n",
-                tx, (tx->tx_lnetmsg != NULL)? tx->tx_lnetmsg->msg_hdr.type:
-                                              KSOCK_MSG_NOOP,
+                tx, (tx->tx_lnetmsg != NULL) ? tx->tx_lnetmsg->msg_hdr.type:
+                                               KSOCK_MSG_NOOP,
                 tx->tx_nob, tx->tx_niov, tx->tx_nkiov);
 
                 tx->tx_nob, tx->tx_niov, tx->tx_nkiov);
 
-        cfs_atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
-        tx->tx_conn = conn;
-        ksocknal_conn_addref(conn); /* +1 ref for tx */
-
         /*
          * FIXME: SOCK_WMEM_QUEUED and SOCK_ERROR could block in __DARWIN8__
          * but they're used inside spinlocks a lot.
         /*
          * FIXME: SOCK_WMEM_QUEUED and SOCK_ERROR could block in __DARWIN8__
          * but they're used inside spinlocks a lot.
@@ -757,67 +722,29 @@ ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
                 cfs_mb(); /* order with adding to tx_queue */
         }
 
                 cfs_mb(); /* order with adding to tx_queue */
         }
 
-        ztx = NULL;
-
         if (msg->ksm_type == KSOCK_MSG_NOOP) {
                 /* The packet is noop ZC ACK, try to piggyback the ack_cookie
                  * on a normal packet so I don't need to send it */
         if (msg->ksm_type == KSOCK_MSG_NOOP) {
                 /* The packet is noop ZC ACK, try to piggyback the ack_cookie
                  * on a normal packet so I don't need to send it */
-                LASSERT(msg->ksm_zc_req_cookie == 0);
-                LASSERT(msg->ksm_zc_ack_cookie != 0);
-
-                if (conn->ksnc_tx_mono != NULL) {
-                        if (ksocknal_piggyback_zcack(conn, msg->ksm_zc_ack_cookie)) {
-                                /* zc-ack cookie is piggybacked */
-                                cfs_atomic_sub (tx->tx_nob, &conn->ksnc_tx_nob);
-                                ztx = tx;       /* Put to freelist later */
-                        } else {
-                                /* no packet can piggyback zc-ack cookie */
-                                list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
-                        }
-                } else {
-                        /* It's the first mono-packet */
-                        conn->ksnc_tx_mono = tx;
-                        list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
-                }
+                LASSERT (msg->ksm_zc_cookies[1] != 0);
+                LASSERT (conn->ksnc_proto->pro_queue_tx_zcack != NULL);
+
+                if (conn->ksnc_proto->pro_queue_tx_zcack(conn, tx, 0))
+                        ztx = tx; /* ZC ACK piggybacked on ztx release tx later */
 
         } else {
                 /* It's a normal packet - can it piggback a noop zc-ack that
                  * has been queued already? */
 
         } else {
                 /* It's a normal packet - can it piggback a noop zc-ack that
                  * has been queued already? */
-                LASSERT(msg->ksm_zc_ack_cookie == 0);
-
-                if (conn->ksnc_proto == &ksocknal_protocol_v2x &&  /* V2.x packet */
-                    conn->ksnc_tx_mono != NULL) {
-                        if (conn->ksnc_tx_mono->tx_msg.ksm_type == KSOCK_MSG_NOOP) {
-                                /* There is a noop zc-ack can be piggybacked */
-                                ztx = conn->ksnc_tx_mono;
-
-                                msg->ksm_zc_ack_cookie = ztx->tx_msg.ksm_zc_ack_cookie;
-                                ksocknal_next_mono_tx(conn);
-
-                                /* use tx to replace the noop zc-ack packet, ztx will
-                                 * be put to freelist later */
-                                list_add(&tx->tx_list, &ztx->tx_list);
-                                list_del(&ztx->tx_list);
-
-                                cfs_atomic_sub (ztx->tx_nob, &conn->ksnc_tx_nob);
-                        } else {
-                                /* no noop zc-ack packet, just enqueue it */
-                                LASSERT(conn->ksnc_tx_mono->tx_msg.ksm_type == KSOCK_MSG_LNET);
-                                list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
-                        }
+                LASSERT (msg->ksm_zc_cookies[1] == 0);
+                LASSERT (conn->ksnc_proto->pro_queue_tx_msg != NULL);
 
 
-                } else if (conn->ksnc_proto == &ksocknal_protocol_v2x) {
-                        /* it's the first mono-packet, enqueue it */
-                        conn->ksnc_tx_mono = tx;
-                        list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
-                } else {
-                        /* V1.x packet, just enqueue it */
-                        list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
-                }
+                ztx = conn->ksnc_proto->pro_queue_tx_msg(conn, tx);
+                /* ztx will be released later */
         }
 
         }
 
-        if (ztx != NULL)
+        if (ztx != NULL) {
+                cfs_atomic_sub (ztx->tx_nob, &conn->ksnc_tx_nob);
                 list_add_tail(&ztx->tx_list, &sched->kss_zombie_noop_txs);
                 list_add_tail(&ztx->tx_list, &sched->kss_zombie_noop_txs);
+        }
 
         if (conn->ksnc_tx_ready &&      /* able to send */
             !conn->ksnc_tx_scheduled) { /* not scheduled to send */
 
         if (conn->ksnc_tx_ready &&      /* able to send */
             !conn->ksnc_tx_scheduled) { /* not scheduled to send */
@@ -832,6 +759,7 @@ ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
         cfs_spin_unlock_bh (&sched->kss_lock);
 }
 
         cfs_spin_unlock_bh (&sched->kss_lock);
 }
 
+
 ksock_route_t *
 ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
 {
 ksock_route_t *
 ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
 {
@@ -891,17 +819,15 @@ ksocknal_launch_packet (lnet_ni_t *ni, ksock_tx_t *tx, lnet_process_id_t id)
         int               rc;
 
         LASSERT (tx->tx_conn == NULL);
         int               rc;
 
         LASSERT (tx->tx_conn == NULL);
-        LASSERT (tx->tx_lnetmsg != NULL);
 
         g_lock = &ksocknal_data.ksnd_global_lock;
 
         for (retry = 0;; retry = 1) {
 
         g_lock = &ksocknal_data.ksnd_global_lock;
 
         for (retry = 0;; retry = 1) {
-#if !SOCKNAL_ROUND_ROBIN
                 cfs_read_lock (g_lock);
                 peer = ksocknal_find_peer_locked(ni, id);
                 if (peer != NULL) {
                         if (ksocknal_find_connectable_route_locked(peer) == NULL) {
                 cfs_read_lock (g_lock);
                 peer = ksocknal_find_peer_locked(ni, id);
                 if (peer != NULL) {
                         if (ksocknal_find_connectable_route_locked(peer) == NULL) {
-                                conn = ksocknal_find_conn_locked (tx->tx_lnetmsg->msg_len, peer);
+                                conn = ksocknal_find_conn_locked(peer, tx, tx->tx_nonblk);
                                 if (conn != NULL) {
                                         /* I've got no routes that need to be
                                          * connecting and I do have an actual
                                 if (conn != NULL) {
                                         /* I've got no routes that need to be
                                          * connecting and I do have an actual
@@ -915,7 +841,7 @@ ksocknal_launch_packet (lnet_ni_t *ni, ksock_tx_t *tx, lnet_process_id_t id)
 
                 /* I'll need a write lock... */
                 cfs_read_unlock (g_lock);
 
                 /* I'll need a write lock... */
                 cfs_read_unlock (g_lock);
-#endif
+
                 cfs_write_lock_bh (g_lock);
 
                 peer = ksocknal_find_peer_locked(ni, id);
                 cfs_write_lock_bh (g_lock);
 
                 peer = ksocknal_find_peer_locked(ni, id);
@@ -954,7 +880,7 @@ ksocknal_launch_packet (lnet_ni_t *ni, ksock_tx_t *tx, lnet_process_id_t id)
                 ksocknal_launch_connection_locked (route);
         }
 
                 ksocknal_launch_connection_locked (route);
         }
 
-        conn = ksocknal_find_conn_locked (tx->tx_lnetmsg->msg_len, peer);
+        conn = ksocknal_find_conn_locked(peer, tx, tx->tx_nonblk);
         if (conn != NULL) {
                 /* Connection exists; queue message on it */
                 ksocknal_queue_tx_locked (tx, conn);
         if (conn != NULL) {
                 /* Connection exists; queue message on it */
                 ksocknal_queue_tx_locked (tx, conn);
@@ -967,7 +893,7 @@ ksocknal_launch_packet (lnet_ni_t *ni, ksock_tx_t *tx, lnet_process_id_t id)
                 /* the message is going to be pinned to the peer */
                 tx->tx_deadline =
                         cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
                 /* the message is going to be pinned to the peer */
                 tx->tx_deadline =
                         cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
-                
+
                 /* Queue the message until a connection is established */
                 list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
                 cfs_write_unlock_bh (g_lock);
                 /* Queue the message until a connection is established */
                 list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
                 cfs_write_unlock_bh (g_lock);
@@ -1014,7 +940,7 @@ ksocknal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                 desc_size = offsetof(ksock_tx_t,
                                      tx_frags.paged.kiov[payload_niov]);
 
                 desc_size = offsetof(ksock_tx_t,
                                      tx_frags.paged.kiov[payload_niov]);
 
-        tx = ksocknal_alloc_tx(desc_size);
+        tx = ksocknal_alloc_tx(KSOCK_MSG_LNET, desc_size);
         if (tx == NULL) {
                 CERROR("Can't allocate tx desc type %d size %d\n",
                        type, desc_size);
         if (tx == NULL) {
                 CERROR("Can't allocate tx desc type %d size %d\n",
                        type, desc_size);
@@ -1039,9 +965,12 @@ ksocknal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                 tx->tx_nkiov = lnet_extract_kiov(payload_niov, tx->tx_kiov,
                                                  payload_niov, payload_kiov,
                                                  payload_offset, payload_nob);
                 tx->tx_nkiov = lnet_extract_kiov(payload_niov, tx->tx_kiov,
                                                  payload_niov, payload_kiov,
                                                  payload_offset, payload_nob);
+
+                if (payload_nob >= *ksocknal_tunables.ksnd_zc_min_payload)
+                        tx->tx_zc_capable = 1;
         }
 
         }
 
-        ksocknal_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);
+        socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);
 
         /* The first fragment will be set later in pro_pack */
         rc = ksocknal_launch_packet(ni, tx, target);
 
         /* The first fragment will be set later in pro_pack */
         rc = ksocknal_launch_packet(ni, tx, target);
@@ -1096,6 +1025,7 @@ ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
 
                 switch (conn->ksnc_proto->pro_version) {
                 case  KSOCK_PROTO_V2:
 
                 switch (conn->ksnc_proto->pro_version) {
                 case  KSOCK_PROTO_V2:
+                case  KSOCK_PROTO_V3:
                         conn->ksnc_rx_state = SOCKNAL_RX_KSM_HEADER;
                         conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
                         conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg;
                         conn->ksnc_rx_state = SOCKNAL_RX_KSM_HEADER;
                         conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
                         conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg;
@@ -1155,99 +1085,12 @@ ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
         return (0);
 }
 
         return (0);
 }
 
-/* (Sink) handle incoming ZC request from sender */
-static int
-ksocknal_handle_zc_req(ksock_peer_t *peer, __u64 cookie)
-{
-        ksock_conn_t   *conn;
-        ksock_tx_t     *tx;
-        ksock_sched_t  *sched;
-        int             rc;
-
-        cfs_read_lock (&ksocknal_data.ksnd_global_lock);
-
-        conn = ksocknal_find_conn_locked (0, peer);
-        if (conn == NULL) {
-                cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
-                CERROR("Can't find connection to send zcack.\n");
-                return -ECONNRESET;
-        }
-
-        sched = conn->ksnc_scheduler;
-
-        cfs_spin_lock_bh (&sched->kss_lock);
-        rc = ksocknal_piggyback_zcack(conn, cookie);
-        cfs_spin_unlock_bh (&sched->kss_lock);
-
-        cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
-        if (rc) {
-                /* Ack cookie is piggybacked */
-                return 0;
-        }
-
-        tx = ksocknal_alloc_tx(KSOCK_NOOP_TX_SIZE);
-        if (tx == NULL) {
-                CERROR("Can't allocate noop tx desc\n");
-                return -ENOMEM;
-        }
-
-        tx->tx_conn     = NULL;
-        tx->tx_lnetmsg  = NULL;
-        tx->tx_kiov     = NULL;
-        tx->tx_nkiov    = 0;
-        tx->tx_iov      = tx->tx_frags.virt.iov;
-        tx->tx_niov     = 1;
-
-        ksocknal_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
-        tx->tx_msg.ksm_zc_ack_cookie = cookie; /* incoming cookie */
-
-        cfs_read_lock (&ksocknal_data.ksnd_global_lock);
-
-        conn = ksocknal_find_conn_locked (0, peer);
-        if (conn == NULL) {
-                cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
-                ksocknal_free_tx(tx);
-                CERROR("Can't find connection to send zcack.\n");
-                return -ECONNRESET;
-        }
-        ksocknal_queue_tx_locked(tx, conn);
-
-        cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
-
-        return 0;
-}
-
-/* (Sender) handle ZC_ACK from sink */
-static int
-ksocknal_handle_zc_ack(ksock_peer_t *peer, __u64 cookie)
-{
-        ksock_tx_t             *tx;
-        struct list_head       *ctmp;
-
-        cfs_spin_lock(&peer->ksnp_lock);
-
-        list_for_each(ctmp, &peer->ksnp_zc_req_list) {
-                tx = list_entry (ctmp, ksock_tx_t, tx_zc_list);
-                if (tx->tx_msg.ksm_zc_req_cookie != cookie)
-                        continue;
-
-                tx->tx_msg.ksm_zc_req_cookie = 0;
-                list_del(&tx->tx_zc_list);
-
-                cfs_spin_unlock(&peer->ksnp_lock);
-
-                ksocknal_tx_decref(tx);
-                return 0;
-        }
-        cfs_spin_unlock(&peer->ksnp_lock);
-
-        return -EPROTO;
-}
-
 int
 ksocknal_process_receive (ksock_conn_t *conn)
 {
 int
 ksocknal_process_receive (ksock_conn_t *conn)
 {
-        int           rc;
+        lnet_hdr_t        *lhdr;
+        lnet_process_id_t *id;
+        int                rc;
 
         LASSERT (cfs_atomic_read(&conn->ksnc_conn_refcount) > 0);
 
 
         LASSERT (cfs_atomic_read(&conn->ksnc_conn_refcount) > 0);
 
@@ -1294,8 +1137,8 @@ ksocknal_process_receive (ksock_conn_t *conn)
                 if (conn->ksnc_flip) {
                         __swab32s(&conn->ksnc_msg.ksm_type);
                         __swab32s(&conn->ksnc_msg.ksm_csum);
                 if (conn->ksnc_flip) {
                         __swab32s(&conn->ksnc_msg.ksm_type);
                         __swab32s(&conn->ksnc_msg.ksm_csum);
-                        __swab64s(&conn->ksnc_msg.ksm_zc_req_cookie);
-                        __swab64s(&conn->ksnc_msg.ksm_zc_ack_cookie);
+                        __swab64s(&conn->ksnc_msg.ksm_zc_cookies[0]);
+                        __swab64s(&conn->ksnc_msg.ksm_zc_cookies[1]);
                 }
 
                 if (conn->ksnc_msg.ksm_type != KSOCK_MSG_NOOP &&
                 }
 
                 if (conn->ksnc_msg.ksm_type != KSOCK_MSG_NOOP &&
@@ -1320,15 +1163,21 @@ ksocknal_process_receive (ksock_conn_t *conn)
                         return (-EIO);
                 }
 
                         return (-EIO);
                 }
 
-                if (conn->ksnc_msg.ksm_zc_ack_cookie != 0) {
-                        LASSERT(conn->ksnc_proto == &ksocknal_protocol_v2x);
+                if (conn->ksnc_msg.ksm_zc_cookies[1] != 0) {
+                        __u64 cookie = 0;
+
+                        LASSERT (conn->ksnc_proto != &ksocknal_protocol_v1x);
+
+                        if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP)
+                                cookie = conn->ksnc_msg.ksm_zc_cookies[0];
+
+                        rc = conn->ksnc_proto->pro_handle_zcack(conn, cookie,
+                                               conn->ksnc_msg.ksm_zc_cookies[1]);
 
 
-                        rc = ksocknal_handle_zc_ack(conn->ksnc_peer,
-                                                    conn->ksnc_msg.ksm_zc_ack_cookie);
                         if (rc != 0) {
                         if (rc != 0) {
-                                CERROR("%s: Unknown zero copy ACK cookie: "LPU64"\n",
+                                CERROR("%s: Unknown ZC-ACK cookie: "LPU64", "LPU64"\n",
                                        libcfs_id2str(conn->ksnc_peer->ksnp_id),
                                        libcfs_id2str(conn->ksnc_peer->ksnp_id),
-                                       conn->ksnc_msg.ksm_zc_ack_cookie);
+                                       cookie, conn->ksnc_msg.ksm_zc_cookies[1]);
                                 ksocknal_new_packet(conn, 0);
                                 ksocknal_close_conn_and_siblings(conn, -EPROTO);
                                 return (rc);
                                 ksocknal_new_packet(conn, 0);
                                 ksocknal_close_conn_and_siblings(conn, -EPROTO);
                                 return (rc);
@@ -1360,8 +1209,8 @@ ksocknal_process_receive (ksock_conn_t *conn)
 
                 if ((conn->ksnc_peer->ksnp_id.pid & LNET_PID_USERFLAG) != 0) {
                         /* Userspace peer */
 
                 if ((conn->ksnc_peer->ksnp_id.pid & LNET_PID_USERFLAG) != 0) {
                         /* Userspace peer */
-                        lnet_process_id_t *id = &conn->ksnc_peer->ksnp_id;
-                        lnet_hdr_t        *lhdr = &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr;
+                        lhdr = &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr;
+                        id   = &conn->ksnc_peer->ksnp_id;
 
                         /* Substitute process ID assigned at connection time */
                         lhdr->src_pid = cpu_to_le32(id->pid);
 
                         /* Substitute process ID assigned at connection time */
                         lhdr->src_pid = cpu_to_le32(id->pid);
@@ -1405,14 +1254,19 @@ ksocknal_process_receive (ksock_conn_t *conn)
                         rc = -EIO;
                 }
 
                         rc = -EIO;
                 }
 
-                lnet_finalize(conn->ksnc_peer->ksnp_ni, conn->ksnc_cookie, rc);
+                if (rc == 0 && conn->ksnc_msg.ksm_zc_cookies[0] != 0) {
+                        LASSERT(conn->ksnc_proto != &ksocknal_protocol_v1x);
+
+                        lhdr = &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr;
+                        id   = &conn->ksnc_peer->ksnp_id;
 
 
-                if (rc == 0 && conn->ksnc_msg.ksm_zc_req_cookie != 0) {
-                        LASSERT(conn->ksnc_proto == &ksocknal_protocol_v2x);
-                        rc = ksocknal_handle_zc_req(conn->ksnc_peer,
-                                                    conn->ksnc_msg.ksm_zc_req_cookie);
+                        rc = conn->ksnc_proto->pro_handle_zcreq(conn,
+                                               conn->ksnc_msg.ksm_zc_cookies[0],
+                                               le64_to_cpu(lhdr->src_nid) != id->nid);
                 }
 
                 }
 
+                lnet_finalize(conn->ksnc_peer->ksnp_ni, conn->ksnc_cookie, rc);
+
                 if (rc != 0) {
                         ksocknal_new_packet(conn, 0);
                         ksocknal_close_conn_and_siblings (conn, rc);
                 if (rc != 0) {
                         ksocknal_new_packet(conn, 0);
                         ksocknal_close_conn_and_siblings (conn, rc);
@@ -1595,8 +1449,8 @@ int ksocknal_scheduler (void *arg)
                         tx = list_entry(conn->ksnc_tx_queue.next,
                                         ksock_tx_t, tx_list);
 
                         tx = list_entry(conn->ksnc_tx_queue.next,
                                         ksock_tx_t, tx_list);
 
-                        if (conn->ksnc_tx_mono == tx)
-                                ksocknal_next_mono_tx(conn);
+                        if (conn->ksnc_tx_carrier == tx)
+                                ksocknal_next_tx_carrier(conn);
 
                         /* dequeue now so empty list => more to send */
                         list_del(&tx->tx_list);
 
                         /* dequeue now so empty list => more to send */
                         list_del(&tx->tx_list);
@@ -1732,15 +1586,29 @@ void ksocknal_write_callback (ksock_conn_t *conn)
 ksock_proto_t *
 ksocknal_parse_proto_version (ksock_hello_msg_t *hello)
 {
 ksock_proto_t *
 ksocknal_parse_proto_version (ksock_hello_msg_t *hello)
 {
-        if ((hello->kshm_magic   == LNET_PROTO_MAGIC &&
-             hello->kshm_version == KSOCK_PROTO_V2) ||
-            (hello->kshm_magic   == __swab32(LNET_PROTO_MAGIC) &&
-             hello->kshm_version == __swab32(KSOCK_PROTO_V2))) {
+        __u32   version = 0;
+
+        if (hello->kshm_magic == LNET_PROTO_MAGIC)
+                version = hello->kshm_version;
+        else if (hello->kshm_magic == __swab32(LNET_PROTO_MAGIC))
+                version = __swab32(hello->kshm_version);
+
+        if (version != 0) {
 #if SOCKNAL_VERSION_DEBUG
 #if SOCKNAL_VERSION_DEBUG
-                if (*ksocknal_tunables.ksnd_protocol != 2)
+                if (*ksocknal_tunables.ksnd_protocol == 1)
+                        return NULL;
+
+                if (*ksocknal_tunables.ksnd_protocol == 2 &&
+                    version == KSOCK_PROTO_V3)
                         return NULL;
 #endif
                         return NULL;
 #endif
-                return &ksocknal_protocol_v2x;
+                if (version == KSOCK_PROTO_V2)
+                        return &ksocknal_protocol_v2x;
+
+                if (version == KSOCK_PROTO_V3)
+                        return &ksocknal_protocol_v3x;
+
+                return NULL;
         }
 
         if (hello->kshm_magic == le32_to_cpu(LNET_PROTO_TCP_MAGIC)) {
         }
 
         if (hello->kshm_magic == le32_to_cpu(LNET_PROTO_TCP_MAGIC)) {
@@ -1757,332 +1625,6 @@ ksocknal_parse_proto_version (ksock_hello_msg_t *hello)
         return NULL;
 }
 
         return NULL;
 }
 
-static int
-ksocknal_send_hello_v1 (ksock_conn_t *conn, ksock_hello_msg_t *hello)
-{
-        cfs_socket_t        *sock = conn->ksnc_sock;
-        lnet_hdr_t          *hdr;
-        lnet_magicversion_t *hmv;
-        int                  rc;
-        int                  i;
-
-        CLASSERT(sizeof(lnet_magicversion_t) == offsetof(lnet_hdr_t, src_nid));
-
-        LIBCFS_ALLOC(hdr, sizeof(*hdr));
-        if (hdr == NULL) {
-                CERROR("Can't allocate lnet_hdr_t\n");
-                return -ENOMEM;
-        }
-
-        hmv = (lnet_magicversion_t *)&hdr->dest_nid;
-
-        /* Re-organize V2.x message header to V1.x (lnet_hdr_t)
-         * header and send out */
-        hmv->magic         = cpu_to_le32 (LNET_PROTO_TCP_MAGIC);
-        hmv->version_major = cpu_to_le16 (KSOCK_PROTO_V1_MAJOR);
-        hmv->version_minor = cpu_to_le16 (KSOCK_PROTO_V1_MINOR);
-
-        if (the_lnet.ln_testprotocompat != 0) {
-                /* single-shot proto check */
-                LNET_LOCK();
-                if ((the_lnet.ln_testprotocompat & 1) != 0) {
-                        hmv->version_major++;   /* just different! */
-                        the_lnet.ln_testprotocompat &= ~1;
-                }
-                if ((the_lnet.ln_testprotocompat & 2) != 0) {
-                        hmv->magic = LNET_PROTO_MAGIC;
-                        the_lnet.ln_testprotocompat &= ~2;
-                }
-                LNET_UNLOCK();
-        }
-
-        hdr->src_nid        = cpu_to_le64 (hello->kshm_src_nid);
-        hdr->src_pid        = cpu_to_le32 (hello->kshm_src_pid);
-        hdr->type           = cpu_to_le32 (LNET_MSG_HELLO);
-        hdr->payload_length = cpu_to_le32 (hello->kshm_nips * sizeof(__u32));
-        hdr->msg.hello.type = cpu_to_le32 (hello->kshm_ctype);
-        hdr->msg.hello.incarnation = cpu_to_le64 (hello->kshm_src_incarnation);
-
-        rc = libcfs_sock_write(sock, hdr, sizeof(*hdr), lnet_acceptor_timeout());
-
-        if (rc != 0) {
-                CDEBUG (D_NETERROR, "Error %d sending HELLO hdr to %u.%u.%u.%u/%d\n",
-                        rc, HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
-                goto out;
-        }
-
-        if (hello->kshm_nips == 0)
-                goto out;
-
-        for (i = 0; i < (int) hello->kshm_nips; i++) {
-                hello->kshm_ips[i] = __cpu_to_le32 (hello->kshm_ips[i]);
-        }
-
-        rc = libcfs_sock_write(sock, hello->kshm_ips,
-                               hello->kshm_nips * sizeof(__u32),
-                               lnet_acceptor_timeout());
-        if (rc != 0) {
-                CDEBUG (D_NETERROR, "Error %d sending HELLO payload (%d)"
-                        " to %u.%u.%u.%u/%d\n", rc, hello->kshm_nips,
-                        HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
-        }
-out:
-        LIBCFS_FREE(hdr, sizeof(*hdr));
-
-        return rc;
-}
-
-static int
-ksocknal_send_hello_v2 (ksock_conn_t *conn, ksock_hello_msg_t *hello)
-{
-        cfs_socket_t   *sock = conn->ksnc_sock;
-        int             rc;
-
-        hello->kshm_magic       = LNET_PROTO_MAGIC;
-        hello->kshm_version     = KSOCK_PROTO_V2;
-
-        if (the_lnet.ln_testprotocompat != 0) {
-                /* single-shot proto check */
-                LNET_LOCK();
-                if ((the_lnet.ln_testprotocompat & 1) != 0) {
-                        hello->kshm_version++;   /* just different! */
-                        the_lnet.ln_testprotocompat &= ~1;
-                }
-                LNET_UNLOCK();
-        }
-
-        rc = libcfs_sock_write(sock, hello, offsetof(ksock_hello_msg_t, kshm_ips),
-                               lnet_acceptor_timeout());
-
-        if (rc != 0) {
-                CDEBUG (D_NETERROR, "Error %d sending HELLO hdr to %u.%u.%u.%u/%d\n",
-                        rc, HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
-                return rc;
-        }
-
-        if (hello->kshm_nips == 0)
-                return 0;
-
-        rc = libcfs_sock_write(sock, hello->kshm_ips,
-                               hello->kshm_nips * sizeof(__u32),
-                               lnet_acceptor_timeout());
-        if (rc != 0) {
-                CDEBUG (D_NETERROR, "Error %d sending HELLO payload (%d)"
-                        " to %u.%u.%u.%u/%d\n", rc, hello->kshm_nips,
-                        HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
-        }
-
-        return rc;
-}
-
-static int
-ksocknal_recv_hello_v1(ksock_conn_t *conn, ksock_hello_msg_t *hello,int timeout)
-{
-        cfs_socket_t        *sock = conn->ksnc_sock;
-        lnet_hdr_t          *hdr;
-        int                  rc;
-        int                  i;
-
-        LIBCFS_ALLOC(hdr, sizeof(*hdr));
-        if (hdr == NULL) {
-                CERROR("Can't allocate lnet_hdr_t\n");
-                return -ENOMEM;
-        }
-
-        rc = libcfs_sock_read(sock, &hdr->src_nid,
-                              sizeof (*hdr) - offsetof (lnet_hdr_t, src_nid),
-                              timeout);
-        if (rc != 0) {
-                CERROR ("Error %d reading rest of HELLO hdr from %u.%u.%u.%u\n",
-                        rc, HIPQUAD(conn->ksnc_ipaddr));
-                LASSERT (rc < 0 && rc != -EALREADY);
-                goto out;
-        }
-
-        /* ...and check we got what we expected */
-        if (hdr->type != cpu_to_le32 (LNET_MSG_HELLO)) {
-                CERROR ("Expecting a HELLO hdr,"
-                        " but got type %d from %u.%u.%u.%u\n",
-                        le32_to_cpu (hdr->type),
-                        HIPQUAD(conn->ksnc_ipaddr));
-                rc = -EPROTO;
-                goto out;
-        }
-
-        hello->kshm_src_nid         = le64_to_cpu (hdr->src_nid);
-        hello->kshm_src_pid         = le32_to_cpu (hdr->src_pid);
-        hello->kshm_src_incarnation = le64_to_cpu (hdr->msg.hello.incarnation);
-        hello->kshm_ctype           = le32_to_cpu (hdr->msg.hello.type);
-        hello->kshm_nips            = le32_to_cpu (hdr->payload_length) /
-                                         sizeof (__u32);
-
-        if (hello->kshm_nips > LNET_MAX_INTERFACES) {
-                CERROR("Bad nips %d from ip %u.%u.%u.%u\n",
-                       hello->kshm_nips, HIPQUAD(conn->ksnc_ipaddr));
-                rc = -EPROTO;
-                goto out;
-        }
-
-        if (hello->kshm_nips == 0)
-                goto out;
-
-        rc = libcfs_sock_read(sock, hello->kshm_ips,
-                              hello->kshm_nips * sizeof(__u32), timeout);
-        if (rc != 0) {
-                CERROR ("Error %d reading IPs from ip %u.%u.%u.%u\n",
-                        rc, HIPQUAD(conn->ksnc_ipaddr));
-                LASSERT (rc < 0 && rc != -EALREADY);
-                goto out;
-        }
-
-        for (i = 0; i < (int) hello->kshm_nips; i++) {
-                hello->kshm_ips[i] = __le32_to_cpu(hello->kshm_ips[i]);
-
-                if (hello->kshm_ips[i] == 0) {
-                        CERROR("Zero IP[%d] from ip %u.%u.%u.%u\n",
-                               i, HIPQUAD(conn->ksnc_ipaddr));
-                        rc = -EPROTO;
-                        break;
-                }
-        }
-out:
-        LIBCFS_FREE(hdr, sizeof(*hdr));
-
-        return rc;
-}
-
-static int
-ksocknal_recv_hello_v2 (ksock_conn_t *conn, ksock_hello_msg_t *hello, int timeout)
-{
-        cfs_socket_t      *sock = conn->ksnc_sock;
-        int                rc;
-        int                i;
-
-        if (hello->kshm_magic == LNET_PROTO_MAGIC)
-                conn->ksnc_flip = 0;
-        else
-                conn->ksnc_flip = 1;
-
-        rc = libcfs_sock_read(sock, &hello->kshm_src_nid,
-                              offsetof(ksock_hello_msg_t, kshm_ips) -
-                                       offsetof(ksock_hello_msg_t, kshm_src_nid),
-                              timeout);
-        if (rc != 0) {
-                CERROR ("Error %d reading HELLO from %u.%u.%u.%u\n",
-                        rc, HIPQUAD(conn->ksnc_ipaddr));
-                LASSERT (rc < 0 && rc != -EALREADY);
-                return rc;
-        }
-
-        if (conn->ksnc_flip) {
-                __swab32s(&hello->kshm_src_pid);
-                __swab64s(&hello->kshm_src_nid);
-                __swab32s(&hello->kshm_dst_pid);
-                __swab64s(&hello->kshm_dst_nid);
-                __swab64s(&hello->kshm_src_incarnation);
-                __swab64s(&hello->kshm_dst_incarnation);
-                __swab32s(&hello->kshm_ctype);
-                __swab32s(&hello->kshm_nips);
-        }
-
-        if (hello->kshm_nips > LNET_MAX_INTERFACES) {
-                CERROR("Bad nips %d from ip %u.%u.%u.%u\n",
-                       hello->kshm_nips, HIPQUAD(conn->ksnc_ipaddr));
-                return -EPROTO;
-        }
-
-        if (hello->kshm_nips == 0)
-                return 0;
-
-        rc = libcfs_sock_read(sock, hello->kshm_ips,
-                              hello->kshm_nips * sizeof(__u32), timeout);
-        if (rc != 0) {
-                CERROR ("Error %d reading IPs from ip %u.%u.%u.%u\n",
-                        rc, HIPQUAD(conn->ksnc_ipaddr));
-                LASSERT (rc < 0 && rc != -EALREADY);
-                return rc;
-        }
-
-        for (i = 0; i < (int) hello->kshm_nips; i++) {
-                if (conn->ksnc_flip)
-                        __swab32s(&hello->kshm_ips[i]);
-
-                if (hello->kshm_ips[i] == 0) {
-                        CERROR("Zero IP[%d] from ip %u.%u.%u.%u\n",
-                               i, HIPQUAD(conn->ksnc_ipaddr));
-                        return -EPROTO;
-                }
-        }
-
-        return 0;
-}
-
-static void
-ksocknal_pack_msg_v1(ksock_tx_t *tx)
-{
-        /* V1.x has no KSOCK_MSG_NOOP */
-        LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);
-        LASSERT(tx->tx_lnetmsg != NULL);
-
-        tx->tx_iov[0].iov_base = (void *)&tx->tx_lnetmsg->msg_hdr;
-        tx->tx_iov[0].iov_len  = sizeof(lnet_hdr_t);
-
-        tx->tx_resid = tx->tx_nob = tx->tx_lnetmsg->msg_len + sizeof(lnet_hdr_t);
-}
-
-static void
-ksocknal_pack_msg_v2(ksock_tx_t *tx)
-{
-        tx->tx_iov[0].iov_base = (void *)&tx->tx_msg;
-
-        if (tx->tx_lnetmsg != NULL) {
-                LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);
-
-                tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = tx->tx_lnetmsg->msg_hdr;
-                tx->tx_iov[0].iov_len = sizeof(ksock_msg_t);
-                tx->tx_resid = tx->tx_nob = sizeof(ksock_msg_t) + tx->tx_lnetmsg->msg_len;
-        } else {
-                LASSERT(tx->tx_msg.ksm_type == KSOCK_MSG_NOOP);
-
-                tx->tx_iov[0].iov_len = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr);
-                tx->tx_resid = tx->tx_nob = offsetof(ksock_msg_t,  ksm_u.lnetmsg.ksnm_hdr);
-        }
-        /* Don't checksum before start sending, because packet can be piggybacked with ACK */
-}
-
-static void
-ksocknal_unpack_msg_v1(ksock_msg_t *msg)
-{
-        msg->ksm_type           = KSOCK_MSG_LNET;
-        msg->ksm_csum           = 0;
-        msg->ksm_zc_req_cookie  = 0;
-        msg->ksm_zc_ack_cookie  = 0;
-}
-
-static void
-ksocknal_unpack_msg_v2(ksock_msg_t *msg)
-{
-        return;  /* Do nothing */
-}
-
-ksock_proto_t  ksocknal_protocol_v1x =
-{
-        KSOCK_PROTO_V1,
-        ksocknal_send_hello_v1,
-        ksocknal_recv_hello_v1,
-        ksocknal_pack_msg_v1,
-        ksocknal_unpack_msg_v1
-};
-
-ksock_proto_t  ksocknal_protocol_v2x =
-{
-        KSOCK_PROTO_V2,
-        ksocknal_send_hello_v2,
-        ksocknal_recv_hello_v2,
-        ksocknal_pack_msg_v2,
-        ksocknal_unpack_msg_v2
-};
-
 int
 ksocknal_send_hello (lnet_ni_t *ni, ksock_conn_t *conn,
                      lnet_nid_t peer_nid, ksock_hello_msg_t *hello)
 int
 ksocknal_send_hello (lnet_ni_t *ni, ksock_conn_t *conn,
                      lnet_nid_t peer_nid, ksock_hello_msg_t *hello)
@@ -2178,9 +1720,11 @@ ksocknal_recv_hello (lnet_ni_t *ni, ksock_conn_t *conn,
         if (proto == NULL) {
                 if (!active) {
                         /* unknown protocol from peer, tell peer my protocol */
         if (proto == NULL) {
                 if (!active) {
                         /* unknown protocol from peer, tell peer my protocol */
-                        conn->ksnc_proto = &ksocknal_protocol_v2x;
+                        conn->ksnc_proto = &ksocknal_protocol_v3x;
 #if SOCKNAL_VERSION_DEBUG
 #if SOCKNAL_VERSION_DEBUG
-                        if (*ksocknal_tunables.ksnd_protocol != 2)
+                        if (*ksocknal_tunables.ksnd_protocol == 2)
+                                conn->ksnc_proto = &ksocknal_protocol_v2x;
+                        else if (*ksocknal_tunables.ksnd_protocol == 1)
                                 conn->ksnc_proto = &ksocknal_protocol_v1x;
 #endif
                         hello->kshm_nips = 0;
                                 conn->ksnc_proto = &ksocknal_protocol_v1x;
 #endif
                         hello->kshm_nips = 0;
@@ -2391,14 +1935,18 @@ ksocknal_connect (ksock_route_t *route)
         if (!list_empty(&peer->ksnp_tx_queue) &&
             peer->ksnp_accepting == 0 &&
             ksocknal_find_connecting_route_locked(peer) == NULL) {
         if (!list_empty(&peer->ksnp_tx_queue) &&
             peer->ksnp_accepting == 0 &&
             ksocknal_find_connecting_route_locked(peer) == NULL) {
+                ksock_conn_t *conn;
+
                 /* ksnp_tx_queue is queued on a conn on successful
                 /* ksnp_tx_queue is queued on a conn on successful
-                 * connection */
-                LASSERT (list_empty (&peer->ksnp_conns));
+                 * connection for V1.x and V2.x */
+                if (!list_empty (&peer->ksnp_conns)) {
+                        conn = list_entry(peer->ksnp_conns.next, ksock_conn_t, ksnc_list);
+                        LASSERT (conn->ksnc_proto == &ksocknal_protocol_v3x);
+                }
 
                 /* take all the blocked packets while I've got the lock and
                  * complete below... */
 
                 /* take all the blocked packets while I've got the lock and
                  * complete below... */
-                list_add(&zombies, &peer->ksnp_tx_queue);
-                list_del_init(&peer->ksnp_tx_queue);
+                list_splice_init(&peer->ksnp_tx_queue, &zombies);
         }
 
 #if 0           /* irrelevent with only eager routes */
         }
 
 #if 0           /* irrelevent with only eager routes */
@@ -2615,6 +2163,68 @@ ksocknal_flush_stale_txs(ksock_peer_t *peer)
         ksocknal_txlist_done(peer->ksnp_ni, &stale_txs, 1);
 }
 
         ksocknal_txlist_done(peer->ksnp_ni, &stale_txs, 1);
 }
 
+int
+ksocknal_send_keepalive_locked(ksock_peer_t *peer)
+{
+        ksock_sched_t  *sched;
+        ksock_conn_t   *conn;
+        ksock_tx_t     *tx;
+
+        if (list_empty(&peer->ksnp_conns)) /* last_alive will be updated by create_conn */
+                return 0;
+
+        if (peer->ksnp_proto != &ksocknal_protocol_v3x)
+                return 0;
+
+        if (*ksocknal_tunables.ksnd_keepalive <= 0 ||
+            cfs_time_before(cfs_time_current(),
+                            cfs_time_add(peer->ksnp_last_alive,
+                                         cfs_time_seconds(*ksocknal_tunables.ksnd_keepalive))))
+                return 0;
+
+        if (cfs_time_before(cfs_time_current(),
+                            peer->ksnp_send_keepalive))
+                return 0;
+
+        /* retry 10 secs later, so we wouldn't put pressure
+         * on this peer if we failed to send keepalive this time */
+        peer->ksnp_send_keepalive = cfs_time_shift(10);
+
+        conn = ksocknal_find_conn_locked(peer, NULL, 1);
+        if (conn != NULL) {
+                sched = conn->ksnc_scheduler;
+
+                spin_lock_bh (&sched->kss_lock);
+                if (!list_empty(&conn->ksnc_tx_queue)) {
+                        spin_unlock_bh(&sched->kss_lock);
+                        /* there is a queued ACK, don't need keepalive */
+                        return 0;
+                }
+
+                spin_unlock_bh(&sched->kss_lock);
+        }
+
+        read_unlock(&ksocknal_data.ksnd_global_lock);
+
+        /* cookie = 1 is reserved for keepalive PING */
+        tx = ksocknal_alloc_tx_noop(1, 1);
+        if (tx == NULL) {
+                read_lock(&ksocknal_data.ksnd_global_lock);
+                return -ENOMEM;
+        }
+
+        if (ksocknal_launch_packet(peer->ksnp_ni, tx, peer->ksnp_id) == 0) {
+                read_lock(&ksocknal_data.ksnd_global_lock);
+                return 1;
+        }
+
+        ksocknal_free_tx(tx);
+        read_lock(&ksocknal_data.ksnd_global_lock);
+
+        return -EIO;
+}
+
+
 void
 ksocknal_check_peer_timeouts (int idx)
 {
 void
 ksocknal_check_peer_timeouts (int idx)
 {
@@ -2631,6 +2241,11 @@ ksocknal_check_peer_timeouts (int idx)
 
         list_for_each (ptmp, peers) {
                 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
 
         list_for_each (ptmp, peers) {
                 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
+                if (ksocknal_send_keepalive_locked(peer) != 0) {
+                        read_unlock (&ksocknal_data.ksnd_global_lock);
+                        goto again;
+                }
+
                 conn = ksocknal_find_timed_out_conn (peer);
 
                 if (conn != NULL) {
                 conn = ksocknal_find_timed_out_conn (peer);
 
                 if (conn != NULL) {
index 5b0a9e9..8595e4c 100644 (file)
@@ -55,6 +55,8 @@ enum {
         SOCKLND_TX_BUFFER_SIZE,
         SOCKLND_NAGLE,
         SOCKLND_IRQ_AFFINITY,
         SOCKLND_TX_BUFFER_SIZE,
         SOCKLND_NAGLE,
         SOCKLND_IRQ_AFFINITY,
+        SOCKLND_ROUND_ROBIN,
+        SOCKLND_KEEPALIVE,
         SOCKLND_KEEPALIVE_IDLE,
         SOCKLND_KEEPALIVE_COUNT,
         SOCKLND_KEEPALIVE_INTVL,
         SOCKLND_KEEPALIVE_IDLE,
         SOCKLND_KEEPALIVE_COUNT,
         SOCKLND_KEEPALIVE_INTVL,
@@ -80,6 +82,8 @@ enum {
 #define SOCKLND_TX_BUFFER_SIZE  CTL_UNNUMBERED
 #define SOCKLND_NAGLE           CTL_UNNUMBERED
 #define SOCKLND_IRQ_AFFINITY    CTL_UNNUMBERED
 #define SOCKLND_TX_BUFFER_SIZE  CTL_UNNUMBERED
 #define SOCKLND_NAGLE           CTL_UNNUMBERED
 #define SOCKLND_IRQ_AFFINITY    CTL_UNNUMBERED
+#define SOCKLND_ROUND_ROBIN     CTL_UNNUMBERED
+#define SOCKLND_KEEPALIVE       CTL_UNNUMBERED
 #define SOCKLND_KEEPALIVE_IDLE  CTL_UNNUMBERED
 #define SOCKLND_KEEPALIVE_COUNT CTL_UNNUMBERED
 #define SOCKLND_KEEPALIVE_INTVL CTL_UNNUMBERED
 #define SOCKLND_KEEPALIVE_IDLE  CTL_UNNUMBERED
 #define SOCKLND_KEEPALIVE_COUNT CTL_UNNUMBERED
 #define SOCKLND_KEEPALIVE_INTVL CTL_UNNUMBERED
@@ -157,7 +161,7 @@ static cfs_sysctl_table_t ksocknal_ctl_table[] = {
         {
                 .ctl_name = SOCKLND_ZERO_COPY,
                 .procname = "zero_copy",
         {
                 .ctl_name = SOCKLND_ZERO_COPY,
                 .procname = "zero_copy",
-                .data     = &ksocknal_tunables.ksnd_zc_min_frag,
+                .data     = &ksocknal_tunables.ksnd_zc_min_payload,
                 .maxlen   = sizeof (int),
                 .mode     = 0644,
                 .proc_handler = &proc_dointvec,
                 .maxlen   = sizeof (int),
                 .mode     = 0644,
                 .proc_handler = &proc_dointvec,
@@ -239,6 +243,24 @@ static cfs_sysctl_table_t ksocknal_ctl_table[] = {
         },
 #endif
         {
         },
 #endif
         {
+                .ctl_name = SOCKLND_ROUND_ROBIN,
+                .procname = "round_robin",
+                .data     = &ksocknal_tunables.ksnd_round_robin,
+                .maxlen   = sizeof(int),
+                .mode     = 0644,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+        {
+                .ctl_name = SOCKLND_KEEPALIVE,
+                .procname = "keepalive",
+                .data     = &ksocknal_tunables.ksnd_keepalive,
+                .maxlen   = sizeof(int),
+                .mode     = 0644,
+                .proc_handler = &proc_dointvec,
+                .strategy = &sysctl_intvec,
+        },
+        {
                 .ctl_name = SOCKLND_KEEPALIVE_IDLE,
                 .procname = "keepalive_idle",
                 .data     = &ksocknal_tunables.ksnd_keepalive_idle,
                 .ctl_name = SOCKLND_KEEPALIVE_IDLE,
                 .procname = "keepalive_idle",
                 .data     = &ksocknal_tunables.ksnd_keepalive_idle,
@@ -315,6 +337,18 @@ cfs_sysctl_table_t ksocknal_top_ctl_table[] = {
 int
 ksocknal_lib_tunables_init ()
 {
 int
 ksocknal_lib_tunables_init ()
 {
+        if (!*ksocknal_tunables.ksnd_typed_conns) {
+                int rc = -EINVAL;
+#if SOCKNAL_VERSION_DEBUG
+                if (*ksocknal_tunables.ksnd_protocol < 3)
+                        rc = 0;
+#endif
+                if (rc != 0) {
+                        CERROR("Protocol V3.x MUST have typed connections\n");
+                        return rc;
+                }
+        }
+
         if (*ksocknal_tunables.ksnd_zc_recv_min_nfrags < 2)
                 *ksocknal_tunables.ksnd_zc_recv_min_nfrags = 2;
         if (*ksocknal_tunables.ksnd_zc_recv_min_nfrags > LNET_MAX_IOV)
         if (*ksocknal_tunables.ksnd_zc_recv_min_nfrags < 2)
                 *ksocknal_tunables.ksnd_zc_recv_min_nfrags = 2;
         if (*ksocknal_tunables.ksnd_zc_recv_min_nfrags > LNET_MAX_IOV)
@@ -447,9 +481,12 @@ ksocknal_lib_sock_irq (struct socket *sock)
 }
 
 int
 }
 
 int
-ksocknal_lib_zc_capable(struct socket *sock)
+ksocknal_lib_zc_capable(ksock_conn_t *conn)
 {
 {
-        int  caps = sock->sk->sk_route_caps;
+        int  caps = conn->ksnc_sock->sk->sk_route_caps;
+
+        if (conn->ksnc_proto == &ksocknal_protocol_v1x)
+                return 0;
 
         /* ZC if the socket supports scatter/gather and doesn't need software
          * checksums */
 
         /* ZC if the socket supports scatter/gather and doesn't need software
          * checksums */
@@ -514,15 +551,16 @@ int
 ksocknal_lib_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
 {
         struct socket *sock = conn->ksnc_sock;
 ksocknal_lib_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
 {
         struct socket *sock = conn->ksnc_sock;
-        lnet_kiov_t    *kiov = tx->tx_kiov;
+        lnet_kiov_t   *kiov = tx->tx_kiov;
         int            rc;
         int            nob;
 
         int            rc;
         int            nob;
 
+        /* Not NOOP message */
+        LASSERT (tx->tx_lnetmsg != NULL);
+
         /* NB we can't trust socket ops to either consume our iovs
          * or leave them alone. */
         /* NB we can't trust socket ops to either consume our iovs
          * or leave them alone. */
-
-        if (kiov->kiov_len >= *ksocknal_tunables.ksnd_zc_min_frag &&
-            tx->tx_msg.ksm_zc_req_cookie != 0) {
+        if (tx->tx_msg.ksm_zc_cookies[0] != 0) {
                 /* Zero copy is enabled */
                 struct sock   *sk = sock->sk;
                 struct page   *page = kiov->kiov_page;
                 /* Zero copy is enabled */
                 struct sock   *sk = sock->sk;
                 struct page   *page = kiov->kiov_page;
index 0e5d65d..9631483 100755 (executable)
@@ -120,7 +120,7 @@ ksocknal_lib_tunables_init ()
 
         ksocknal_ctl_table[i].ctl_name = j++;
         ksocknal_ctl_table[i].procname = "zero_copy";
 
         ksocknal_ctl_table[i].ctl_name = j++;
         ksocknal_ctl_table[i].procname = "zero_copy";
-        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_zc_min_frag;
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_zc_min_payload;
         ksocknal_ctl_table[i].maxlen   = sizeof (int);
         ksocknal_ctl_table[i].mode     = 0644;
         ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
         ksocknal_ctl_table[i].maxlen   = sizeof (int);
         ksocknal_ctl_table[i].mode     = 0644;
         ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
@@ -166,6 +166,14 @@ ksocknal_lib_tunables_init ()
         ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
         i++;
 
         ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
         i++;
 
+        ksocknal_ctl_table[i].ctl_name = j++;
+        ksocknal_ctl_table[i].procname = "round_robin";
+        ksocknal_ctl_table[i].data     = ksocknal_tunables.ksnd_round_robin;
+        ksocknal_ctl_table[i].maxlen   = sizeof(int);
+        ksocknal_ctl_table[i].mode     = 0644;
+        ksocknal_ctl_table[i].proc_handler = &proc_dointvec;
+        i++;
+
 #ifdef CPU_AFFINITY
         ksocknal_ctl_table[i].ctl_name = j++;
         ksocknal_ctl_table[i].procname = "irq_affinity";
 #ifdef CPU_AFFINITY
         ksocknal_ctl_table[i].ctl_name = j++;
         ksocknal_ctl_table[i].procname = "irq_affinity";
@@ -646,7 +654,7 @@ ksocknal_lib_reset_callback(struct socket *sock, ksock_conn_t *conn)
 }
 
 int
 }
 
 int
-ksocknal_lib_zc_capable(struct socket *sock)
+ksocknal_lib_zc_capable(ksock_conn_t *conn)
 {
         return 0;
 }
 {
         return 0;
 }
index d42d696..d508509 100644 (file)
@@ -83,6 +83,14 @@ static int nagle = 0;
 CFS_MODULE_PARM(nagle, "i", int, 0644,
                 "enable NAGLE?");
 
 CFS_MODULE_PARM(nagle, "i", int, 0644,
                 "enable NAGLE?");
 
+static int round_robin = 1;
+CFS_MODULE_PARM(round_robin, "i", int, 0644,
+                "Round robin for multiple interfaces");
+
+static int keepalive = 30;
+CFS_MODULE_PARM(keepalive, "i", int, 0644,
+                "# seconds before send keepalive");
+
 static int keepalive_idle = 30;
 CFS_MODULE_PARM(keepalive_idle, "i", int, 0644,
                 "# idle seconds before probe");
 static int keepalive_idle = 30;
 CFS_MODULE_PARM(keepalive_idle, "i", int, 0644,
                 "# idle seconds before probe");
@@ -113,9 +121,9 @@ CFS_MODULE_PARM(enable_irq_affinity, "i", int, 0644,
                 "enable IRQ affinity");
 #endif
 
                 "enable IRQ affinity");
 #endif
 
-static unsigned int zc_min_frag = (2<<10);
-CFS_MODULE_PARM(zc_min_frag, "i", int, 0644,
-                "minimum fragment to zero copy");
+static unsigned int zc_min_payload = (16 << 10);
+CFS_MODULE_PARM(zc_min_payload, "i", int, 0644,
+                "minimum payload size to zero copy");
 
 static unsigned int zc_recv = 0;
 CFS_MODULE_PARM(zc_recv, "i", int, 0644,
 
 static unsigned int zc_recv = 0;
 CFS_MODULE_PARM(zc_recv, "i", int, 0644,
@@ -136,7 +144,7 @@ CFS_MODULE_PARM(backoff_max, "i", int, 0644,
 #endif
 
 #if SOCKNAL_VERSION_DEBUG
 #endif
 
 #if SOCKNAL_VERSION_DEBUG
-static int protocol = 2;
+static int protocol = 3;
 CFS_MODULE_PARM(protocol, "i", int, 0644,
                 "protocol version");
 #endif
 CFS_MODULE_PARM(protocol, "i", int, 0644,
                 "protocol version");
 #endif
@@ -157,6 +165,8 @@ int ksocknal_tunables_init(void)
         ksocknal_tunables.ksnd_tx_buffer_size     = &tx_buffer_size;
         ksocknal_tunables.ksnd_rx_buffer_size     = &rx_buffer_size;
         ksocknal_tunables.ksnd_nagle              = &nagle;
         ksocknal_tunables.ksnd_tx_buffer_size     = &tx_buffer_size;
         ksocknal_tunables.ksnd_rx_buffer_size     = &rx_buffer_size;
         ksocknal_tunables.ksnd_nagle              = &nagle;
+        ksocknal_tunables.ksnd_round_robin        = &round_robin;
+        ksocknal_tunables.ksnd_keepalive          = &keepalive;
         ksocknal_tunables.ksnd_keepalive_idle     = &keepalive_idle;
         ksocknal_tunables.ksnd_keepalive_count    = &keepalive_count;
         ksocknal_tunables.ksnd_keepalive_intvl    = &keepalive_intvl;
         ksocknal_tunables.ksnd_keepalive_idle     = &keepalive_idle;
         ksocknal_tunables.ksnd_keepalive_count    = &keepalive_count;
         ksocknal_tunables.ksnd_keepalive_intvl    = &keepalive_intvl;
@@ -164,11 +174,10 @@ int ksocknal_tunables_init(void)
         ksocknal_tunables.ksnd_peercredits        = &peer_credits;
         ksocknal_tunables.ksnd_enable_csum        = &enable_csum;
         ksocknal_tunables.ksnd_inject_csum_error  = &inject_csum_error;
         ksocknal_tunables.ksnd_peercredits        = &peer_credits;
         ksocknal_tunables.ksnd_enable_csum        = &enable_csum;
         ksocknal_tunables.ksnd_inject_csum_error  = &inject_csum_error;
-        ksocknal_tunables.ksnd_zc_min_frag        = &zc_min_frag;
+        ksocknal_tunables.ksnd_zc_min_payload     = &zc_min_payload;
         ksocknal_tunables.ksnd_zc_recv            = &zc_recv;
         ksocknal_tunables.ksnd_zc_recv_min_nfrags = &zc_recv_min_nfrags;
 
         ksocknal_tunables.ksnd_zc_recv            = &zc_recv;
         ksocknal_tunables.ksnd_zc_recv_min_nfrags = &zc_recv_min_nfrags;
 
-
 #ifdef CPU_AFFINITY
         ksocknal_tunables.ksnd_irq_affinity       = &enable_irq_affinity;
 #endif
 #ifdef CPU_AFFINITY
         ksocknal_tunables.ksnd_irq_affinity       = &enable_irq_affinity;
 #endif
@@ -186,6 +195,9 @@ int ksocknal_tunables_init(void)
         ksocknal_tunables.ksnd_sysctl             =  NULL;
 #endif
 
         ksocknal_tunables.ksnd_sysctl             =  NULL;
 #endif
 
+        if (*ksocknal_tunables.ksnd_zc_min_payload < (2 << 10))
+                *ksocknal_tunables.ksnd_zc_min_payload = (2 << 10);
+
         /* initialize platform-sepcific tunables */
         return ksocknal_lib_tunables_init();
 };
         /* initialize platform-sepcific tunables */
         return ksocknal_lib_tunables_init();
 };
diff --git a/lnet/klnds/socklnd/socklnd_proto.c b/lnet/klnds/socklnd/socklnd_proto.c
new file mode 100644 (file)
index 0000000..20bdc4d
--- /dev/null
@@ -0,0 +1,794 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ *
+ *   Author: Zach Brown <zab@zabbo.net>
+ *   Author: Peter J. Braam <braam@clusterfs.com>
+ *   Author: Phil Schwan <phil@clusterfs.com>
+ *   Author: Eric Barton <eric@bartonsoftware.com>
+ *
+ *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
+ *
+ *   Portals is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Portals is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Portals; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#include "socklnd.h"
+
+/*
+ * Protocol entries :
+ *   pro_send_hello       : send hello message
+ *   pro_recv_hello       : receive hello message
+ *   pro_pack             : pack message header
+ *   pro_unpack           : unpack message header
+ *   pro_queue_tx_zcack() : Called holding BH lock: kss_lock
+ *                          return 1 if ACK is piggybacked, otherwise return 0
+ *   pro_queue_tx_msg()   : Called holding BH lock: kss_lock
+ *                          return the ACK that piggybacked by my message, or NULL
+ *   pro_handle_zcreq()   : handler of incoming ZC-REQ
+ *   pro_handle_zcack()   : handler of incoming ZC-ACK
+ *   pro_match_tx()       : Called holding glock
+ */
+
+static ksock_tx_t *
+ksocknal_queue_tx_msg_v1(ksock_conn_t *conn, ksock_tx_t *tx_msg)
+{
+        /* V1.x, just enqueue it */
+        list_add_tail(&tx_msg->tx_list, &conn->ksnc_tx_queue);
+        return NULL;
+}
+
+void
+ksocknal_next_tx_carrier(ksock_conn_t *conn)
+{
+        ksock_tx_t     *tx = conn->ksnc_tx_carrier;
+
+        /* Called holding BH lock: conn->ksnc_scheduler->kss_lock */
+        LASSERT (!list_empty(&conn->ksnc_tx_queue));
+        LASSERT (tx != NULL);
+
+        /* Next TX that can carry ZC-ACK or LNet message */
+        if (tx->tx_list.next == &conn->ksnc_tx_queue) {
+                /* no more packets queued */
+                conn->ksnc_tx_carrier = NULL;
+        } else {
+                conn->ksnc_tx_carrier = list_entry(tx->tx_list.next, ksock_tx_t, tx_list);
+                LASSERT (conn->ksnc_tx_carrier->tx_msg.ksm_type == tx->tx_msg.ksm_type);
+        }
+}
+
+static int
+ksocknal_queue_tx_zcack_v2(ksock_conn_t *conn,
+                           ksock_tx_t *tx_ack, __u64 cookie)
+{
+        ksock_tx_t *tx = conn->ksnc_tx_carrier;
+
+        LASSERT (tx_ack == NULL ||
+                 tx_ack->tx_msg.ksm_type == KSOCK_MSG_NOOP);
+
+        /*
+         * Enqueue or piggyback tx_ack / cookie
+         * . no tx can piggyback cookie of tx_ack (or cookie), just
+         *   enqueue the tx_ack (if tx_ack != NUL) and return NULL.
+         * . There is tx can piggyback cookie of tx_ack (or cookie),
+         *   piggyback the cookie and return the tx.
+         */
+        if (tx == NULL) {
+                if (tx_ack != NULL) {
+                        list_add_tail(&tx_ack->tx_list, &conn->ksnc_tx_queue);
+                        conn->ksnc_tx_carrier = tx_ack;
+                }
+                return 0;
+        }
+
+        if (tx->tx_msg.ksm_type == KSOCK_MSG_NOOP) {
+                /* tx is noop zc-ack, can't piggyback zc-ack cookie */
+                if (tx_ack != NULL)
+                        list_add_tail(&tx_ack->tx_list, &conn->ksnc_tx_queue);
+                return 0;
+        }
+
+        LASSERT(tx->tx_msg.ksm_type == KSOCK_MSG_LNET);
+        LASSERT(tx->tx_msg.ksm_zc_cookies[1] == 0);
+
+        if (tx_ack != NULL)
+                cookie = tx_ack->tx_msg.ksm_zc_cookies[1];
+
+        /* piggyback the zc-ack cookie */
+        tx->tx_msg.ksm_zc_cookies[1] = cookie;
+        /* move on to the next TX which can carry cookie */
+        ksocknal_next_tx_carrier(conn);
+
+        return 1;
+}
+
+static ksock_tx_t *
+ksocknal_queue_tx_msg_v2(ksock_conn_t *conn, ksock_tx_t *tx_msg)
+{
+        ksock_tx_t  *tx  = conn->ksnc_tx_carrier;
+
+        /*
+         * Enqueue tx_msg:
+         * . If there is no NOOP on the connection, just enqueue
+         *   tx_msg and return NULL
+         * . If there is NOOP on the connection, piggyback the cookie
+         *   and replace the NOOP tx, and return the NOOP tx.
+         */
+        if (tx == NULL) { /* nothing on queue */
+                list_add_tail(&tx_msg->tx_list, &conn->ksnc_tx_queue);
+                conn->ksnc_tx_carrier = tx_msg;
+                return NULL;
+        }
+
+        if (tx->tx_msg.ksm_type == KSOCK_MSG_LNET) { /* nothing to carry */
+                list_add_tail(&tx_msg->tx_list, &conn->ksnc_tx_queue);
+                return NULL;
+        }
+
+        LASSERT (tx->tx_msg.ksm_type == KSOCK_MSG_NOOP);
+
+        /* There is a noop zc-ack can be piggybacked */
+        tx_msg->tx_msg.ksm_zc_cookies[1] = tx->tx_msg.ksm_zc_cookies[1];
+        ksocknal_next_tx_carrier(conn);
+
+        /* use new_tx to replace the noop zc-ack packet */
+        list_add(&tx_msg->tx_list, &tx->tx_list);
+        list_del(&tx->tx_list);
+
+        return tx;
+}
+
+static int
+ksocknal_queue_tx_zcack_v3(ksock_conn_t *conn,
+                           ksock_tx_t *tx_ack, __u64 cookie)
+{
+        ksock_tx_t *tx;
+
+        if (conn->ksnc_type != SOCKLND_CONN_ACK)
+                return ksocknal_queue_tx_zcack_v2(conn, tx_ack, cookie);
+
+        /* non-blocking ZC-ACK (to router) */
+        LASSERT (tx_ack == NULL ||
+                 tx_ack->tx_msg.ksm_type == KSOCK_MSG_NOOP);
+
+        if ((tx = conn->ksnc_tx_carrier) == NULL) {
+                if (tx_ack != NULL) {
+                        list_add_tail(&tx_ack->tx_list, &conn->ksnc_tx_queue);
+                        conn->ksnc_tx_carrier = tx_ack;
+                }
+                return 0;
+        }
+
+        /* conn->ksnc_tx_carrier != NULL */
+
+        if (tx_ack != NULL)
+                cookie = tx_ack->tx_msg.ksm_zc_cookies[1];
+
+        if (cookie == SOCKNAL_KEEPALIVE_PING) /* ignore keepalive PING */
+                return 1;
+
+        if (tx->tx_msg.ksm_zc_cookies[1] == SOCKNAL_KEEPALIVE_PING) {
+                /* replace the keepalive PING with a real ACK */
+                LASSERT (tx->tx_msg.ksm_zc_cookies[0] == 0);
+                tx->tx_msg.ksm_zc_cookies[1] = cookie;
+                return 1;
+        }
+
+        if (cookie == tx->tx_msg.ksm_zc_cookies[0] ||
+            cookie == tx->tx_msg.ksm_zc_cookies[1]) {
+                CWARN("%s: duplicated ZC cookie: "LPU64"\n",
+                      libcfs_id2str(conn->ksnc_peer->ksnp_id), cookie);
+                return 1; /* XXX return error in the future */
+        }
+
+        if (tx->tx_msg.ksm_zc_cookies[0] == 0) {
+                /* NOOP tx has only one ZC-ACK cookie, can carry at least one more */
+                if (tx->tx_msg.ksm_zc_cookies[1] > cookie) {
+                        tx->tx_msg.ksm_zc_cookies[0] = tx->tx_msg.ksm_zc_cookies[1];
+                        tx->tx_msg.ksm_zc_cookies[1] = cookie;
+                } else {
+                        tx->tx_msg.ksm_zc_cookies[0] = cookie;
+                }
+
+                if (tx->tx_msg.ksm_zc_cookies[0] - tx->tx_msg.ksm_zc_cookies[1] > 2) {
+                        /* not likely to carry more ACKs, skip it to simplify logic */
+                        ksocknal_next_tx_carrier(conn);
+                }
+
+                return 1;
+        }
+
+        /* takes two or more cookies already */
+
+        if (tx->tx_msg.ksm_zc_cookies[0] > tx->tx_msg.ksm_zc_cookies[1]) {
+                __u64   tmp = 0;
+
+                /* two seperated cookies: (a+2, a) or (a+1, a) */
+                LASSERT (tx->tx_msg.ksm_zc_cookies[0] -
+                         tx->tx_msg.ksm_zc_cookies[1] <= 2);
+
+                if (tx->tx_msg.ksm_zc_cookies[0] -
+                    tx->tx_msg.ksm_zc_cookies[1] == 2) {
+                        if (cookie == tx->tx_msg.ksm_zc_cookies[1] + 1)
+                                tmp = cookie;
+                } else if (cookie == tx->tx_msg.ksm_zc_cookies[1] - 1) {
+                        tmp = tx->tx_msg.ksm_zc_cookies[1];
+                } else if (cookie == tx->tx_msg.ksm_zc_cookies[0] + 1) {
+                        tmp = tx->tx_msg.ksm_zc_cookies[0];
+                }
+
+                if (tmp != 0) {
+                        /* range of cookies */
+                        tx->tx_msg.ksm_zc_cookies[0] = tmp - 1;
+                        tx->tx_msg.ksm_zc_cookies[1] = tmp + 1;
+                        return 1;
+                }
+
+        } else {
+                /* ksm_zc_cookies[0] < ksm_zc_cookies[1], it is range of cookies */
+                if (cookie >= tx->tx_msg.ksm_zc_cookies[0] &&
+                    cookie <= tx->tx_msg.ksm_zc_cookies[1]) {
+                        CWARN("%s: duplicated ZC cookie: "LPU64"\n",
+                              libcfs_id2str(conn->ksnc_peer->ksnp_id), cookie);
+                        return 1; /* XXX: return error in the future */
+                }
+
+                if (cookie == tx->tx_msg.ksm_zc_cookies[1] + 1) {
+                        tx->tx_msg.ksm_zc_cookies[1] = cookie;
+                        return 1;
+                }
+
+                if (cookie == tx->tx_msg.ksm_zc_cookies[0] - 1) {
+                        tx->tx_msg.ksm_zc_cookies[0] = cookie;
+                        return 1;
+                }
+        }
+
+        /* failed to piggyback ZC-ACK */
+        if (tx_ack != NULL) {
+                list_add_tail(&tx_ack->tx_list, &conn->ksnc_tx_queue);
+                /* the next tx can piggyback at least 1 ACK */
+                ksocknal_next_tx_carrier(conn);
+        }
+
+        return 0;
+}
+
+static int
+ksocknal_match_tx(ksock_conn_t *conn, ksock_tx_t *tx, int nonblk)
+{
+        int nob;
+
+#if SOCKNAL_VERSION_DEBUG
+        if (!*ksocknal_tunables.ksnd_typed_conns)
+                return SOCKNAL_MATCH_YES;
+#endif
+
+        if (tx == NULL || tx->tx_lnetmsg == NULL) {
+                /* noop packet */
+                nob = offsetof(ksock_msg_t, ksm_u);
+        } else {
+                nob = tx->tx_lnetmsg->msg_len +
+                      ((conn->ksnc_proto == &ksocknal_protocol_v1x) ?
+                       sizeof(lnet_hdr_t) : sizeof(ksock_msg_t));
+        }
+
+        /* default checking for typed connection */
+        switch (conn->ksnc_type) {
+        default:
+                CERROR("ksnc_type bad: %u\n", conn->ksnc_type);
+                LBUG();
+        case SOCKLND_CONN_ANY:
+                return SOCKNAL_MATCH_YES;
+
+        case SOCKLND_CONN_BULK_IN:
+                return SOCKNAL_MATCH_MAY;
+
+        case SOCKLND_CONN_BULK_OUT:
+                if (nob < *ksocknal_tunables.ksnd_min_bulk)
+                        return SOCKNAL_MATCH_MAY;
+                else
+                        return SOCKNAL_MATCH_YES;
+
+        case SOCKLND_CONN_CONTROL:
+                if (nob >= *ksocknal_tunables.ksnd_min_bulk)
+                        return SOCKNAL_MATCH_MAY;
+                else
+                        return SOCKNAL_MATCH_YES;
+        }
+}
+
+static int
+ksocknal_match_tx_v3(ksock_conn_t *conn, ksock_tx_t *tx, int nonblk)
+{
+        int nob;
+
+        if (tx == NULL || tx->tx_lnetmsg == NULL)
+                nob = offsetof(ksock_msg_t, ksm_u);
+        else
+                nob = tx->tx_lnetmsg->msg_len + sizeof(ksock_msg_t);
+
+        switch (conn->ksnc_type) {
+        default:
+                CERROR("ksnc_type bad: %u\n", conn->ksnc_type);
+                LBUG();
+        case SOCKLND_CONN_ANY:
+                return SOCKNAL_MATCH_NO;
+
+        case SOCKLND_CONN_ACK:
+                if (nonblk)
+                        return SOCKNAL_MATCH_YES;
+                else if (tx == NULL || tx->tx_lnetmsg == NULL)
+                        return SOCKNAL_MATCH_MAY;
+                else
+                        return SOCKNAL_MATCH_NO;
+
+        case SOCKLND_CONN_BULK_OUT:
+                if (nonblk)
+                        return SOCKNAL_MATCH_NO;
+                else if (nob < *ksocknal_tunables.ksnd_min_bulk)
+                        return SOCKNAL_MATCH_MAY;
+                else
+                        return SOCKNAL_MATCH_YES;
+
+        case SOCKLND_CONN_CONTROL:
+                if (nonblk)
+                        return SOCKNAL_MATCH_NO;
+                else if (nob >= *ksocknal_tunables.ksnd_min_bulk)
+                        return SOCKNAL_MATCH_MAY;
+                else
+                        return SOCKNAL_MATCH_YES;
+        }
+}
+
+/* (Sink) handle incoming ZC request from sender */
+static int
+ksocknal_handle_zcreq(ksock_conn_t *c, __u64 cookie, int remote)
+{
+        ksock_peer_t   *peer = c->ksnc_peer;
+        ksock_conn_t   *conn;
+        ksock_tx_t     *tx;
+        int             rc;
+
+        cfs_read_lock (&ksocknal_data.ksnd_global_lock);
+
+        conn = ksocknal_find_conn_locked(peer, NULL, !!remote);
+        if (conn != NULL) {
+                ksock_sched_t *sched = conn->ksnc_scheduler;
+
+                LASSERT (conn->ksnc_proto->pro_queue_tx_zcack != NULL);
+
+                cfs_spin_lock_bh (&sched->kss_lock);
+
+                rc = conn->ksnc_proto->pro_queue_tx_zcack(conn, NULL, cookie);
+
+                cfs_spin_unlock_bh (&sched->kss_lock);
+
+                if (rc) { /* piggybacked */
+                        read_unlock (&ksocknal_data.ksnd_global_lock);
+                        return 0;
+                }
+        }
+
+        cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
+
+        /* ACK connection is not ready, or can't piggyback the ACK */
+        tx = ksocknal_alloc_tx_noop(cookie, !!remote);
+        if (tx == NULL)
+                return -ENOMEM;
+
+        if ((rc = ksocknal_launch_packet(peer->ksnp_ni, tx, peer->ksnp_id)) == 0)
+                return 0;
+
+        ksocknal_free_tx(tx);
+        return rc;
+}
+
+/* (Sender) handle ZC_ACK from sink */
+static int
+ksocknal_handle_zcack(ksock_conn_t *conn, __u64 cookie1, __u64 cookie2)
+{
+        ksock_peer_t      *peer = conn->ksnc_peer;
+        ksock_tx_t        *tx;
+        ksock_tx_t        *tmp;
+        CFS_LIST_HEAD     (zlist);
+        int                count;
+
+        if (cookie1 == 0)
+                cookie1 = cookie2;
+
+        count = (cookie1 > cookie2) ? 2 : (cookie2 - cookie1 + 1);
+
+        if (cookie2 == SOCKNAL_KEEPALIVE_PING &&
+            conn->ksnc_proto == &ksocknal_protocol_v3x) {
+                /* keepalive PING for V3.x, just ignore it */
+                return count == 1 ? 0 : -EPROTO;
+        }
+
+        cfs_spin_lock(&peer->ksnp_lock);
+
+        list_for_each_entry_safe(tx, tmp,
+                                 &peer->ksnp_zc_req_list, tx_zc_list) {
+                __u64 c = tx->tx_msg.ksm_zc_cookies[0];
+
+                if (c == cookie1 || c == cookie2 || (cookie1 < c && c < cookie2)) {
+                        tx->tx_msg.ksm_zc_cookies[0] = 0;
+                        list_del(&tx->tx_zc_list);
+                        list_add(&tx->tx_zc_list, &zlist);
+
+                        if (--count == 0)
+                                break;
+                }
+        }
+
+        cfs_spin_unlock(&peer->ksnp_lock);
+
+        while (!list_empty(&zlist)) {
+                tx = list_entry(zlist.next, ksock_tx_t, tx_zc_list);
+                list_del(&tx->tx_zc_list);
+                ksocknal_tx_decref(tx);
+        }
+
+        return count == 0 ? 0 : -EPROTO;
+}
+
+static int
+ksocknal_send_hello_v1 (ksock_conn_t *conn, ksock_hello_msg_t *hello)
+{
+        cfs_socket_t        *sock = conn->ksnc_sock;
+        lnet_hdr_t          *hdr;
+        lnet_magicversion_t *hmv;
+        int                  rc;
+        int                  i;
+
+        CLASSERT(sizeof(lnet_magicversion_t) == offsetof(lnet_hdr_t, src_nid));
+
+        LIBCFS_ALLOC(hdr, sizeof(*hdr));
+        if (hdr == NULL) {
+                CERROR("Can't allocate lnet_hdr_t\n");
+                return -ENOMEM;
+        }
+
+        hmv = (lnet_magicversion_t *)&hdr->dest_nid;
+
+        /* Re-organize V2.x message header to V1.x (lnet_hdr_t)
+         * header and send out */
+        hmv->magic         = cpu_to_le32 (LNET_PROTO_TCP_MAGIC);
+        hmv->version_major = cpu_to_le16 (KSOCK_PROTO_V1_MAJOR);
+        hmv->version_minor = cpu_to_le16 (KSOCK_PROTO_V1_MINOR);
+
+        if (the_lnet.ln_testprotocompat != 0) {
+                /* single-shot proto check */
+                LNET_LOCK();
+                if ((the_lnet.ln_testprotocompat & 1) != 0) {
+                        hmv->version_major++;   /* just different! */
+                        the_lnet.ln_testprotocompat &= ~1;
+                }
+                if ((the_lnet.ln_testprotocompat & 2) != 0) {
+                        hmv->magic = LNET_PROTO_MAGIC;
+                        the_lnet.ln_testprotocompat &= ~2;
+                }
+                LNET_UNLOCK();
+        }
+
+        hdr->src_nid        = cpu_to_le64 (hello->kshm_src_nid);
+        hdr->src_pid        = cpu_to_le32 (hello->kshm_src_pid);
+        hdr->type           = cpu_to_le32 (LNET_MSG_HELLO);
+        hdr->payload_length = cpu_to_le32 (hello->kshm_nips * sizeof(__u32));
+        hdr->msg.hello.type = cpu_to_le32 (hello->kshm_ctype);
+        hdr->msg.hello.incarnation = cpu_to_le64 (hello->kshm_src_incarnation);
+
+        rc = libcfs_sock_write(sock, hdr, sizeof(*hdr), lnet_acceptor_timeout());
+
+        if (rc != 0) {
+                CDEBUG (D_NETERROR, "Error %d sending HELLO hdr to %u.%u.%u.%u/%d\n",
+                        rc, HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
+                goto out;
+        }
+
+        if (hello->kshm_nips == 0)
+                goto out;
+
+        for (i = 0; i < (int) hello->kshm_nips; i++) {
+                hello->kshm_ips[i] = __cpu_to_le32 (hello->kshm_ips[i]);
+        }
+
+        rc = libcfs_sock_write(sock, hello->kshm_ips,
+                               hello->kshm_nips * sizeof(__u32),
+                               lnet_acceptor_timeout());
+        if (rc != 0) {
+                CDEBUG (D_NETERROR, "Error %d sending HELLO payload (%d)"
+                        " to %u.%u.%u.%u/%d\n", rc, hello->kshm_nips,
+                        HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
+        }
+out:
+        LIBCFS_FREE(hdr, sizeof(*hdr));
+
+        return rc;
+}
+
+static int
+ksocknal_send_hello_v2 (ksock_conn_t *conn, ksock_hello_msg_t *hello)
+{
+        cfs_socket_t   *sock = conn->ksnc_sock;
+        int             rc;
+
+        hello->kshm_magic   = LNET_PROTO_MAGIC;
+        hello->kshm_version = conn->ksnc_proto->pro_version;
+
+        if (the_lnet.ln_testprotocompat != 0) {
+                /* single-shot proto check */
+                LNET_LOCK();
+                if ((the_lnet.ln_testprotocompat & 1) != 0) {
+                        hello->kshm_version++;   /* just different! */
+                        the_lnet.ln_testprotocompat &= ~1;
+                }
+                LNET_UNLOCK();
+        }
+
+        rc = libcfs_sock_write(sock, hello, offsetof(ksock_hello_msg_t, kshm_ips),
+                               lnet_acceptor_timeout());
+
+        if (rc != 0) {
+                CDEBUG (D_NETERROR, "Error %d sending HELLO hdr to %u.%u.%u.%u/%d\n",
+                        rc, HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
+                return rc;
+        }
+
+        if (hello->kshm_nips == 0)
+                return 0;
+
+        rc = libcfs_sock_write(sock, hello->kshm_ips,
+                               hello->kshm_nips * sizeof(__u32),
+                               lnet_acceptor_timeout());
+        if (rc != 0) {
+                CDEBUG (D_NETERROR, "Error %d sending HELLO payload (%d)"
+                        " to %u.%u.%u.%u/%d\n", rc, hello->kshm_nips,
+                        HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
+        }
+
+        return rc;
+}
+
+static int
+ksocknal_recv_hello_v1(ksock_conn_t *conn, ksock_hello_msg_t *hello,int timeout)
+{
+        cfs_socket_t        *sock = conn->ksnc_sock;
+        lnet_hdr_t          *hdr;
+        int                  rc;
+        int                  i;
+
+        LIBCFS_ALLOC(hdr, sizeof(*hdr));
+        if (hdr == NULL) {
+                CERROR("Can't allocate lnet_hdr_t\n");
+                return -ENOMEM;
+        }
+
+        rc = libcfs_sock_read(sock, &hdr->src_nid,
+                              sizeof (*hdr) - offsetof (lnet_hdr_t, src_nid),
+                              timeout);
+        if (rc != 0) {
+                CERROR ("Error %d reading rest of HELLO hdr from %u.%u.%u.%u\n",
+                        rc, HIPQUAD(conn->ksnc_ipaddr));
+                LASSERT (rc < 0 && rc != -EALREADY);
+                goto out;
+        }
+
+        /* ...and check we got what we expected */
+        if (hdr->type != cpu_to_le32 (LNET_MSG_HELLO)) {
+                CERROR ("Expecting a HELLO hdr,"
+                        " but got type %d from %u.%u.%u.%u\n",
+                        le32_to_cpu (hdr->type),
+                        HIPQUAD(conn->ksnc_ipaddr));
+                rc = -EPROTO;
+                goto out;
+        }
+
+        hello->kshm_src_nid         = le64_to_cpu (hdr->src_nid);
+        hello->kshm_src_pid         = le32_to_cpu (hdr->src_pid);
+        hello->kshm_src_incarnation = le64_to_cpu (hdr->msg.hello.incarnation);
+        hello->kshm_ctype           = le32_to_cpu (hdr->msg.hello.type);
+        hello->kshm_nips            = le32_to_cpu (hdr->payload_length) /
+                                         sizeof (__u32);
+
+        if (hello->kshm_nips > LNET_MAX_INTERFACES) {
+                CERROR("Bad nips %d from ip %u.%u.%u.%u\n",
+                       hello->kshm_nips, HIPQUAD(conn->ksnc_ipaddr));
+                rc = -EPROTO;
+                goto out;
+        }
+
+        if (hello->kshm_nips == 0)
+                goto out;
+
+        rc = libcfs_sock_read(sock, hello->kshm_ips,
+                              hello->kshm_nips * sizeof(__u32), timeout);
+        if (rc != 0) {
+                CERROR ("Error %d reading IPs from ip %u.%u.%u.%u\n",
+                        rc, HIPQUAD(conn->ksnc_ipaddr));
+                LASSERT (rc < 0 && rc != -EALREADY);
+                goto out;
+        }
+
+        for (i = 0; i < (int) hello->kshm_nips; i++) {
+                hello->kshm_ips[i] = __le32_to_cpu(hello->kshm_ips[i]);
+
+                if (hello->kshm_ips[i] == 0) {
+                        CERROR("Zero IP[%d] from ip %u.%u.%u.%u\n",
+                               i, HIPQUAD(conn->ksnc_ipaddr));
+                        rc = -EPROTO;
+                        break;
+                }
+        }
+out:
+        LIBCFS_FREE(hdr, sizeof(*hdr));
+
+        return rc;
+}
+
+static int
+ksocknal_recv_hello_v2 (ksock_conn_t *conn, ksock_hello_msg_t *hello, int timeout)
+{
+        cfs_socket_t      *sock = conn->ksnc_sock;
+        int                rc;
+        int                i;
+
+        if (hello->kshm_magic == LNET_PROTO_MAGIC)
+                conn->ksnc_flip = 0;
+        else
+                conn->ksnc_flip = 1;
+
+        rc = libcfs_sock_read(sock, &hello->kshm_src_nid,
+                              offsetof(ksock_hello_msg_t, kshm_ips) -
+                                       offsetof(ksock_hello_msg_t, kshm_src_nid),
+                              timeout);
+        if (rc != 0) {
+                CERROR ("Error %d reading HELLO from %u.%u.%u.%u\n",
+                        rc, HIPQUAD(conn->ksnc_ipaddr));
+                LASSERT (rc < 0 && rc != -EALREADY);
+                return rc;
+        }
+
+        if (conn->ksnc_flip) {
+                __swab32s(&hello->kshm_src_pid);
+                __swab64s(&hello->kshm_src_nid);
+                __swab32s(&hello->kshm_dst_pid);
+                __swab64s(&hello->kshm_dst_nid);
+                __swab64s(&hello->kshm_src_incarnation);
+                __swab64s(&hello->kshm_dst_incarnation);
+                __swab32s(&hello->kshm_ctype);
+                __swab32s(&hello->kshm_nips);
+        }
+
+        if (hello->kshm_nips > LNET_MAX_INTERFACES) {
+                CERROR("Bad nips %d from ip %u.%u.%u.%u\n",
+                       hello->kshm_nips, HIPQUAD(conn->ksnc_ipaddr));
+                return -EPROTO;
+        }
+
+        if (hello->kshm_nips == 0)
+                return 0;
+
+        rc = libcfs_sock_read(sock, hello->kshm_ips,
+                              hello->kshm_nips * sizeof(__u32), timeout);
+        if (rc != 0) {
+                CERROR ("Error %d reading IPs from ip %u.%u.%u.%u\n",
+                        rc, HIPQUAD(conn->ksnc_ipaddr));
+                LASSERT (rc < 0 && rc != -EALREADY);
+                return rc;
+        }
+
+        for (i = 0; i < (int) hello->kshm_nips; i++) {
+                if (conn->ksnc_flip)
+                        __swab32s(&hello->kshm_ips[i]);
+
+                if (hello->kshm_ips[i] == 0) {
+                        CERROR("Zero IP[%d] from ip %u.%u.%u.%u\n",
+                               i, HIPQUAD(conn->ksnc_ipaddr));
+                        return -EPROTO;
+                }
+        }
+
+        return 0;
+}
+
+static void
+ksocknal_pack_msg_v1(ksock_tx_t *tx)
+{
+        /* V1.x has no KSOCK_MSG_NOOP */
+        LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);
+        LASSERT(tx->tx_lnetmsg != NULL);
+
+        tx->tx_iov[0].iov_base = (void *)&tx->tx_lnetmsg->msg_hdr;
+        tx->tx_iov[0].iov_len  = sizeof(lnet_hdr_t);
+
+        tx->tx_resid = tx->tx_nob = tx->tx_lnetmsg->msg_len + sizeof(lnet_hdr_t);
+}
+
+static void
+ksocknal_pack_msg_v2(ksock_tx_t *tx)
+{
+        tx->tx_iov[0].iov_base = (void *)&tx->tx_msg;
+
+        if (tx->tx_lnetmsg != NULL) {
+                LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);
+
+                tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = tx->tx_lnetmsg->msg_hdr;
+                tx->tx_iov[0].iov_len = sizeof(ksock_msg_t);
+                tx->tx_resid = tx->tx_nob = sizeof(ksock_msg_t) + tx->tx_lnetmsg->msg_len;
+        } else {
+                LASSERT(tx->tx_msg.ksm_type == KSOCK_MSG_NOOP);
+
+                tx->tx_iov[0].iov_len = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr);
+                tx->tx_resid = tx->tx_nob = offsetof(ksock_msg_t,  ksm_u.lnetmsg.ksnm_hdr);
+        }
+        /* Don't checksum before start sending, because packet can be piggybacked with ACK */
+}
+
+static void
+ksocknal_unpack_msg_v1(ksock_msg_t *msg)
+{
+        msg->ksm_csum           = 0;
+        msg->ksm_type           = KSOCK_MSG_LNET;
+        msg->ksm_zc_cookies[0]  = msg->ksm_zc_cookies[1]  = 0;
+}
+
+static void
+ksocknal_unpack_msg_v2(ksock_msg_t *msg)
+{
+        return;  /* Do nothing */
+}
+
+ksock_proto_t  ksocknal_protocol_v1x =
+{
+        .pro_version            = KSOCK_PROTO_V1,
+        .pro_send_hello         = ksocknal_send_hello_v1,
+        .pro_recv_hello         = ksocknal_recv_hello_v1,
+        .pro_pack               = ksocknal_pack_msg_v1,
+        .pro_unpack             = ksocknal_unpack_msg_v1,
+        .pro_queue_tx_msg       = ksocknal_queue_tx_msg_v1,
+        .pro_handle_zcreq       = NULL,
+        .pro_handle_zcack       = NULL,
+        .pro_queue_tx_zcack     = NULL,
+        .pro_match_tx           = ksocknal_match_tx
+};
+
+ksock_proto_t  ksocknal_protocol_v2x =
+{
+        .pro_version            = KSOCK_PROTO_V2,
+        .pro_send_hello         = ksocknal_send_hello_v2,
+        .pro_recv_hello         = ksocknal_recv_hello_v2,
+        .pro_pack               = ksocknal_pack_msg_v2,
+        .pro_unpack             = ksocknal_unpack_msg_v2,
+        .pro_queue_tx_msg       = ksocknal_queue_tx_msg_v2,
+        .pro_queue_tx_zcack     = ksocknal_queue_tx_zcack_v2,
+        .pro_handle_zcreq       = ksocknal_handle_zcreq,
+        .pro_handle_zcack       = ksocknal_handle_zcack,
+        .pro_match_tx           = ksocknal_match_tx
+};
+
+ksock_proto_t  ksocknal_protocol_v3x =
+{
+        .pro_version            = KSOCK_PROTO_V3,
+        .pro_send_hello         = ksocknal_send_hello_v2,
+        .pro_recv_hello         = ksocknal_recv_hello_v2,
+        .pro_pack               = ksocknal_pack_msg_v2,
+        .pro_unpack             = ksocknal_unpack_msg_v2,
+        .pro_queue_tx_msg       = ksocknal_queue_tx_msg_v2,
+        .pro_queue_tx_zcack     = ksocknal_queue_tx_zcack_v3,
+        .pro_handle_zcreq       = ksocknal_handle_zcreq,
+        .pro_handle_zcack       = ksocknal_handle_zcack,
+        .pro_match_tx           = ksocknal_match_tx_v3
+};
+
index 2415165..a386bb1 100644 (file)
@@ -392,15 +392,6 @@ usocklnd_set_sock_options(int fd)
         return libcfs_fcntl_nonblock(fd);
 }
 
         return libcfs_fcntl_nonblock(fd);
 }
 
-void
-usocklnd_init_msg(ksock_msg_t *msg, int type)
-{
-        msg->ksm_type           = type;
-        msg->ksm_csum           = 0;
-        msg->ksm_zc_req_cookie  = 0;
-        msg->ksm_zc_ack_cookie  = 0;
-}
-
 usock_tx_t *
 usocklnd_create_noop_tx(__u64 cookie)
 {
 usock_tx_t *
 usocklnd_create_noop_tx(__u64 cookie)
 {
@@ -413,8 +404,8 @@ usocklnd_create_noop_tx(__u64 cookie)
         tx->tx_size = sizeof(usock_tx_t);
         tx->tx_lnetmsg = NULL;
 
         tx->tx_size = sizeof(usock_tx_t);
         tx->tx_lnetmsg = NULL;
 
-        usocklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
-        tx->tx_msg.ksm_zc_ack_cookie = cookie;
+        socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
+        tx->tx_msg.ksm_zc_cookies[1] = cookie;
         
         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
         
         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
         tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =
@@ -445,7 +436,7 @@ usocklnd_create_tx(lnet_msg_t *lntmsg)
 
         tx->tx_resid = tx->tx_nob = sizeof(ksock_msg_t) + payload_nob;
         
 
         tx->tx_resid = tx->tx_nob = sizeof(ksock_msg_t) + payload_nob;
         
-        usocklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);
+        socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);
         tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = lntmsg->msg_hdr;
         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
         tx->tx_iova[0].iov_len = sizeof(ksock_msg_t);
         tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = lntmsg->msg_hdr;
         tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;
         tx->tx_iova[0].iov_len = sizeof(ksock_msg_t);
index e9fc984..32ef7d1 100644 (file)
@@ -153,13 +153,13 @@ usocklnd_read_msg(usock_conn_t *conn, int *cont_flag)
                 if (conn->uc_flip) {
                         __swab32s(&conn->uc_rx_msg.ksm_type);
                         __swab32s(&conn->uc_rx_msg.ksm_csum);
                 if (conn->uc_flip) {
                         __swab32s(&conn->uc_rx_msg.ksm_type);
                         __swab32s(&conn->uc_rx_msg.ksm_csum);
-                        __swab64s(&conn->uc_rx_msg.ksm_zc_req_cookie);
-                        __swab64s(&conn->uc_rx_msg.ksm_zc_ack_cookie);
+                        __swab64s(&conn->uc_rx_msg.ksm_zc_cookies[0]);
+                        __swab64s(&conn->uc_rx_msg.ksm_zc_cookies[1]);
                 } 
 
                /* we never send packets for which zc-acking is required */
                 if (conn->uc_rx_msg.ksm_type != KSOCK_MSG_LNET ||
                 } 
 
                /* we never send packets for which zc-acking is required */
                 if (conn->uc_rx_msg.ksm_type != KSOCK_MSG_LNET ||
-                    conn->uc_rx_msg.ksm_zc_ack_cookie != 0) {
+                    conn->uc_rx_msg.ksm_zc_cookies[1] != 0) {
                         conn->uc_errored = 1;
                         return -EPROTO;
                 }
                         conn->uc_errored = 1;
                         return -EPROTO;
                 }
@@ -230,7 +230,7 @@ usocklnd_read_msg(usock_conn_t *conn, int *cont_flag)
 
                 lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, 0);
 
 
                 lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, 0);
 
-                cookie = conn->uc_rx_msg.ksm_zc_req_cookie;
+                cookie = conn->uc_rx_msg.ksm_zc_cookies[0];
                 if (cookie != 0)
                         rc = usocklnd_handle_zc_req(conn->uc_peer, cookie);
                 
                 if (cookie != 0)
                         rc = usocklnd_handle_zc_req(conn->uc_peer, cookie);
                 
@@ -742,7 +742,7 @@ usocklnd_try_piggyback(struct list_head *tx_list_p,
                 list_del(&tx->tx_list);
 
                 /* already piggybacked or partially send */
                 list_del(&tx->tx_list);
 
                 /* already piggybacked or partially send */
-                if (tx->tx_msg.ksm_zc_ack_cookie ||
+                if (tx->tx_msg.ksm_zc_cookies[1] != 0 ||
                     tx->tx_resid != tx->tx_nob)
                         return tx;
         }
                     tx->tx_resid != tx->tx_nob)
                         return tx;
         }
@@ -758,7 +758,7 @@ usocklnd_try_piggyback(struct list_head *tx_list_p,
                 
         if (tx != NULL)
                 /* piggyback the zc-ack cookie */
                 
         if (tx != NULL)
                 /* piggyback the zc-ack cookie */
-                tx->tx_msg.ksm_zc_ack_cookie = zc_ack->zc_cookie;
+                tx->tx_msg.ksm_zc_cookies[1] = zc_ack->zc_cookie;
         else
                 /* cannot piggyback, need noop */
                 tx = usocklnd_create_noop_tx(zc_ack->zc_cookie);                     
         else
                 /* cannot piggyback, need noop */
                 tx = usocklnd_create_noop_tx(zc_ack->zc_cookie);                     
index a7fc493..82680ba 100644 (file)
@@ -324,7 +324,6 @@ int usocklnd_create_active_conn(usock_peer_t *peer, int type,
 int usocklnd_connect_srv_mode(int *fdp, __u32 dst_ip, __u16 dst_port);
 int usocklnd_connect_cli_mode(int *fdp, __u32 dst_ip, __u16 dst_port);
 int usocklnd_set_sock_options(int fd);
 int usocklnd_connect_srv_mode(int *fdp, __u32 dst_ip, __u16 dst_port);
 int usocklnd_connect_cli_mode(int *fdp, __u32 dst_ip, __u16 dst_port);
 int usocklnd_set_sock_options(int fd);
-void usocklnd_init_msg(ksock_msg_t *msg, int type);
 usock_tx_t *usocklnd_create_noop_tx(__u64 cookie);
 usock_tx_t *usocklnd_create_tx(lnet_msg_t *lntmsg);
 void usocklnd_init_hello_msg(ksock_hello_msg_t *hello,
 usock_tx_t *usocklnd_create_noop_tx(__u64 cookie);
 usock_tx_t *usocklnd_create_tx(lnet_msg_t *lntmsg);
 void usocklnd_init_hello_msg(ksock_hello_msg_t *hello,