Whamcloud - gitweb
* lctl set_route <nid> <up/down> enables or disables particular portals
authoreeb <eeb>
Thu, 25 Sep 2003 12:15:16 +0000 (12:15 +0000)
committereeb <eeb>
Thu, 25 Sep 2003 12:15:16 +0000 (12:15 +0000)
  routers (i.e. gateways) without forgetting network topology
  (i.e. adding/deleting routes).

* Socknal and qswnal automatically notify their local router and make an
  upcall when they detect peer death.

* portals router load balances over equivalent routes

* ENETUNREACH returned when a NAL thinks the router isn't loaded.

* Improved socknal network failure detection.

* /proc/sys/socknal/* interface

  - timeout is the socknal I/O timeout (50 by default) in seconds .

  - eager_ack is a boolean (set by default) that enables setting
    TCP_QUICKACK after every incoming message to ensure peer zero-copy
    sends complete quickly.

  - zero_copy is the size (2k by default) in bytes, below which message
    fragments are copied into a socket, rather than using zero-copy sends.
    Setting this above PAGE_SIZE will disable zero copy.

* Socknal autoconnect option to create all peer connections eagerly or not.
  If more than one autoconnect reaches the same peer NID, with the eager
  option (this was the previous default), all connections will be made when
  one is required, otherwise only one at a time will be attempted (the new
  default).  NB socknal still load balances over all established
  connections.

* Generalised portals upcall to "cmd <action> [params]".  Current upcalls
  are...

  - upcall LBUG file fn line

  - upcall ROUTER_NOTIFY <nal> <nid> <timestamp> up|down
    where <nal> is the kernel NAL number (defined in <linux/kp30.h>)
          <nid> is the peer's NID in 0xhex
          <timestamp> is seconds since 1/1/1970

* Added 'lctl --net' option, to make 1-line network lctl commands easy.

* Added Light-weight event tracing.

42 files changed:
lnet/include/linux/kp30.h
lnet/include/lnet/lnetctl.h
lnet/include/lnet/ptlctl.h
lnet/klnds/qswlnd/qswlnd.c
lnet/klnds/qswlnd/qswlnd.h
lnet/klnds/qswlnd/qswlnd_cb.c
lnet/klnds/socklnd/socklnd.c
lnet/klnds/socklnd/socklnd.h
lnet/klnds/socklnd/socklnd_cb.c
lnet/klnds/toelnd/toenal_cb.c
lnet/libcfs/Makefile.am
lnet/libcfs/debug.c
lnet/libcfs/lwt.c [new file with mode: 0644]
lnet/libcfs/module.c
lnet/router/router.c
lnet/router/router.h
lnet/utils/parser.c
lnet/utils/portals.c
lnet/utils/ptlctl.c
lustre/portals/include/linux/kp30.h
lustre/portals/include/portals/ptlctl.h
lustre/portals/knals/qswnal/qswnal.c
lustre/portals/knals/qswnal/qswnal.h
lustre/portals/knals/qswnal/qswnal_cb.c
lustre/portals/knals/socknal/socknal.c
lustre/portals/knals/socknal/socknal.h
lustre/portals/knals/socknal/socknal_cb.c
lustre/portals/knals/toenal/toenal_cb.c
lustre/portals/libcfs/Makefile.am
lustre/portals/libcfs/debug.c
lustre/portals/libcfs/lwt.c [new file with mode: 0644]
lustre/portals/libcfs/module.c
lustre/portals/router/router.c
lustre/portals/router/router.h
lustre/portals/utils/parser.c
lustre/portals/utils/portals.c
lustre/portals/utils/ptlctl.c
lustre/utils/lconf
lustre/utils/lctl.c
lustre/utils/obd.c
lustre/utils/obdctl.h
lustre/utils/parser.c

index 750d16c..0a1cc57 100644 (file)
@@ -371,12 +371,14 @@ typedef struct {
 } kpr_fwd_desc_t;
 
 typedef void  (*kpr_fwd_t)(void *arg, kpr_fwd_desc_t *fwd);
+typedef void  (*kpr_notify_t)(void *arg, ptl_nid_t peer, int alive);
 
 /* NAL's routing interface (Kernel Portals Routing Nal Interface) */
 typedef const struct {
         int             kprni_nalid;    /* NAL's id */
         void           *kprni_arg;      /* Arg to pass when calling into NAL */
         kpr_fwd_t       kprni_fwd;      /* NAL's forwarding entrypoint */
+        kpr_notify_t    kprni_notify;   /* NAL's notification entrypoint */
 } kpr_nal_interface_t;
 
 /* Router's routing interface (Kernel Portals Routing Router Interface) */
@@ -386,9 +388,10 @@ typedef const struct {
         int     (*kprri_register) (kpr_nal_interface_t *nal_interface,
                                    void **router_arg);
 
-        /* ask the router to find a gateway that forwards to 'nid' and is a peer
-         * of the calling NAL */
-        int     (*kprri_lookup) (void *router_arg, ptl_nid_t nid,
+        /* ask the router to find a gateway that forwards to 'nid' and is a
+         * peer of the calling NAL; assume caller will send 'nob' bytes of
+         * payload there */
+        int     (*kprri_lookup) (void *router_arg, ptl_nid_t nid, int nob,
                                  ptl_nid_t *gateway_nid);
 
         /* hand a packet over to the router for forwarding */
@@ -398,6 +401,10 @@ typedef const struct {
         void    (*kprri_fwd_done) (void *router_arg, kpr_fwd_desc_t *fwd,
                                    int error);
 
+        /* notify the router about peer state */
+        void    (*kprri_notify) (void *router_arg, ptl_nid_t peer,
+                                 int alive, time_t when);
+
         /* the calling NAL is shutting down */
         void    (*kprri_shutdown) (void *router_arg);
 
@@ -416,10 +423,14 @@ typedef struct {
 typedef const struct {
         int     (*kprci_add_route)(int gateway_nal, ptl_nid_t gateway_nid,
                                    ptl_nid_t lo_nid, ptl_nid_t hi_nid);
-        int     (*kprci_del_route)(ptl_nid_t nid);
+        int     (*kprci_del_route)(int gateway_nal, ptl_nid_t gateway_nid,
+                                   ptl_nid_t lo_nid, ptl_nid_t hi_nid);
         int     (*kprci_get_route)(int index, int *gateway_nal,
-                                   ptl_nid_t *gateway, ptl_nid_t *lo_nid,
-                                   ptl_nid_t *hi_nid);
+                                   ptl_nid_t *gateway,
+                                   ptl_nid_t *lo_nid, ptl_nid_t *hi_nid,
+                                   int *alive);
+        int     (*kprci_notify)(int gateway_nal, ptl_nid_t gateway_nid, 
+                                int alive, time_t when);
 } kpr_control_interface_t;
 
 extern kpr_control_interface_t  kpr_control_interface;
@@ -449,12 +460,12 @@ kpr_routing (kpr_router_t *router)
 }
 
 static inline int
-kpr_lookup (kpr_router_t *router, ptl_nid_t nid, ptl_nid_t *gateway_nid)
+kpr_lookup (kpr_router_t *router, ptl_nid_t nid, int nob, ptl_nid_t *gateway_nid)
 {
         if (!kpr_routing (router))
-                return (-EHOSTUNREACH);
+                return (-ENETUNREACH);
 
-        return (router->kpr_interface->kprri_lookup(router->kpr_arg, nid,
+        return (router->kpr_interface->kprri_lookup(router->kpr_arg, nid, nob,
                                                     gateway_nid));
 }
 
@@ -476,7 +487,7 @@ static inline void
 kpr_fwd_start (kpr_router_t *router, kpr_fwd_desc_t *fwd)
 {
         if (!kpr_routing (router))
-                fwd->kprfd_callback (fwd->kprfd_callback_arg, -EHOSTUNREACH);
+                fwd->kprfd_callback (fwd->kprfd_callback_arg, -ENETUNREACH);
         else
                 router->kpr_interface->kprri_fwd_start (router->kpr_arg, fwd);
 }
@@ -489,6 +500,16 @@ kpr_fwd_done (kpr_router_t *router, kpr_fwd_desc_t *fwd, int error)
 }
 
 static inline void
+kpr_notify (kpr_router_t *router, 
+            ptl_nid_t peer, int alive, time_t when)
+{
+        if (!kpr_routing (router))
+                return;
+        
+        router->kpr_interface->kprri_notify(router->kpr_arg, peer, alive, when);
+}
+
+static inline void
 kpr_shutdown (kpr_router_t *router)
 {
         if (kpr_routing (router))
@@ -554,6 +575,7 @@ extern struct prof_ent prof_ents[MAX_PROFS];
 #endif /* PORTALS_PROFILING */
 
 /* debug.c */
+void portals_run_upcall(char **argv);
 void portals_run_lbug_upcall(char * file, const char *fn, const int line);
 void portals_debug_dumplog(void);
 int portals_debug_init(unsigned long bufsize);
@@ -622,6 +644,92 @@ extern void kportal_blockallsigs (void);
 # define CURRENT_TIME time(0)
 #endif
 
+/******************************************************************************/
+/* Light-weight trace 
+ * Support for temporary event tracing with minimal Heisenberg effect. */
+#define LWT_SUPPORT  1
+
+typedef struct {
+        cycles_t    lwte_when;
+        char       *lwte_where;
+        void       *lwte_task;
+        long        lwte_p1;
+        long        lwte_p2;
+        long        lwte_p3;
+        long        lwte_p4;
+} lwt_event_t;
+
+#if LWT_SUPPORT
+#ifdef __KERNEL__
+#define LWT_EVENTS_PER_PAGE (PAGE_SIZE / sizeof (lwt_event_t))
+
+typedef struct _lwt_page {
+        struct list_head     lwtp_list;
+        struct page         *lwtp_page;
+        lwt_event_t         *lwtp_events;
+} lwt_page_t;
+
+typedef struct {
+        int                lwtc_current_index;
+        lwt_page_t        *lwtc_current_page;
+} lwt_cpu_t;
+
+extern int       lwt_enabled;
+extern lwt_cpu_t lwt_cpus[];
+
+extern int  lwt_init (void);
+extern void lwt_fini (void);
+extern int  lwt_lookup_string (int *size, char *knlptr,
+                               char *usrptr, int usrsize);
+extern int  lwt_control (int enable, int clear);
+extern int  lwt_snapshot (int *ncpu, int *total_size,
+                          void *user_ptr, int user_size);
+
+/* Note that we _don't_ define LWT_EVENT at all if LWT_SUPPORT isn't set.
+ * This stuff is meant for finding specific problems; it never stays in
+ * production code... */
+
+#define LWTSTR(n)      #n
+#define LWTWHERE(f,l)   f ":" LWTSTR(l)
+
+#define LWT_EVENT(p1, p2, p3, p4)                                       \
+do {                                                                    \
+        unsigned long    flags;                                         \
+        lwt_cpu_t       *cpu;                                           \
+        lwt_page_t      *p;                                             \
+        lwt_event_t     *e;                                             \
+                                                                        \
+        local_irq_save (flags);                                         \
+                                                                        \
+        if (lwt_enabled) {                                              \
+                cpu = &lwt_cpus[smp_processor_id()];                    \
+                p = cpu->lwtc_current_page;                             \
+                e = &p->lwtp_events[cpu->lwtc_current_index++];         \
+                                                                        \
+                if (cpu->lwtc_current_index >= LWT_EVENTS_PER_PAGE) {   \
+                        cpu->lwtc_current_page =                        \
+                                list_entry (p->lwtp_list.next,          \
+                                            lwt_page_t, lwtp_list);     \
+                        cpu->lwtc_current_index = 0;                    \
+                }                                                       \
+                                                                        \
+                e->lwte_when  = get_cycles();                           \
+                e->lwte_where = LWTWHERE(__FILE__,__LINE__);            \
+                e->lwte_task  = current;                                \
+                e->lwte_p1    = (long)(p1);                             \
+                e->lwte_p2    = (long)(p2);                             \
+                e->lwte_p3    = (long)(p3);                             \
+                e->lwte_p4    = (long)(p4);                             \
+        }                                                               \
+                                                                        \
+        local_irq_restore (flags);                                      \
+} while (0)
+#else  /* __KERNEL__ */
+#define LWT_EVENT(p1,p2,p3,p4)     /* no userland implementation yet */
+#endif /* __KERNEL__ */
+#endif /* LWT_SUPPORT */
+
+
 #include <linux/portals_lib.h>
 
 /*
@@ -856,8 +964,11 @@ static inline int portal_ioctl_getdata(char *buf, char *end, void *arg)
 #define IOC_PORTAL_GET_NID                 _IOWR('e', 39, long)
 #define IOC_PORTAL_FAIL_NID                _IOWR('e', 40, long)
 #define IOC_PORTAL_SET_DAEMON              _IOWR('e', 41, long)
-
-#define IOC_PORTAL_MAX_NR               41
+#define IOC_PORTAL_NOTIFY_ROUTER           _IOWR('e', 42, long)
+#define IOC_PORTAL_LWT_CONTROL             _IOWR('e', 43, long)
+#define IOC_PORTAL_LWT_SNAPSHOT            _IOWR('e', 44, long)
+#define IOC_PORTAL_LWT_LOOKUP_STRING       _IOWR('e', 45, long)
+#define IOC_PORTAL_MAX_NR                             45
 
 enum {
         QSWNAL  =  1,
index 8278111..7763f1b 100644 (file)
@@ -54,8 +54,10 @@ int jt_ptl_txmem (int argc, char **argv);
 int jt_ptl_nagle (int argc, char **argv);
 int jt_ptl_add_route (int argc, char **argv);
 int jt_ptl_del_route (int argc, char **argv);
+int jt_ptl_notify_router (int argc, char **argv);
 int jt_ptl_print_routes (int argc, char **argv);
 int jt_ptl_fail_nid (int argc, char **argv);
+int jt_ptl_lwt(int argc, char **argv);
 
 int dbg_initialize(int argc, char **argv);
 int jt_dbg_filter(int argc, char **argv);
index 8278111..7763f1b 100644 (file)
@@ -54,8 +54,10 @@ int jt_ptl_txmem (int argc, char **argv);
 int jt_ptl_nagle (int argc, char **argv);
 int jt_ptl_add_route (int argc, char **argv);
 int jt_ptl_del_route (int argc, char **argv);
+int jt_ptl_notify_router (int argc, char **argv);
 int jt_ptl_print_routes (int argc, char **argv);
 int jt_ptl_fail_nid (int argc, char **argv);
+int jt_ptl_lwt(int argc, char **argv);
 
 int dbg_initialize(int argc, char **argv);
 int jt_dbg_filter(int argc, char **argv);
index b5e1e39..0841d64 100644 (file)
@@ -32,6 +32,7 @@ kpr_nal_interface_t kqswnal_router_interface = {
        kprni_nalid:    QSWNAL,
        kprni_arg:      NULL,
        kprni_fwd:      kqswnal_fwd_packet,
+       kprni_notify:   NULL,                   /* we're connectionless */
 };
 
 
index 294f3e5..0d8e2fa 100644 (file)
@@ -164,6 +164,7 @@ typedef struct
         void             *ktx_args[2];          /* completion passthru */
         E3_Addr           ktx_ebuffer;          /* elan address of ktx_buffer */
         char             *ktx_buffer;           /* pre-allocated contiguous buffer for hdr + small payloads */
+        unsigned long     ktx_launchtime;       /* when (in jiffies) the transmit was launched */
 
         /* debug/info fields */
         pid_t             ktx_launcher;         /* pid of launching process */
index 8d5f70b..99f299f 100644 (file)
@@ -421,7 +421,9 @@ static void
 kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
 {
         kqswnal_tx_t      *ktx = (kqswnal_tx_t *)arg;
-
+        struct timeval     now;
+        time_t             then;
+        
         LASSERT (txd != NULL);
         LASSERT (ktx != NULL);
 
@@ -432,7 +434,15 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
 
         if (status != EP_SUCCESS)
         {
-                CERROR ("kqswnal: Transmit failed with %d\n", status);
+                CERROR ("Tx completion to "LPX64" failed: %d\n", 
+                        ktx->ktx_nid, status);
+
+                do_gettimeofday (&now);
+                then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;
+        
+                kpr_notify (&kqswnal_data.kqn_router, 
+                            ktx->ktx_nid, 0, then);
+
                 status = -EIO;
         }
 
@@ -447,33 +457,40 @@ kqswnal_launch (kqswnal_tx_t *ktx)
         int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
         long  flags;
         int   rc;
-        
+
+        ktx->ktx_launchtime = jiffies;
+
         LASSERT (dest >= 0);                    /* must be a peer */
         rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
                                ktx->ktx_port, attr, kqswnal_txhandler,
                                ktx, ktx->ktx_iov, ktx->ktx_niov);
-        if (rc == 0)
+        switch (rc) {
+        case 0: /* success */
                 atomic_inc (&kqswnal_packets_launched);
+                return (0);
 
-        if (rc != ENOMEM)
-                return (rc);
+        case ENOMEM: /* can't allocate ep txd => queue for later */
+                LASSERT (in_interrupt());
 
-        /* can't allocate ep txd => queue for later */
+                spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
 
-        LASSERT (in_interrupt());      /* not called by thread (not looping) */
+                list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
+                if (waitqueue_active (&kqswnal_data.kqn_sched_waitq))
+                        wake_up (&kqswnal_data.kqn_sched_waitq);
 
-        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
+                spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+                return (0);
 
-        list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
-        if (waitqueue_active (&kqswnal_data.kqn_sched_waitq))
-                wake_up (&kqswnal_data.kqn_sched_waitq);
-
-        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+        default: /* fatal error */
+                CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
 
-        return (0);
+                /* Tell router I think a node is down */
+                kpr_notify (&kqswnal_data.kqn_router, ktx->ktx_nid,
+                            0, ktx->ktx_launchtime);
+                return (rc);
+        }
 }
 
-
 static char *
 hdr_type_string (ptl_hdr_t *hdr)
 {
@@ -584,7 +601,8 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         }
 
         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
-                rc = kpr_lookup (&kqswnal_data.kqn_router, nid, &gatewaynid);
+                rc = kpr_lookup (&kqswnal_data.kqn_router, nid, 
+                                 sizeof (ptl_hdr_t) + payload_nob, &gatewaynid);
                 if (rc != 0) {
                         CERROR("Can't route to "LPX64": router error %d\n",
                                nid, rc);
@@ -678,7 +696,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 return (PTL_FAIL);
         }
 
-        CDEBUG(D_NET, "send to "LPSZ" bytes to "LPX64"\n", payload_nob, nid);
+        CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64"\n", payload_nob, nid);
         return (PTL_OK);
 }
 
index e7232a0..c844dd6 100644 (file)
@@ -37,8 +37,34 @@ kpr_nal_interface_t ksocknal_router_interface = {
         kprni_nalid:      SOCKNAL,
         kprni_arg:        &ksocknal_data,
         kprni_fwd:        ksocknal_fwd_packet,
+        kprni_notify:     ksocknal_notify,
 };
 
+#define SOCKNAL_SYSCTL 200
+
+#define SOCKNAL_SYSCTL_TIMEOUT     1
+#define SOCKNAL_SYSCTL_EAGER_ACK   2
+#define SOCKNAL_SYSCTL_ZERO_COPY   3
+
+static ctl_table ksocknal_ctl_table[] = {
+        {SOCKNAL_SYSCTL_TIMEOUT, "timeout", 
+         &ksocknal_data.ksnd_io_timeout, sizeof (int),
+         0644, NULL, &proc_dointvec},
+        {SOCKNAL_SYSCTL_EAGER_ACK, "eager_ack", 
+         &ksocknal_data.ksnd_eager_ack, sizeof (int),
+         0644, NULL, &proc_dointvec},
+#if SOCKNAL_ZC
+        {SOCKNAL_SYSCTL_EAGER_ACK, "zero_copy", 
+         &ksocknal_data.ksnd_zc_min_frag, sizeof (int),
+         0644, NULL, &proc_dointvec},
+#endif
+        { 0 }
+};
+
+static ctl_table ksocknal_top_ctl_table[] = {
+        {SOCKNAL_SYSCTL, "socknal", NULL, 0, 0555, ksocknal_ctl_table},
+        { 0 }
+};
 
 int
 ksocknal_api_forward(nal_t *nal, int id, void *args, size_t args_len,
@@ -172,7 +198,7 @@ ksocknal_bind_irq (unsigned int irq)
 
 ksock_route_t *
 ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
-                       int irq_affinity, int xchange_nids, int nonagel)
+                       int nonagel, int xchange_nids, int irq_affinity, int eager)
 {
         ksock_route_t *route;
 
@@ -183,7 +209,7 @@ ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
         atomic_set (&route->ksnr_refcount, 1);
         route->ksnr_sharecount = 0;
         route->ksnr_peer = NULL;
-        route->ksnr_timeout = jiffies_64;
+        route->ksnr_timeout = jiffies;
         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
         route->ksnr_ipaddr = ipaddr;
         route->ksnr_port = port;
@@ -191,6 +217,7 @@ ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
         route->ksnr_irq_affinity = irq_affinity;
         route->ksnr_xchange_nids = xchange_nids;
         route->ksnr_nonagel = nonagel;
+        route->ksnr_eager = eager;
         route->ksnr_connecting = 0;
         route->ksnr_deleted = 0;
         route->ksnr_generation = 0;
@@ -371,7 +398,8 @@ ksocknal_get_route_by_idx (int index)
 
 int
 ksocknal_add_route (ptl_nid_t nid, __u32 ipaddr, int port, int bufnob,
-                    int nonagle, int xchange_nids, int bind_irq, int share)
+                    int nonagle, int xchange_nids, int bind_irq, 
+                    int share, int eager)
 {
         unsigned long      flags;
         ksock_peer_t      *peer;
@@ -388,8 +416,8 @@ ksocknal_add_route (ptl_nid_t nid, __u32 ipaddr, int port, int bufnob,
         if (peer == NULL)
                 return (-ENOMEM);
 
-        route = ksocknal_create_route (ipaddr, port, bufnob,
-                                       nonagle, xchange_nids, bind_irq);
+        route = ksocknal_create_route (ipaddr, port, bufnob, nonagle,
+                                       xchange_nids, bind_irq, eager);
         if (route == NULL) {
                 ksocknal_put_peer (peer);
                 return (-ENOMEM);
@@ -454,7 +482,7 @@ ksocknal_del_route_locked (ksock_route_t *route, int share, int keep_conn)
 
         if (conn != NULL) {
                 if (!keep_conn)
-                        ksocknal_close_conn_locked (conn);
+                        ksocknal_close_conn_locked (conn, 0);
                 else {
                         /* keeping the conn; just dissociate it and route... */
                         conn->ksnc_route = NULL;
@@ -568,14 +596,12 @@ ksocknal_get_peer_addr (ksock_conn_t *conn)
         struct sockaddr_in sin;
         int                len = sizeof (sin);
         int                rc;
-
-        rc = ksocknal_getconnsock (conn);
-        LASSERT (rc == 0);
         
         rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
                                             (struct sockaddr *)&sin, &len, 2);
+        /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
+        LASSERT (!conn->ksnc_closing);
         LASSERT (len <= sizeof (sin));
-        ksocknal_putconnsock (conn);
 
         if (rc != 0) {
                 CERROR ("Error %d getting sock peer IP\n", rc);
@@ -590,12 +616,8 @@ unsigned int
 ksocknal_conn_irq (ksock_conn_t *conn)
 {
         int                irq = 0;
-        int                rc;
         struct dst_entry  *dst;
 
-        rc = ksocknal_getconnsock (conn);
-        LASSERT (rc == 0);
-        
         dst = sk_dst_get (conn->ksnc_sock->sk);
         if (dst != NULL) {
                 if (dst->dev != NULL) {
@@ -608,7 +630,8 @@ ksocknal_conn_irq (ksock_conn_t *conn)
                 dst_release (dst);
         }
         
-        ksocknal_putconnsock (conn);
+        /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
+        LASSERT (!conn->ksnc_closing);
         return (irq);
 }
 
@@ -660,11 +683,13 @@ ksocknal_create_conn (ptl_nid_t nid, ksock_route_t *route,
         int                rc;
 
         /* NB, sock has an associated file since (a) this connection might
-         * have been created in userland and (b) we need the refcounting so
-         * that we don't close the socket while I/O is being done on it. */
+         * have been created in userland and (b) we need to refcount the
+         * socket so that we don't close it while I/O is being done on
+         * it, and sock->file has that pre-cooked... */
         LASSERT (sock->file != NULL);
+        LASSERT (file_count(sock->file) > 0);
 
-        rc = ksocknal_set_linger (sock);
+        rc = ksocknal_setup_sock (sock);
         if (rc != 0)
                 return (rc);
 
@@ -696,9 +721,6 @@ ksocknal_create_conn (ptl_nid_t nid, ksock_route_t *route,
         ksocknal_new_packet (conn, 0);
 
         INIT_LIST_HEAD (&conn->ksnc_tx_queue);
-#if SOCKNAL_ZC
-        INIT_LIST_HEAD (&conn->ksnc_tx_pending);
-#endif
         conn->ksnc_tx_ready = 0;
         conn->ksnc_tx_scheduled = 0;
         atomic_set (&conn->ksnc_tx_nob, 0);
@@ -753,6 +775,8 @@ ksocknal_create_conn (ptl_nid_t nid, ksock_route_t *route,
 
         conn->ksnc_peer = peer;
         atomic_inc (&peer->ksnp_refcount);
+        peer->ksnp_last_alive = jiffies;
+        peer->ksnp_error = 0;
 
         list_add (&conn->ksnc_list, &peer->ksnp_conns);
         atomic_inc (&conn->ksnc_refcount);
@@ -797,7 +821,7 @@ ksocknal_create_conn (ptl_nid_t nid, ksock_route_t *route,
 }
 
 void
-ksocknal_close_conn_locked (ksock_conn_t *conn)
+ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
 {
         /* This just does the immmediate housekeeping, and queues the
          * connection for the reaper to terminate. 
@@ -805,6 +829,7 @@ ksocknal_close_conn_locked (ksock_conn_t *conn)
         ksock_peer_t   *peer = conn->ksnc_peer;
         ksock_route_t  *route;
 
+        LASSERT (peer->ksnp_error == 0);
         LASSERT (!conn->ksnc_closing);
         conn->ksnc_closing = 1;
         atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
@@ -825,11 +850,16 @@ ksocknal_close_conn_locked (ksock_conn_t *conn)
         /* ksnd_deathrow_conns takes over peer's ref */
         list_del (&conn->ksnc_list);
 
-        if (list_empty (&peer->ksnp_conns) &&
-            list_empty (&peer->ksnp_routes)) {
-                /* I've just closed last conn belonging to a
-                 * non-autoconnecting peer */
-                ksocknal_unlink_peer_locked (peer);
+        if (list_empty (&peer->ksnp_conns)) {
+                /* No more connections to this peer */
+
+                peer->ksnp_error = error;       /* stash last conn close reason */
+
+                if (list_empty (&peer->ksnp_routes)) {
+                        /* I've just closed last conn belonging to a
+                         * non-autoconnecting peer */
+                        ksocknal_unlink_peer_locked (peer);
+                }
         }
 
         spin_lock (&ksocknal_data.ksnd_reaper_lock);
@@ -842,7 +872,7 @@ ksocknal_close_conn_locked (ksock_conn_t *conn)
 }
 
 int
-ksocknal_close_conn_unlocked (ksock_conn_t *conn) 
+ksocknal_close_conn_unlocked (ksock_conn_t *conn, int why
 {
         unsigned long flags;
         int           did_it = 0;
@@ -851,7 +881,7 @@ ksocknal_close_conn_unlocked (ksock_conn_t *conn)
                 
         if (!conn->ksnc_closing) {
                 did_it = 1;
-                ksocknal_close_conn_locked (conn);
+                ksocknal_close_conn_locked (conn, why);
         }
         
         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
@@ -867,6 +897,10 @@ ksocknal_terminate_conn (ksock_conn_t *conn)
          * ksnc_refcount will eventually hit zero, and then the reaper will
          * destroy it. */
         unsigned long   flags;
+        ksock_peer_t   *peer = conn->ksnc_peer;
+        struct timeval  now;
+        time_t          then = 0;
+        int             notify = 0;
 
         /* serialise with callbacks */
         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
@@ -886,6 +920,16 @@ ksocknal_terminate_conn (ksock_conn_t *conn)
 
         conn->ksnc_scheduler->kss_nconns--;
 
+        if (peer->ksnp_error != 0) {
+                /* peer's last conn closed in error */
+                LASSERT (list_empty (&peer->ksnp_conns));
+                
+                /* convert peer's last-known-alive timestamp from jiffies */
+                do_gettimeofday (&now);
+                then = now.tv_sec - (jiffies - peer->ksnp_last_alive)/HZ;
+                notify = 1;
+        }
+        
         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
 
         /* The socket is closed on the final put; either here, or in
@@ -894,6 +938,10 @@ ksocknal_terminate_conn (ksock_conn_t *conn)
          * immediately, aborting anything buffered in it. Any hung
          * zero-copy transmits will therefore complete in finite time. */
         ksocknal_putconnsock (conn);
+
+        if (notify)
+                kpr_notify (&ksocknal_data.ksnd_router, peer->ksnp_nid,
+                            0, then);
 }
 
 void
@@ -906,9 +954,7 @@ ksocknal_destroy_conn (ksock_conn_t *conn)
         LASSERT (conn->ksnc_route == NULL);
         LASSERT (!conn->ksnc_tx_scheduled);
         LASSERT (!conn->ksnc_rx_scheduled);
-#if SOCKNAL_ZC
-        LASSERT (list_empty (&conn->ksnc_tx_pending));
-#endif
+
         /* complete queued packets */
         while (!list_empty (&conn->ksnc_tx_queue)) {
                 ksock_tx_t *tx = list_entry (conn->ksnc_tx_queue.next,
@@ -1010,7 +1056,7 @@ ksocknal_close_conn (ptl_nid_t nid, __u32 ipaddr)
                                         continue;
 
                                 rc = 0;
-                                ksocknal_close_conn_locked (conn);
+                                ksocknal_close_conn_locked (conn, 0);
                         }
                 }
         }
@@ -1020,6 +1066,24 @@ ksocknal_close_conn (ptl_nid_t nid, __u32 ipaddr)
         return (rc);
 }
 
+void
+ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive)
+{
+        /* The router is telling me she's been notified of a change in
+         * gateway state.... */
+
+        CDEBUG (D_NET, "gw "LPX64" %s\n", gw_nid, alive ? "up" : "down");
+
+        if (!alive) {
+                /* If the gateway crashed, close all open connections... */
+                ksocknal_close_conn (gw_nid, 0);
+                return;
+        }
+        
+        /* ...otherwise do nothing.  We can only establish new connections
+         * if we have autroutes, and these connect on demand. */
+}
+
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
 struct tcp_opt *sock2tcp_opt(struct sock *sk)
 {
@@ -1177,7 +1241,8 @@ ksocknal_cmd(struct portal_ioctl_data * data, void * private)
                         data->ioc_wait  = route->ksnr_sharecount;
                         data->ioc_flags = (route->ksnr_nonagel      ? 1 : 0) |
                                           (route->ksnr_xchange_nids ? 2 : 0) |
-                                          (route->ksnr_irq_affinity ? 4 : 0);
+                                          (route->ksnr_irq_affinity ? 4 : 0) |
+                                          (route->ksnr_eager        ? 8 : 0);
                         ksocknal_put_route (route);
                 }
                 break;
@@ -1185,10 +1250,11 @@ ksocknal_cmd(struct portal_ioctl_data * data, void * private)
         case NAL_CMD_ADD_AUTOCONN: {
                 rc = ksocknal_add_route (data->ioc_nid, data->ioc_id,
                                          data->ioc_misc, data->ioc_size,
-                                         (data->ioc_flags & 1) != 0,
-                                         (data->ioc_flags & 2) != 0,
-                                         (data->ioc_flags & 4) != 0,
-                                         (data->ioc_flags & 8) != 0);
+                                         (data->ioc_flags & 0x01) != 0,
+                                         (data->ioc_flags & 0x02) != 0,
+                                         (data->ioc_flags & 0x04) != 0,
+                                         (data->ioc_flags & 0x08) != 0,
+                                         (data->ioc_flags & 0x10) != 0);
                 break;
         }
         case NAL_CMD_DEL_AUTOCONN: {
@@ -1287,6 +1353,10 @@ ksocknal_module_fini (void)
                 LASSERT (0);
 
         case SOCKNAL_INIT_ALL:
+#if CONFIG_SYSCTL
+                if (ksocknal_data.ksnd_sysctl != NULL)
+                        unregister_sysctl_table (ksocknal_data.ksnd_sysctl);
+#endif
                 kportal_nal_unregister(SOCKNAL);
                 PORTAL_SYMBOL_UNREGISTER (ksocknal_ni);
                 /* fall through */
@@ -1364,6 +1434,9 @@ ksocknal_module_init (void)
 
         /* packet descriptor must fit in a router descriptor's scratchpad */
         LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
+        /* the following must be sizeof(int) for proc_dointvec() */
+        LASSERT(sizeof (ksocknal_data.ksnd_io_timeout) == sizeof (int));
+        LASSERT(sizeof (ksocknal_data.ksnd_eager_ack) == sizeof (int));
 
         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
 
@@ -1379,6 +1452,12 @@ ksocknal_module_init (void)
 
         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
 
+        ksocknal_data.ksnd_io_timeout = SOCKNAL_IO_TIMEOUT;
+        ksocknal_data.ksnd_eager_ack  = SOCKNAL_EAGER_ACK;
+#if SOCKNAL_ZC
+        ksocknal_data.ksnd_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;
+#endif
+
         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
         PORTAL_ALLOC (ksocknal_data.ksnd_peers,
                       sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
@@ -1561,9 +1640,13 @@ ksocknal_module_init (void)
 
         PORTAL_SYMBOL_REGISTER(ksocknal_ni);
 
+#ifdef CONFIG_SYSCTL
+        /* Press on regardless even if registering sysctl doesn't work */
+        ksocknal_data.ksnd_sysctl = register_sysctl_table (ksocknal_top_ctl_table, 0);
+#endif
         /* flag everything initialised */
         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
-
+        
         printk(KERN_INFO "Routing socket NAL loaded (Routing %s, initial "
                "mem %d)\n",
                kpr_routing (&ksocknal_data.ksnd_router) ?
index 69daa02..abd8e7b 100644 (file)
@@ -48,6 +48,7 @@
 #include <linux/stat.h>
 #include <linux/list.h>
 #include <linux/kmod.h>
+#include <linux/sysctl.h>
 #include <asm/uaccess.h>
 #include <asm/segment.h>
 #include <asm/div64.h>
 #define SOCKNAL_MIN_RECONNECT_INTERVAL HZ      /* first failed connection retry... */
 #define SOCKNAL_MAX_RECONNECT_INTERVAL (60*HZ) /* ...exponentially increasing to this */
 
-#define SOCKNAL_IO_TIMEOUT      (60*HZ)         /* default comms timeout */
+/* default vals for runtime tunables */
+#define SOCKNAL_IO_TIMEOUT       50             /* default comms timeout (seconds) */
+#define SOCKNAL_EAGER_ACK        1              /* default eager ack (boolean) */
+#define SOCKNAL_ZC_MIN_FRAG     (2<<10)         /* default smallest zerocopy fragment */
+
+#define SOCKNAL_USE_KEEPALIVES   0              /* use tcp/ip keepalive? */
 
 #define SOCKNAL_PEER_HASH_SIZE   101            /* # peer lists */
 
@@ -78,8 +84,6 @@
 # define SOCKNAL_MAX_FWD_PAYLOAD (64<<10)       /* biggest payload I can forward */
 #endif
 
-#define SOCKNAL_ZC_MIN_FRAG    (2<<10)          /* default smallest zerocopy fragment */
-
 #define SOCKNAL_NLTXS           128             /* # normal transmit messages */
 #define SOCKNAL_NNBLK_LTXS     128             /* # transmit messages reserved if can't block */
 
 
 #define SOCKNAL_TX_LOW_WATER(sk) (((sk)->sk_sndbuf*8)/10)
 
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-# define jiffies_64  jiffies
-#endif
-
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,72))
 # define sk_data_ready data_ready
 # define sk_write_space write_space
@@ -136,6 +136,12 @@ typedef struct {
 
 typedef struct {
         int               ksnd_init;            /* initialisation state */
+        int               ksnd_io_timeout;      /* "stuck" socket timeout (seconds) */
+        int               ksnd_eager_ack;       /* make TCP ack eagerly? */
+#if SOCKNAL_ZC
+        unsigned int      ksnd_zc_min_frag;     /* minimum zero copy frag size */
+#endif
+        struct ctl_table_header *ksnd_sysctl;   /* sysctl interface */
         
         rwlock_t          ksnd_global_lock;     /* stabilize peer/conn ops */
         struct list_head *ksnd_peers;           /* hash table of all my known peers */
@@ -205,7 +211,6 @@ struct ksock_route;                             /* forward ref */
 typedef struct                                  /* transmit packet */
 {
         struct list_head        tx_list;        /* queue on conn for transmission etc */
-        __u64                   tx_deadline;    /* when (in jiffies) tx times out */
         char                    tx_isfwd;       /* forwarding / sourced here */
         int                     tx_nob;         /* # packet bytes */
         int                     tx_resid;       /* residual bytes */
@@ -291,10 +296,11 @@ typedef struct ksock_conn
         __u32               ksnc_ipaddr;        /* peer's IP */
         int                 ksnc_port;          /* peer's port */
         int                 ksnc_closing;       /* being shut down */
-
+        
         /* READER */
         struct list_head    ksnc_rx_list;       /* where I enq waiting input or a forwarding descriptor */
-        __u64               ksnc_rx_deadline;   /* when receive times out */
+        unsigned long       ksnc_rx_deadline;   /* when (in jiffies) receive times out */
+        int                 ksnc_rx_started;    /* started receiving a message */
         int                 ksnc_rx_ready;      /* data ready to read */
         int                 ksnc_rx_scheduled;  /* being progressed */
         int                 ksnc_rx_state;      /* what is being read */
@@ -311,9 +317,7 @@ typedef struct ksock_conn
         /* WRITER */
         struct list_head    ksnc_tx_list;       /* where I enq waiting for output space */
         struct list_head    ksnc_tx_queue;      /* packets waiting to be sent */
-#if SOCKNAL_ZC
-        struct list_head    ksnc_tx_pending;    /* zc packets pending callback */
-#endif
+        unsigned long       ksnc_tx_deadline;   /* when (in jiffies) tx times out */
         atomic_t            ksnc_tx_nob;        /* # bytes queued */
         int                 ksnc_tx_ready;      /* write space */
         int                 ksnc_tx_scheduled;  /* being progressed */
@@ -326,7 +330,7 @@ typedef struct ksock_route
         struct ksock_peer  *ksnr_peer;          /* owning peer */
         atomic_t            ksnr_refcount;      /* # users */
         int                 ksnr_sharecount;    /* lconf usage counter */
-        __u64               ksnr_timeout;       /* when reconnection can happen next */
+        unsigned long       ksnr_timeout;       /* when (in jiffies) reconnection can happen next */
         unsigned int        ksnr_retry_interval; /* how long between retries */
         __u32               ksnr_ipaddr;        /* an IP address for this peer */
         int                 ksnr_port;          /* port to connect to */
@@ -334,8 +338,9 @@ typedef struct ksock_route
         unsigned int        ksnr_irq_affinity:1; /* set affinity? */
         unsigned int        ksnr_xchange_nids:1; /* do hello protocol? */
         unsigned int        ksnr_nonagel:1;     /* disable nagle? */
-        unsigned int        ksnr_connecting;    /* autoconnect in progress? */
-        unsigned int        ksnr_deleted;       /* been removed from peer? */
+        unsigned int        ksnr_eager:1;       /* connect eagery? */
+        unsigned int        ksnr_connecting:1;  /* autoconnect in progress? */
+        unsigned int        ksnr_deleted:1;     /* been removed from peer? */
         int                 ksnr_generation;    /* connection incarnation # */
         ksock_conn_t       *ksnr_conn;          /* NULL/active connection */
 } ksock_route_t;
@@ -346,13 +351,14 @@ typedef struct ksock_peer
         ptl_nid_t           ksnp_nid;           /* who's on the other end(s) */
         atomic_t            ksnp_refcount;      /* # users */
         int                 ksnp_closing;       /* being closed */
+        int                 ksnp_error;         /* errno on closing last conn */
         struct list_head    ksnp_conns;         /* all active connections */
         struct list_head    ksnp_routes;        /* routes */
         struct list_head    ksnp_tx_queue;      /* waiting packets */
+        unsigned long       ksnp_last_alive;    /* when (in jiffies) I was last alive */
 } ksock_peer_t;
 
 
-
 extern nal_cb_t         ksocknal_lib;
 extern ksock_nal_data_t ksocknal_data;
 
@@ -393,8 +399,8 @@ extern int ksocknal_del_route (ptl_nid_t nid, __u32 ipaddr,
                                int single, int keep_conn);
 extern int ksocknal_create_conn (ptl_nid_t nid, ksock_route_t *route,
                                  struct socket *sock, int bind_irq);
-extern void ksocknal_close_conn_locked (ksock_conn_t *conn);
-extern int ksocknal_close_conn_unlocked (ksock_conn_t *conn);
+extern void ksocknal_close_conn_locked (ksock_conn_t *conn, int why);
+extern int ksocknal_close_conn_unlocked (ksock_conn_t *conn, int why);
 extern void ksocknal_terminate_conn (ksock_conn_t *conn);
 extern void ksocknal_destroy_conn (ksock_conn_t *conn);
 extern void ksocknal_put_conn (ksock_conn_t *conn);
@@ -404,6 +410,7 @@ extern void ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn);
 extern void ksocknal_tx_done (ksock_tx_t *tx, int asynch);
 extern void ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd);
 extern void ksocknal_fmb_callback (void *arg, int error);
+extern void ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive);
 extern int ksocknal_thread_start (int (*fn)(void *arg), void *arg);
 extern int ksocknal_new_packet (ksock_conn_t *conn, int skip);
 extern int ksocknal_scheduler (void *arg);
@@ -411,4 +418,4 @@ extern void ksocknal_data_ready(struct sock *sk, int n);
 extern void ksocknal_write_space(struct sock *sk);
 extern int ksocknal_autoconnectd (void *arg);
 extern int ksocknal_reaper (void *arg);
-extern int ksocknal_set_linger (struct socket *sock);
+extern int ksocknal_setup_sock (struct socket *sock);
index 656a0c5..f946d87 100644 (file)
 
 #include "socknal.h"
 
-int        ksocknal_io_timeout = SOCKNAL_IO_TIMEOUT;
-#if SOCKNAL_ZC
-int        ksocknal_do_zc = 1;
-int        ksocknal_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;
-#endif
-
 /*
  *  LIB functions follow
  *
@@ -225,7 +219,7 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
         struct iovec  *iov = tx->tx_iov;
         int            fragsize = iov->iov_len;
         unsigned long  vaddr = (unsigned long)iov->iov_base;
-        int            more = !list_empty (&conn->ksnc_tx_queue) |
+        int            more = (!list_empty (&conn->ksnc_tx_queue)) |
                               (tx->tx_niov > 1) |
                               (tx->tx_nkiov > 1);
 #if SOCKNAL_ZC
@@ -241,10 +235,9 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
         LASSERT (tx->tx_niov > 0);
         
 #if SOCKNAL_ZC
-        if (ksocknal_do_zc &&
+        if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
             (sock->sk->route_caps & NETIF_F_SG) &&
             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
-            zcsize >= ksocknal_zc_min_frag &&
             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
                 
                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
@@ -306,7 +299,7 @@ ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
         int            fragsize = kiov->kiov_len;
         struct page   *page = kiov->kiov_page;
         int            offset = kiov->kiov_offset;
-        int            more = !list_empty (&conn->ksnc_tx_queue) |
+        int            more = (!list_empty (&conn->ksnc_tx_queue)) |
                               (tx->tx_nkiov > 1);
         int            rc;
 
@@ -318,10 +311,9 @@ ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
         LASSERT (tx->tx_nkiov > 0);
 
 #if SOCKNAL_ZC
-        if (ksocknal_do_zc &&
+        if (fragsize >= ksocknal_data.ksnd_zc_min_frag &&
             (sock->sk->route_caps & NETIF_F_SG) &&
-            (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
-            fragsize >= ksocknal_zc_min_frag) {
+            (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
 
                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
                                page, offset, fragsize);
@@ -349,6 +341,7 @@ ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
                 set_fs (KERNEL_DS);
                 rc = sock_sendmsg(sock, &msg, fragsize);
                 set_fs (oldmm);
+
                 kunmap (page);
         }
 
@@ -410,6 +403,16 @@ ksocknal_sendmsg (ksock_conn_t *conn, ksock_tx_t *tx)
                         break;
                 }
 
+                /* Consider the connection alive since we managed to chuck
+                 * more data into it.  Really, we'd like to consider it
+                 * alive only when the peer ACKs something, but
+                 * write_space() only gets called back while SOCK_NOSPACE
+                 * is set.  Instead, we presume peer death has occurred if
+                 * the socket doesn't drain within a timout */
+                conn->ksnc_tx_deadline = jiffies + 
+                                         ksocknal_data.ksnd_io_timeout * HZ;
+                conn->ksnc_peer->ksnp_last_alive = jiffies;
+
                 if (tx->tx_resid == 0) {        /* sent everything */
                         rc = 0;
                         break;
@@ -420,6 +423,24 @@ ksocknal_sendmsg (ksock_conn_t *conn, ksock_tx_t *tx)
         RETURN (rc);
 }
 
+void
+ksocknal_eager_ack (ksock_conn_t *conn)
+{
+        int            opt = 1;
+        mm_segment_t   oldmm = get_fs();
+        struct socket *sock = conn->ksnc_sock;
+        
+        /* Remind the socket to ACK eagerly.  If I don't, the socket might
+         * think I'm about to send something it could piggy-back the ACK
+         * on, introducing delay in completing zero-copy sends in my
+         * peer. */
+
+        set_fs(KERNEL_DS);
+        sock->ops->setsockopt (sock, SOL_TCP, TCP_QUICKACK,
+                               (char *)&opt, sizeof (opt));
+        set_fs(oldmm);
+}
+
 int
 ksocknal_recv_iov (ksock_conn_t *conn)
 {
@@ -453,6 +474,13 @@ ksocknal_recv_iov (ksock_conn_t *conn)
         if (rc <= 0)
                 return (rc);
 
+        /* received something... */
+        conn->ksnc_peer->ksnp_last_alive = jiffies;
+        conn->ksnc_rx_deadline = jiffies + 
+                                 ksocknal_data.ksnd_io_timeout * HZ;
+        mb();                           /* order with setting rx_started */
+        conn->ksnc_rx_started = 1;
+
         conn->ksnc_rx_nob_wanted -= rc;
         conn->ksnc_rx_nob_left -= rc;
                 
@@ -499,11 +527,19 @@ ksocknal_recv_kiov (ksock_conn_t *conn)
         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
         /* NB this is just a boolean............................^ */
         set_fs (oldmm);
+
         kunmap (page);
         
         if (rc <= 0)
                 return (rc);
         
+        /* received something... */
+        conn->ksnc_peer->ksnp_last_alive = jiffies;
+        conn->ksnc_rx_deadline = jiffies + 
+                                 ksocknal_data.ksnd_io_timeout * HZ;
+        mb();                           /* order with setting rx_started */
+        conn->ksnc_rx_started = 1;
+
         conn->ksnc_rx_nob_wanted -= rc;
         conn->ksnc_rx_nob_left -= rc;
                 
@@ -541,20 +577,33 @@ ksocknal_recvmsg (ksock_conn_t *conn)
                         rc = -ESHUTDOWN;
                         break;
                 }
-                
+
                 if (conn->ksnc_rx_niov != 0)
                         rc = ksocknal_recv_iov (conn);
                 else
                         rc = ksocknal_recv_kiov (conn);
-                
+
                 if (rc <= 0) {
                         /* error/EOF or partial receive */
-                        if (rc == -EAGAIN)
+                        if (rc == -EAGAIN) {
                                 rc = 1;
+                        } else if (rc == 0 && conn->ksnc_rx_started) {
+                                /* EOF in the middle of a message */
+                                rc = -EPROTO;
+                        }
                         break;
                 }
 
+                /* Completed a fragment */
+
                 if (conn->ksnc_rx_nob_wanted == 0) {
+                        /* Completed a message segment (header or payload) */
+                        if (ksocknal_data.ksnd_eager_ack &&
+                            (conn->ksnc_rx_state ==  SOCKNAL_RX_BODY ||
+                             conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD)) {
+                                /* Remind the socket to ack eagerly... */
+                                ksocknal_eager_ack(conn);
+                        }
                         rc = 1;
                         break;
                 }
@@ -577,7 +626,6 @@ ksocknal_zc_callback (zccd_t *zcd)
 
         spin_lock_irqsave (&sched->kss_lock, flags);
 
-        list_del (&tx->tx_list); /* remove from kss_zctxpending_list */
         list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
         if (waitqueue_active (&sched->kss_waitq))
                 wake_up (&sched->kss_waitq);
@@ -628,23 +676,12 @@ ksocknal_tx_launched (ksock_tx_t *tx)
 {
 #if SOCKNAL_ZC
         if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
-                unsigned long  flags;
                 ksock_conn_t  *conn = tx->tx_conn;
-                ksock_sched_t *sched = conn->ksnc_scheduler;
                 
                 /* zccd skbufs are still in-flight.  First take a ref on
                  * conn, so it hangs about for ksocknal_tx_done... */
                 atomic_inc (&conn->ksnc_refcount);
 
-                /* Stash it for timeout...
-                 * NB We have to hold a lock to stash the tx, and we have
-                 * stash it before we zcc_put(), but we have to _not_ hold
-                 * this lock when we zcc_put(), otherwise we could deadlock
-                 * if it turns out to be the last put. Aaaaarrrrggghhh! */
-                spin_lock_irqsave (&sched->kss_lock, flags);
-                list_add_tail (&tx->tx_list, &conn->ksnc_tx_pending);
-                spin_unlock_irqrestore (&sched->kss_lock, flags);
-                
                 /* ...then drop the initial ref on zccd, so the zero copy
                  * callback can occur */
                 zccd_put (&tx->tx_zccd);
@@ -659,10 +696,10 @@ ksocknal_tx_launched (ksock_tx_t *tx)
 void
 ksocknal_process_transmit (ksock_sched_t *sched, unsigned long *irq_flags)
 {
-        ksock_conn_t *conn;
-        ksock_tx_t *tx;
-        int         rc;
-
+        ksock_conn_t  *conn;
+        ksock_tx_t    *tx;
+        int            rc;
+        
         LASSERT (!list_empty (&sched->kss_tx_conns));
         conn = list_entry(sched->kss_tx_conns.next, ksock_conn_t, ksnc_tx_list);
         list_del (&conn->ksnc_tx_list);
@@ -686,7 +723,7 @@ ksocknal_process_transmit (ksock_sched_t *sched, unsigned long *irq_flags)
         CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
 
         if (rc != 0) {
-                if (ksocknal_close_conn_unlocked (conn)) {
+                if (ksocknal_close_conn_unlocked (conn, rc)) {
                         /* I'm the first to close */
                         CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n",
                                 conn, rc, conn->ksnc_peer->ksnp_nid,
@@ -696,7 +733,6 @@ ksocknal_process_transmit (ksock_sched_t *sched, unsigned long *irq_flags)
                 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
 
         } else if (tx->tx_resid == 0) {  
-
                 /* everything went; assume more can go, and avoid
                  * write_space locking */
                 conn->ksnc_tx_ready = 1;
@@ -763,7 +799,8 @@ ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
                 return (NULL);
         }
         
-        rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, &target_nid);
+        rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, tx->tx_nob,
+                         &target_nid);
         if (rc != 0) {
                 CERROR ("Can't route to "LPX64": router error %d\n", nid, rc);
                 return (NULL);
@@ -820,8 +857,7 @@ ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
 #endif
 
         spin_lock_irqsave (&sched->kss_lock, flags);
-                
-        tx->tx_deadline = jiffies_64 + ksocknal_io_timeout;
+
         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
                 
         if (conn->ksnc_tx_ready &&      /* able to send */
@@ -839,7 +875,7 @@ ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
 }
 
 ksock_route_t *
-ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
+ksocknal_find_connectable_route_locked (ksock_peer_t *peer, int eager_only)
 {
         struct list_head  *tmp;
         ksock_route_t     *route;
@@ -849,7 +885,8 @@ ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
                 
                 if (route->ksnr_conn == NULL && /* not connected */
                     !route->ksnr_connecting &&  /* not connecting */
-                    route->ksnr_timeout <= jiffies_64) /* OK to retry */
+                    (!eager_only || route->ksnr_eager) && /* wants to connect */
+                    time_after_eq (jiffies, route->ksnr_timeout)) /* OK to retry */
                         return (route);
         }
         
@@ -880,7 +917,7 @@ ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
         ksock_conn_t     *conn;
         ksock_route_t    *route;
         rwlock_t         *g_lock;
-        
+
         /* Ensure the frags we've been given EXACTLY match the number of
          * bytes we want to send.  Many TCP/IP stacks disregard any total
          * size parameters passed to them and just look at the frags. 
@@ -907,17 +944,19 @@ ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
                 return (PTL_FAIL);
         }
 
-        /* Any routes need to be connected? (need write lock if so) */
-        if (ksocknal_find_connectable_route_locked (peer) == NULL) {
+        if (ksocknal_find_connectable_route_locked(peer, 1) == NULL) {
                 conn = ksocknal_find_conn_locked (tx, peer);
                 if (conn != NULL) {
+                        /* I've got no unconnected autoconnect routes that
+                         * need to be connected, and I do have an actual
+                         * connection... */
                         ksocknal_queue_tx_locked (tx, conn);
                         read_unlock (g_lock);
                         return (PTL_OK);
                 }
         }
         
-        /* need a write lock now to change peer state... */
+        /* Making one or more connections; I'll need a write lock... */
 
         atomic_inc (&peer->ksnp_refcount);      /* +1 ref for me while I unlock */
         read_unlock (g_lock);
@@ -930,23 +969,37 @@ ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
         }
         ksocknal_put_peer (peer);               /* drop ref I got above */
 
-        /* I may launch autoconnects, now we're write locked... */
-        while ((route = ksocknal_find_connectable_route_locked (peer)) != NULL)
+
+        for (;;) {
+                /* launch all eager autoconnections */
+                route = ksocknal_find_connectable_route_locked (peer, 1);
+                if (route == NULL)
+                        break;
+
                 ksocknal_launch_autoconnect_locked (route);
+        }
 
         conn = ksocknal_find_conn_locked (tx, peer);
         if (conn != NULL) {
+                /* Connection exists; queue message on it */
                 ksocknal_queue_tx_locked (tx, conn);
                 write_unlock_irqrestore (g_lock, flags);
                 return (PTL_OK);
         }
-                
+
         if (ksocknal_find_connecting_route_locked (peer) == NULL) {
-                /* no routes actually connecting now */
-                write_unlock_irqrestore (g_lock, flags);
-                return (PTL_FAIL);
+                /* no autoconnect routes actually connecting now.  Scrape
+                 * the barrel for non-eager autoconnects */
+                route = ksocknal_find_connectable_route_locked (peer, 0);
+                if (route != NULL) {
+                        ksocknal_launch_autoconnect_locked (route);
+                } else {
+                        write_unlock_irqrestore (g_lock, flags);
+                        return (PTL_FAIL);
+                }
         }
 
+        /* At least 1 connection is being established; queue the message... */
         list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
 
         write_unlock_irqrestore (g_lock, flags);
@@ -1279,7 +1332,6 @@ ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
 
         conn->ksnc_cookie = fmb;                /* stash fmb for later */
         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
-        conn->ksnc_rx_deadline = jiffies_64 + ksocknal_io_timeout; /* start timeout */
         
         /* payload is desc's iov-ed buffer, but skipping the hdr */
         LASSERT (niov <= sizeof (conn->ksnc_rx_iov_space) /
@@ -1323,7 +1375,7 @@ ksocknal_fwd_parse (ksock_conn_t *conn)
                        dest_nid, body_len);
 
                 ksocknal_new_packet (conn, 0);  /* on to new packet */
-                ksocknal_close_conn_unlocked (conn); /* give up on conn */
+                ksocknal_close_conn_unlocked (conn, -EINVAL); /* give up on conn */
                 return;
         }
 
@@ -1373,6 +1425,9 @@ ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
         int   skipped;
 
         if (nob_to_skip == 0) {         /* right at next packet boundary now */
+                conn->ksnc_rx_started = 0;
+                mb ();                          /* racing with timeout thread */
+                
                 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
                 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
                 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
@@ -1464,7 +1519,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
         rc = ksocknal_recvmsg(conn);
 
         if (rc <= 0) {
-                if (ksocknal_close_conn_unlocked (conn)) {
+                if (ksocknal_close_conn_unlocked (conn, rc)) {
                         /* I'm the first to close */
                         if (rc < 0)
                                 CERROR ("[%p] Error %d on read from "LPX64" ip %08x:%d\n",
@@ -1478,6 +1533,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
                 goto out;
         }
 
+        
         if (conn->ksnc_rx_nob_wanted != 0)      /* short read */
                 goto out;                       /* try again later */
 
@@ -1506,9 +1562,6 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
                 /* sets wanted_len, iovs etc */
                 lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
 
-                /* start timeout (lib is waiting for finalize) */
-                conn->ksnc_rx_deadline = jiffies_64 + ksocknal_io_timeout;
-
                 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
                         conn->ksnc_rx_state = SOCKNAL_RX_BODY;
                         goto try_read;          /* go read the payload */
@@ -1517,7 +1570,6 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
 
         case SOCKNAL_RX_BODY:
                 /* payload all received */
-                conn->ksnc_rx_deadline = 0;     /* cancel timeout */
                 lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie);
                 /* Fall through */
 
@@ -1534,9 +1586,6 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
                         NTOH__u64 (conn->ksnc_hdr.dest_nid),
                         conn->ksnc_rx_nob_left);
 
-                /* cancel timeout (only needed it while fmb allocated) */
-                conn->ksnc_rx_deadline = 0;
-
                 /* forward the packet. NB ksocknal_init_fmb() put fmb into
                  * conn->ksnc_cookie */
                 fmb = (ksock_fmb_t *)conn->ksnc_cookie;
@@ -1631,15 +1680,20 @@ int ksocknal_scheduler (void *arg)
         int                id = sched - ksocknal_data.ksnd_schedulers;
         char               name[16];
 
-        snprintf (name, sizeof (name),"ksocknald[%d]", id);
+        snprintf (name, sizeof (name),"ksocknald_%02d", id);
         kportal_daemonize (name);
         kportal_blockallsigs ();
 
 #if (CONFIG_SMP && CPU_AFFINITY)
-        if ((cpu_online_map & (1 << id)) != 0)
+        if ((cpu_online_map & (1 << id)) != 0) {
+#if 0
                 current->cpus_allowed = (1 << id);
-        else
+#else
+                set_cpus_allowed (current, 1<<id);
+#endif
+        } else {
                 CERROR ("Can't set CPU affinity for %s\n", name);
+        }
 #endif /* CONFIG_SMP && CPU_AFFINITY */
         
         spin_lock_irqsave (&sched->kss_lock, flags);
@@ -1722,7 +1776,10 @@ ksocknal_data_ready (struct sock *sk, int n)
         if (conn == NULL) {             /* raced with ksocknal_close_sock */
                 LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
                 sk->sk_data_ready (sk, n);
-        } else if (!conn->ksnc_rx_ready) {        /* new news */
+                goto out;
+        }
+
+        if (!conn->ksnc_rx_ready) {        /* new news */
                 /* Set ASAP in case of concurrent calls to me */
                 conn->ksnc_rx_ready = 1;
 
@@ -1747,6 +1804,7 @@ ksocknal_data_ready (struct sock *sk, int n)
                 spin_unlock_irqrestore (&sched->kss_lock, flags);
         }
 
+ out:
         read_unlock (&ksocknal_data.ksnd_global_lock);
 
         EXIT;
@@ -1776,7 +1834,12 @@ ksocknal_write_space (struct sock *sk)
         if (conn == NULL) {             /* raced with ksocknal_close_sock */
                 LASSERT (sk->sk_write_space != &ksocknal_write_space);
                 sk->sk_write_space (sk);
-        } else if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
+
+                read_unlock (&ksocknal_data.ksnd_global_lock);
+                return;
+        }
+
+        if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
                 clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
 
                 if (!conn->ksnc_tx_ready) {      /* new news */
@@ -1967,7 +2030,7 @@ ksocknal_exchange_nids (struct socket *sock, ptl_nid_t nid)
 }
 
 int
-ksocknal_set_linger (struct socket *sock) 
+ksocknal_setup_sock (struct socket *sock) 
 {
         mm_segment_t    oldmm = get_fs ();
         int             rc;
@@ -1998,7 +2061,51 @@ ksocknal_set_linger (struct socket *sock)
                 CERROR ("Can't set SO_LINGER2: %d\n", rc);
                 return (rc);
         }
+
+#if SOCKNAL_USE_KEEPALIVES
+        /* Keepalives: If 3/4 of the timeout elapses, start probing every
+         * second until the timeout elapses. */
+
+        option = (ksocknal_data.ksnd_io_timeout * 3) / 4;
+        set_fs (KERNEL_DS);
+        rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPIDLE,
+                                    (char *)&option, sizeof (option));
+        set_fs (oldmm);
+        if (rc != 0) {
+                CERROR ("Can't set TCP_KEEPIDLE: %d\n", rc);
+                return (rc);
+        }
         
+        option = 1;
+        set_fs (KERNEL_DS);
+        rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPINTVL,
+                                    (char *)&option, sizeof (option));
+        set_fs (oldmm);
+        if (rc != 0) {
+                CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
+                return (rc);
+        }
+        
+        option = ksocknal_data.ksnd_io_timeout / 4;
+        set_fs (KERNEL_DS);
+        rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPCNT,
+                                    (char *)&option, sizeof (option));
+        set_fs (oldmm);
+        if (rc != 0) {
+                CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
+                return (rc);
+        }
+
+        option = 1;
+        set_fs (KERNEL_DS);
+        rc = sock_setsockopt (sock, SOL_SOCKET, SO_KEEPALIVE, 
+                              (char *)&option, sizeof (option));
+        set_fs (oldmm);
+        if (rc != 0) {
+                CERROR ("Can't set SO_KEEPALIVE: %d\n", rc);
+                return (rc);
+        }
+#endif
         return (0);
 }
 
@@ -2007,7 +2114,6 @@ ksocknal_connect_peer (ksock_route_t *route)
 {
         struct sockaddr_in  peer_addr;
         mm_segment_t        oldmm = get_fs();
-        __u64               n;
         struct timeval      tv;
         int                 fd;
         struct socket      *sock;
@@ -2020,8 +2126,8 @@ ksocknal_connect_peer (ksock_route_t *route)
         }
 
         /* Ugh; have to map_fd for compatibility with sockets passed in
-         * from userspace.  And we actually need the refcounting that
-         * this gives you :) */
+         * from userspace.  And we actually need the sock->file refcounting
+         * that this gives you :) */
 
         fd = sock_map_fd (sock);
         if (fd < 0) {
@@ -2033,22 +2139,19 @@ ksocknal_connect_peer (ksock_route_t *route)
         /* NB the fd now owns the ref on sock->file */
         LASSERT (sock->file != NULL);
         LASSERT (file_count(sock->file) == 1);
-        
+
         /* Set the socket timeouts, so our connection attempt completes in
          * finite time */
-        tv.tv_sec = ksocknal_io_timeout / HZ;
-        n = ksocknal_io_timeout % HZ;
-        n = n * 1000000 + HZ - 1;
-        do_div (n, HZ);
-        tv.tv_usec = n;
+        tv.tv_sec = ksocknal_data.ksnd_io_timeout;
+        tv.tv_usec = 0;
 
         set_fs (KERNEL_DS);
         rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO,
                               (char *)&tv, sizeof (tv));
         set_fs (oldmm);
         if (rc != 0) {
-                CERROR ("Can't set send timeout %d (in HZ): %d\n", 
-                        ksocknal_io_timeout, rc);
+                CERROR ("Can't set send timeout %d: %d\n", 
+                        ksocknal_data.ksnd_io_timeout, rc);
                 goto out;
         }
         
@@ -2057,8 +2160,8 @@ ksocknal_connect_peer (ksock_route_t *route)
                               (char *)&tv, sizeof (tv));
         set_fs (oldmm);
         if (rc != 0) {
-                CERROR ("Can't set receive timeout %d (in HZ): %d\n",
-                        ksocknal_io_timeout, rc);
+                CERROR ("Can't set receive timeout %d: %d\n",
+                        ksocknal_data.ksnd_io_timeout, rc);
                 goto out;
         }
 
@@ -2154,7 +2257,7 @@ ksocknal_autoconnect (ksock_route_t *route)
         route->ksnr_connecting = 0;
 
         LASSERT (route->ksnr_retry_interval != 0);
-        route->ksnr_timeout = jiffies_64 + route->ksnr_retry_interval;
+        route->ksnr_timeout = jiffies + route->ksnr_retry_interval;
         route->ksnr_retry_interval = MIN (route->ksnr_retry_interval * 2,
                                           SOCKNAL_MAX_RECONNECT_INTERVAL);
 
@@ -2198,7 +2301,7 @@ ksocknal_autoconnectd (void *arg)
         ksock_route_t     *route;
         int                rc;
 
-        snprintf (name, sizeof (name), "ksocknal_ad[%ld]", id);
+        snprintf (name, sizeof (name), "ksocknal_ad%02ld", id);
         kportal_daemonize (name);
         kportal_blockallsigs ();
 
@@ -2239,55 +2342,47 @@ ksock_conn_t *
 ksocknal_find_timed_out_conn (ksock_peer_t *peer) 
 {
         /* We're called with a shared lock on ksnd_global_lock */
-        unsigned long      flags;
         ksock_conn_t      *conn;
         struct list_head  *ctmp;
-        ksock_tx_t        *tx;
-        struct list_head  *ttmp;
         ksock_sched_t     *sched;
 
         list_for_each (ctmp, &peer->ksnp_conns) {
                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
                 sched = conn->ksnc_scheduler;
-                
-                if (conn->ksnc_rx_deadline != 0 &&
-                    conn->ksnc_rx_deadline <= jiffies_64)
-                        goto timed_out;
 
-                spin_lock_irqsave (&sched->kss_lock, flags);
+                /* Don't need the {get,put}connsock dance to deref ksnc_sock... */
+                LASSERT (!conn->ksnc_closing);
                 
-                list_for_each (ttmp, &conn->ksnc_tx_queue) {
-                        tx = list_entry (ttmp, ksock_tx_t, tx_list);
-                        LASSERT (tx->tx_deadline != 0);
-                        
-                        if (tx->tx_deadline <= jiffies_64)
-                                goto timed_out_locked;
+                if (conn->ksnc_rx_started &&
+                    time_after_eq (jiffies, conn->ksnc_rx_deadline)) {
+                        /* Timed out incomplete incoming message */
+                        atomic_inc (&conn->ksnc_refcount);
+                        CERROR ("Timed out RX from "LPX64" %p\n", 
+                                peer->ksnp_nid, conn);
+                        return (conn);
                 }
-#if SOCKNAL_ZC
-                list_for_each (ttmp, &conn->ksnc_tx_pending) {
-                        tx = list_entry (ttmp, ksock_tx_t, tx_list);
-                        LASSERT (tx->tx_deadline != 0);
-
-                        if (tx->tx_deadline <= jiffies_64)
-                                goto timed_out_locked;
+                
+                if ((!list_empty (&conn->ksnc_tx_queue) ||
+                     conn->ksnc_sock->sk->wmem_queued != 0) &&
+                    time_after_eq (jiffies, conn->ksnc_tx_deadline)) {
+                        /* Timed out messages queued for sending, or
+                         * messages buffered in the socket's send buffer */
+                        atomic_inc (&conn->ksnc_refcount);
+                        CERROR ("Timed out TX to "LPX64" %s%d %p\n", 
+                                peer->ksnp_nid, 
+                                list_empty (&conn->ksnc_tx_queue) ? "" : "Q ",
+                                conn->ksnc_sock->sk->wmem_queued, conn);
+                        return (conn);
                 }
-#endif                
-                spin_unlock_irqrestore (&sched->kss_lock, flags);
-                continue;
-
-        timed_out_locked:
-                spin_unlock_irqrestore (&sched->kss_lock, flags);
-        timed_out:
-                atomic_inc (&conn->ksnc_refcount);
-                return (conn);
         }
 
         return (NULL);
 }
 
 void
-ksocknal_check_peer_timeouts (struct list_head *peers)
+ksocknal_check_peer_timeouts (int idx)
 {
+        struct list_head *peers = &ksocknal_data.ksnd_peers[idx];
         struct list_head *ptmp;
         ksock_peer_t     *peer;
         ksock_conn_t     *conn;
@@ -2305,7 +2400,7 @@ ksocknal_check_peer_timeouts (struct list_head *peers)
                 if (conn != NULL) {
                         read_unlock (&ksocknal_data.ksnd_global_lock);
 
-                        if (ksocknal_close_conn_unlocked (conn)) {
+                        if (ksocknal_close_conn_unlocked (conn, -ETIMEDOUT)) {
                                 /* I actually closed... */
                                 CERROR ("Timeout out conn->"LPX64" ip %x:%d\n",
                                         peer->ksnp_nid, conn->ksnc_ipaddr,
@@ -2330,8 +2425,9 @@ ksocknal_reaper (void *arg)
         unsigned long      flags;
         ksock_conn_t      *conn;
         int                timeout;
+        int                i;
         int                peer_index = 0;
-        __u64              deadline = jiffies_64;
+        unsigned long      deadline = jiffies;
         
         kportal_daemonize ("ksocknal_reaper");
         kportal_blockallsigs ();
@@ -2371,12 +2467,32 @@ ksocknal_reaper (void *arg)
                 
                 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
 
-                while ((timeout = deadline - jiffies_64) <= 0) {
-                        /* Time to check for timeouts on a few more peers */
-                        ksocknal_check_peer_timeouts (&ksocknal_data.ksnd_peers[peer_index]);
+                /* careful with the jiffy wrap... */
+                while ((timeout = ((int)deadline - (int)jiffies)) <= 0) {
+                        const int n = 4;
+                        const int p = 1;
+                        int       chunk = ksocknal_data.ksnd_peer_hash_size;
+                        
+                        /* Time to check for timeouts on a few more peers: I do
+                         * checks every 'p' seconds on a proportion of the peer
+                         * table and I need to check every connection 'n' times
+                         * within a timeout interval, to ensure I detect a
+                         * timeout on any connection within (n+1)/n times the
+                         * timeout interval. */
+
+                        if (ksocknal_data.ksnd_io_timeout > n * p)
+                                chunk = (chunk * n * p) / 
+                                        ksocknal_data.ksnd_io_timeout;
+                        if (chunk == 0)
+                                chunk = 1;
+
+                        for (i = 0; i < chunk; i++) {
+                                ksocknal_check_peer_timeouts (peer_index);
+                                peer_index = (peer_index + 1) % 
+                                             ksocknal_data.ksnd_peer_hash_size;
+                        }
 
-                        peer_index = (peer_index + 1) % SOCKNAL_PEER_HASH_SIZE;
-                        deadline += HZ;
+                        deadline += p * HZ;
                 }
 
                 add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
index abd0731..478f3c1 100644 (file)
@@ -431,7 +431,8 @@ ktoenal_send(nal_cb_t *nal, void *private, lib_msg_t *cookie,
         if ((conn = ktoenal_get_conn (nid)) == NULL)
         {
                 /* It's not a peer; try to find a gateway */
-                rc = kpr_lookup (&ktoenal_data.ksnd_router, nid, &gatewaynid);
+                rc = kpr_lookup (&ktoenal_data.ksnd_router, nid, payload_niov,
+                                 &gatewaynid);
                 if (rc != 0)
                 {
                         CERROR ("Can't route to "LPX64": router error %d\n", nid, rc);
index 20d7fbd..cf9220b 100644 (file)
@@ -20,7 +20,7 @@ link-stamp:
        echo timestamp > link-stamp
 
 DEFS =
-portals_SOURCES = $(LINKS) module.c proc.c debug.c
+portals_SOURCES = $(LINKS) module.c proc.c debug.c lwt.c
 
 # Don't distribute any patched files.
 dist-hook:
index 63e71ee..90eb185 100644 (file)
@@ -790,41 +790,61 @@ void portals_debug_set_level(unsigned int debug_level)
         portal_debug = debug_level;
 }
 
+void portals_run_upcall(char **argv)
+{
+        int   rc;
+        int   argc;
+        char *envp[] = {
+                "HOME=/",
+                "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
+                NULL};
+        ENTRY;
+
+        argv[0] = portals_upcall;
+        argc = 1;
+        while (argv[argc] != NULL)
+                argc++;
+
+        LASSERT(argc >= 2);
+        
+        rc = call_usermodehelper(argv[0], argv, envp);
+        if (rc < 0) {
+                CERROR("Error %d invoking portals upcall %s %s%s%s%s%s%s%s%s; "
+                       "check /proc/sys/portals/upcall\n",
+                       rc, argv[0], argv[1],
+                       argc < 3 ? "" : ",", argc < 3 ? "" : argv[2],
+                       argc < 4 ? "" : ",", argc < 4 ? "" : argv[3],
+                       argc < 5 ? "" : ",", argc < 5 ? "" : argv[4],
+                       argc < 6 ? "" : ",...");
+        } else {
+                CERROR("Invoked portals upcall %s %s%s%s%s%s%s%s%s\n",
+                       argv[0], argv[1],
+                       argc < 3 ? "" : ",", argc < 3 ? "" : argv[2],
+                       argc < 4 ? "" : ",", argc < 4 ? "" : argv[3],
+                       argc < 5 ? "" : ",", argc < 5 ? "" : argv[4],
+                       argc < 6 ? "" : ",...");
+        }
+}
+
 void portals_run_lbug_upcall(char *file, const char *fn, const int line)
 {
         char *argv[6];
-        char *envp[3];
         char buf[32];
-        int rc;
 
         ENTRY;
         snprintf (buf, sizeof buf, "%d", line);
 
-        argv[0] = portals_upcall;
         argv[1] = "LBUG";
         argv[2] = file;
         argv[3] = (char *)fn;
         argv[4] = buf;
         argv[5] = NULL;
 
-        envp[0] = "HOME=/";
-        envp[1] = "PATH=/sbin:/bin:/usr/sbin:/usr/bin";
-        envp[2] = NULL;
-
-        rc = call_usermodehelper(argv[0], argv, envp);
-        if (rc < 0) {
-                CERROR("Error invoking lbug upcall %s %s %s %s %s: %d; check "
-                       "/proc/sys/portals/upcall\n",                
-                       argv[0], argv[1], argv[2], argv[3], argv[4], rc);
-                
-        } else {
-                CERROR("Invoked upcall %s %s %s %s %s\n",
-                       argv[0], argv[1], argv[2], argv[3], argv[4]);
-        }
+        portals_run_upcall (argv);
 }
 
-
 EXPORT_SYMBOL(portals_debug_dumplog);
 EXPORT_SYMBOL(portals_debug_msg);
 EXPORT_SYMBOL(portals_debug_set_level);
+EXPORT_SYMBOL(portals_run_upcall);
 EXPORT_SYMBOL(portals_run_lbug_upcall);
diff --git a/lnet/libcfs/lwt.c b/lnet/libcfs/lwt.c
new file mode 100644 (file)
index 0000000..a40a7ed
--- /dev/null
@@ -0,0 +1,235 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2003 Cluster File Systems, Inc.
+ *   Author: Eric Barton <eeb@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define EXPORT_SYMTAB
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/kmod.h>
+#include <linux/kernel.h>
+#include <linux/kernel.h>
+#include <linux/mm.h>
+#include <linux/string.h>
+#include <linux/stat.h>
+#include <linux/errno.h>
+#include <linux/smp_lock.h>
+#include <linux/unistd.h>
+#include <linux/interrupt.h>
+#include <asm/system.h>
+#include <asm/uaccess.h>
+
+#define DEBUG_SUBSYSTEM S_PORTALS
+
+#include <linux/kp30.h>
+
+#if LWT_SUPPORT
+
+#define LWT_MEMORY              (1<<20)         /* 1Mb of trace memory */
+#define LWT_MAX_CPUS             4
+
+int         lwt_enabled;
+int         lwt_pages_per_cpu;
+lwt_cpu_t   lwt_cpus[LWT_MAX_CPUS];
+
+/* NB only root is allowed to retrieve LWT info; it's an open door into the
+ * kernel... */
+
+int
+lwt_lookup_string (int *size, char *knl_ptr,
+                   char *user_ptr, int user_size)
+{
+        /* knl_ptr was retrieved from an LWT snapshot and the caller wants to
+         * turn it into a string.  NB we can crash with an access violation
+         * trying to determine the string length, so we're trusting our
+         * caller... */
+
+        if (!capable(CAP_SYS_ADMIN))
+                return (-EPERM);
+
+        *size = strlen (knl_ptr) + 1;
+        
+        if (user_ptr != NULL &&
+            copy_to_user (user_ptr, knl_ptr, *size))
+                return (-EFAULT);
+        
+        return (0);
+}
+
+int
+lwt_control (int enable, int clear)
+{
+        lwt_page_t  *p;
+        int          i;
+        int          j;
+
+        if (!capable(CAP_SYS_ADMIN))
+                return (-EPERM);
+
+        if (clear)
+                for (i = 0; i < num_online_cpus(); i++) {
+                        p = lwt_cpus[i].lwtc_current_page;
+                        
+                        for (j = 0; j < lwt_pages_per_cpu; j++) {
+                                
+                                memset (p->lwtp_events, 0, PAGE_SIZE);
+                                
+                                p = list_entry (p->lwtp_list.next,
+                                                lwt_page_t, lwtp_list);
+                        }
+        }
+
+        lwt_enabled = enable;
+        mb();
+        if (!enable) {
+                /* give people some time to stop adding traces */
+                schedule_timeout(10);
+        }
+
+        return (0);
+}
+
+int
+lwt_snapshot (int *ncpu, int *total_size, 
+              void *user_ptr, int user_size) 
+{
+        const int    events_per_page = PAGE_SIZE / sizeof(lwt_event_t);
+        const int    bytes_per_page = events_per_page * sizeof(lwt_event_t);
+        lwt_page_t  *p;
+        int          i;
+        int          j;
+
+        if (!capable(CAP_SYS_ADMIN))
+                return (-EPERM);
+
+        *ncpu = num_online_cpus();
+        *total_size = num_online_cpus() * lwt_pages_per_cpu * bytes_per_page;
+
+        if (user_ptr == NULL)
+                return (0);
+
+        for (i = 0; i < num_online_cpus(); i++) {
+                p = lwt_cpus[i].lwtc_current_page;
+                
+                for (j = 0; j < lwt_pages_per_cpu; j++) {
+                        if (copy_to_user(user_ptr, p->lwtp_events,
+                                         bytes_per_page))
+                                return (-EFAULT);
+
+                        user_ptr = ((char *)user_ptr) + bytes_per_page;
+                        p = list_entry(p->lwtp_list.next,
+                                       lwt_page_t, lwtp_list);
+                        
+                }
+        }
+
+        return (0);
+}
+
+int
+lwt_init () 
+{
+       int     i;
+        int     j;
+        
+        if (num_online_cpus() > LWT_MAX_CPUS) {
+                CERROR ("Too many CPUs\n");
+                return (-EINVAL);
+        }
+
+       /* NULL pointers, zero scalars */
+       memset (lwt_cpus, 0, sizeof (lwt_cpus));
+        lwt_pages_per_cpu = LWT_MEMORY / (num_online_cpus() * PAGE_SIZE);
+
+       for (i = 0; i < num_online_cpus(); i++)
+               for (j = 0; j < lwt_pages_per_cpu; j++) {
+                       struct page *page = alloc_page (GFP_KERNEL);
+                       lwt_page_t  *lwtp;
+
+                       if (page == NULL) {
+                               CERROR ("Can't allocate page\n");
+                                lwt_fini ();
+                               return (-ENOMEM);
+                       }
+
+                        PORTAL_ALLOC(lwtp, sizeof (*lwtp));
+                       if (lwtp == NULL) {
+                               CERROR ("Can't allocate lwtp\n");
+                                __free_page(page);
+                               lwt_fini ();
+                               return (-ENOMEM);
+                       }
+
+                        lwtp->lwtp_page = page;
+                        lwtp->lwtp_events = page_address(page);
+                       memset (lwtp->lwtp_events, 0, PAGE_SIZE);
+
+                       if (j == 0) {
+                               INIT_LIST_HEAD (&lwtp->lwtp_list);
+                               lwt_cpus[i].lwtc_current_page = lwtp;
+                       } else {
+                               list_add (&lwtp->lwtp_list,
+                                   &lwt_cpus[i].lwtc_current_page->lwtp_list);
+                       }
+                }
+
+        lwt_enabled = 1;
+        mb();
+
+        return (0);
+}
+
+void
+lwt_fini () 
+{
+        int    i;
+        
+        if (num_online_cpus() > LWT_MAX_CPUS)
+                return;
+
+        for (i = 0; i < num_online_cpus(); i++)
+                while (lwt_cpus[i].lwtc_current_page != NULL) {
+                        lwt_page_t *lwtp = lwt_cpus[i].lwtc_current_page;
+                        
+                        if (list_empty (&lwtp->lwtp_list)) {
+                                lwt_cpus[i].lwtc_current_page = NULL;
+                        } else {
+                                lwt_cpus[i].lwtc_current_page =
+                                        list_entry (lwtp->lwtp_list.next,
+                                                    lwt_page_t, lwtp_list);
+
+                                list_del (&lwtp->lwtp_list);
+                        }
+                        
+                        __free_page (lwtp->lwtp_page);
+                        PORTAL_FREE (lwtp, sizeof (*lwtp));
+                }
+}
+
+EXPORT_SYMBOL(lwt_enabled);
+EXPORT_SYMBOL(lwt_cpus);
+
+EXPORT_SYMBOL(lwt_init);
+EXPORT_SYMBOL(lwt_fini);
+EXPORT_SYMBOL(lwt_lookup_string);
+EXPORT_SYMBOL(lwt_control);
+EXPORT_SYMBOL(lwt_snapshot);
+#endif
index c72dffc..11e4f4c 100644 (file)
@@ -122,8 +122,8 @@ static inline void freedata(void *data, int len)
 }
 
 static int
-kportal_add_route(int gateway_nalid, ptl_nid_t gateway_nid, ptl_nid_t lo_nid,
-                  ptl_nid_t hi_nid)
+kportal_add_route(int gateway_nalid, ptl_nid_t gateway_nid, 
+                  ptl_nid_t lo_nid, ptl_nid_t hi_nid)
 {
         int rc;
         kpr_control_interface_t *ci;
@@ -139,7 +139,8 @@ kportal_add_route(int gateway_nalid, ptl_nid_t gateway_nid, ptl_nid_t lo_nid,
 }
 
 static int
-kportal_del_route(ptl_nid_t target)
+kportal_del_route(int gw_nalid, ptl_nid_t gw_nid, 
+                  ptl_nid_t lo, ptl_nid_t hi)
 {
         int rc;
         kpr_control_interface_t *ci;
@@ -148,7 +149,24 @@ kportal_del_route(ptl_nid_t target)
         if (ci == NULL)
                 return (-ENODEV);
 
-        rc = ci->kprci_del_route (target);
+        rc = ci->kprci_del_route (gw_nalid, gw_nid, lo, hi);
+
+        PORTAL_SYMBOL_PUT(kpr_control_interface);
+        return (rc);
+}
+
+static int
+kportal_notify_router (int gw_nalid, ptl_nid_t gw_nid,
+                       int alive, time_t when)
+{
+        int rc;
+        kpr_control_interface_t *ci;
+
+        ci = (kpr_control_interface_t *)PORTAL_SYMBOL_GET(kpr_control_interface);
+        if (ci == NULL)
+                return (-ENODEV);
+
+        rc = ci->kprci_notify (gw_nalid, gw_nid, alive, when);
 
         PORTAL_SYMBOL_PUT(kpr_control_interface);
         return (rc);
@@ -156,12 +174,13 @@ kportal_del_route(ptl_nid_t target)
 
 static int
 kportal_get_route(int index, __u32 *gateway_nalidp, ptl_nid_t *gateway_nidp,
-                  ptl_nid_t *lo_nidp, ptl_nid_t *hi_nidp)
+                  ptl_nid_t *lo_nidp, ptl_nid_t *hi_nidp, int *alivep)
 {
         int       gateway_nalid;
         ptl_nid_t gateway_nid;
         ptl_nid_t lo_nid;
         ptl_nid_t hi_nid;
+        int       alive;
         int       rc;
         kpr_control_interface_t *ci;
 
@@ -169,17 +188,19 @@ kportal_get_route(int index, __u32 *gateway_nalidp, ptl_nid_t *gateway_nidp,
         if (ci == NULL)
                 return (-ENODEV);
 
-        rc = ci->kprci_get_route(index, &gateway_nalid, &gateway_nid, &lo_nid,
-                                 &hi_nid);
+        rc = ci->kprci_get_route(index, &gateway_nalid, &gateway_nid,
+                                 &lo_nid, &hi_nid, &alive);
 
         if (rc == 0) {
-                CDEBUG(D_IOCTL, "got route [%d] %d "LPX64":"LPX64" - "LPX64"\n",
-                       index, gateway_nalid, gateway_nid, lo_nid, hi_nid);
+                CDEBUG(D_IOCTL, "got route [%d] %d "LPX64":"LPX64" - "LPX64", %s\n",
+                       index, gateway_nalid, gateway_nid, lo_nid, hi_nid,
+                       alive ? "up" : "down");
 
                 *gateway_nalidp = (__u32)gateway_nalid;
-                *gateway_nidp   = (__u32)gateway_nid;
-                *lo_nidp        = (__u32)lo_nid;
-                *hi_nidp        = (__u32)hi_nid;
+                *gateway_nidp   = gateway_nid;
+                *lo_nidp        = lo_nid;
+                *hi_nidp        = hi_nid;
+                *alivep         = alive;
         }
 
         PORTAL_SYMBOL_PUT (kpr_control_interface);
@@ -367,23 +388,38 @@ static int kportal_ioctl(struct inode *inode, struct file *file,
 
         case IOC_PORTAL_ADD_ROUTE:
                 CDEBUG(D_IOCTL, "Adding route: [%d] "LPU64" : "LPU64" - "LPU64"\n",
-                       data->ioc_nal, data->ioc_nid, data->ioc_nid2,
-                       data->ioc_nid3);
+                       data->ioc_nal, data->ioc_nid, 
+                       data->ioc_nid2, data->ioc_nid3);
                 err = kportal_add_route(data->ioc_nal, data->ioc_nid,
-                                        MIN (data->ioc_nid2, data->ioc_nid3),
-                                        MAX (data->ioc_nid2, data->ioc_nid3));
+                                        data->ioc_nid2, data->ioc_nid3);
                 break;
 
         case IOC_PORTAL_DEL_ROUTE:
-                CDEBUG (D_IOCTL, "Removing route to "LPU64"\n", data->ioc_nid);
-                err = kportal_del_route (data->ioc_nid);
+                CDEBUG (D_IOCTL, "Removing routes via [%d] "LPU64" : "LPU64" - "LPU64"\n",
+                        data->ioc_nal, data->ioc_nid, 
+                        data->ioc_nid2, data->ioc_nid3);
+                err = kportal_del_route (data->ioc_nal, data->ioc_nid,
+                                         data->ioc_nid2, data->ioc_nid3);
                 break;
 
+        case IOC_PORTAL_NOTIFY_ROUTER: {
+                CDEBUG (D_IOCTL, "Notifying peer [%d] "LPU64" %s @ %ld\n",
+                        data->ioc_nal, data->ioc_nid,
+                        data->ioc_flags ? "Enabling" : "Disabling",
+                        (time_t)data->ioc_nid3);
+                
+                err = kportal_notify_router (data->ioc_nal, data->ioc_nid,
+                                             data->ioc_flags, 
+                                             (time_t)data->ioc_nid3);
+                break;
+        }
+                
         case IOC_PORTAL_GET_ROUTE:
                 CDEBUG (D_IOCTL, "Getting route [%d]\n", data->ioc_count);
                 err = kportal_get_route(data->ioc_count, &data->ioc_nal,
-                                        &data->ioc_nid, &data->ioc_nid2,
-                                        &data->ioc_nid3);
+                                        &data->ioc_nid, 
+                                        &data->ioc_nid2, &data->ioc_nid3,
+                                        &data->ioc_flags);
                 if (err == 0)
                         if (copy_to_user((char *)arg, data, sizeof (*data)))
                                 err = -EFAULT;
@@ -432,7 +468,27 @@ static int kportal_ioctl(struct inode *inode, struct file *file,
                 kportal_put_ni (data->ioc_nal);
                 break;
         }
-
+#if LWT_SUPPORT
+        case IOC_PORTAL_LWT_CONTROL: 
+                err = lwt_control (data->ioc_flags, data->ioc_misc);
+                break;
+                
+        case IOC_PORTAL_LWT_SNAPSHOT:
+                err = lwt_snapshot (&data->ioc_count, &data->ioc_misc,
+                                    data->ioc_pbuf1, data->ioc_plen1);
+                if (err == 0 &&
+                    copy_to_user((char *)arg, data, sizeof (*data)))
+                        err = -EFAULT;
+                break;
+                
+        case IOC_PORTAL_LWT_LOOKUP_STRING:
+                err = lwt_lookup_string (&data->ioc_count, data->ioc_pbuf1,
+                                         data->ioc_pbuf2, data->ioc_plen2);
+                if (err == 0 &&
+                    copy_to_user((char *)arg, data, sizeof (*data)))
+                        err = -EFAULT;
+                break;
+#endif                        
         default:
                 err = -EINVAL;
                 break;
@@ -471,12 +527,19 @@ static int init_kportals_module(void)
                 return (rc);
         }
 
+#if LWT_SUPPORT
+        rc = lwt_init();
+        if (rc != 0) {
+                CERROR("lwt_init: error %d\n", rc);
+                goto cleanup_debug;
+        }
+#endif
         sema_init(&nal_cmd_sem, 1);
 
         rc = misc_register(&portal_dev);
         if (rc) {
                 CERROR("misc_register: error %d\n", rc);
-                goto cleanup_debug;
+                goto cleanup_lwt;
         }
 
         rc = PtlInit();
@@ -498,6 +561,10 @@ static int init_kportals_module(void)
         PtlFini();
  cleanup_deregister:
         misc_deregister(&portal_dev);
+ cleanup_lwt:
+#if LWT_SUPPORT
+        lwt_fini();
+#endif
  cleanup_debug:
         portals_debug_cleanup();
         return rc;
@@ -518,6 +585,10 @@ static void exit_kportals_module(void)
         if (rc)
                 CERROR("misc_deregister error %d\n", rc);
 
+#if LWT_SUPPORT
+        lwt_fini();
+#endif
+
         if (atomic_read(&portal_kmemory) != 0)
                 CERROR("Portals memory leaked: %d bytes\n",
                        atomic_read(&portal_kmemory));
index 27a7fba..a03fb42 100644 (file)
@@ -24,6 +24,7 @@
 #include "router.h"
 
 LIST_HEAD(kpr_routes);
+LIST_HEAD(kpr_gateways);
 LIST_HEAD(kpr_nals);
 
 unsigned long long kpr_fwd_bytes;
@@ -42,6 +43,7 @@ kpr_router_interface_t kpr_router_interface = {
        kprri_lookup:           kpr_lookup_target,
        kprri_fwd_start:        kpr_forward_packet,
        kprri_fwd_done:         kpr_complete_packet,
+        kprri_notify:           kpr_nal_notify,
        kprri_shutdown:         kpr_shutdown_nal,
        kprri_deregister:       kpr_deregister_nal,
 };
@@ -50,6 +52,7 @@ kpr_control_interface_t kpr_control_interface = {
        kprci_add_route:        kpr_add_route,
        kprci_del_route:        kpr_del_route,
        kprci_get_route:        kpr_get_route,
+       kprci_notify:           kpr_sys_notify,
 };
 
 int
@@ -59,7 +62,7 @@ kpr_register_nal (kpr_nal_interface_t *nalif, void **argp)
        struct list_head  *e;
        kpr_nal_entry_t   *ne;
 
-        CDEBUG (D_OTHER, "Registering NAL %d\n", nalif->kprni_nalid);
+        CDEBUG (D_NET, "Registering NAL %d\n", nalif->kprni_nalid);
 
        PORTAL_ALLOC (ne, sizeof (*ne));
        if (ne == NULL)
@@ -96,12 +99,181 @@ kpr_register_nal (kpr_nal_interface_t *nalif, void **argp)
 }
 
 void
+kpr_do_upcall (void *arg)
+{
+        kpr_upcall_t *u = (kpr_upcall_t *)arg;
+        char          nalstr[10];
+        char          nidstr[36];
+        char          whenstr[36];
+        char         *argv[] = {
+                NULL,
+                "ROUTER_NOTIFY",
+                nalstr,
+                nidstr,
+                u->kpru_alive ? "up" : "down",
+                whenstr,
+                NULL};
+        
+        snprintf (nalstr, sizeof(nalstr), "%d", u->kpru_nal_id);
+        snprintf (nidstr, sizeof(nidstr), LPX64, u->kpru_nid);
+        snprintf (whenstr, sizeof(whenstr), "%ld", u->kpru_when);
+
+        portals_run_upcall (argv);
+}
+
+void
+kpr_upcall (int gw_nalid, ptl_nid_t gw_nid, int alive, time_t when)
+{
+        /* May be in arbitrary context */
+        kpr_upcall_t  *u = kmalloc (sizeof (kpr_upcall_t), GFP_ATOMIC);
+
+        if (u == NULL) {
+                CERROR ("Upcall out of memory: nal %d nid "LPX64" %s\n",
+                        gw_nalid, gw_nid, alive ? "up" : "down");
+                return;
+        }
+
+        u->kpru_nal_id     = gw_nalid;
+        u->kpru_nid        = gw_nid;
+        u->kpru_alive      = alive;
+        u->kpru_when       = when;
+
+        prepare_work (&u->kpru_tq, kpr_do_upcall, u);
+        schedule_work (&u->kpru_tq);
+}
+
+int
+kpr_do_notify (int byNal, int gateway_nalid, ptl_nid_t gateway_nid,
+               int alive, time_t when)
+{
+       unsigned long        flags;
+        int                  rc = -ENOENT;
+        kpr_nal_entry_t     *ne = NULL;
+        kpr_gateway_entry_t *ge = NULL;
+        struct timeval       now;
+       struct list_head    *e;
+       struct list_head    *n;
+
+        CDEBUG (D_ERROR, "%s notifying [%d] "LPX64": %s\n", 
+                byNal ? "NAL" : "userspace", 
+                gateway_nalid, gateway_nid, alive ? "up" : "down");
+
+        /* can't do predictions... */
+        do_gettimeofday (&now);
+        if (when > now.tv_sec) {
+                CERROR ("Ignoring prediction from %s of [%d] "LPX64" %s "
+                        "%ld seconds in the future\n", 
+                byNal ? "NAL" : "userspace", 
+                gateway_nalid, gateway_nid, alive ? "up" : "down",
+                        when - now.tv_sec);
+                return (EINVAL);
+        }
+
+        LASSERT (when <= now.tv_sec);
+
+        /* Serialise with lookups (i.e. write lock) */
+       write_lock_irqsave(&kpr_rwlock, flags);
+
+        list_for_each_safe (e, n, &kpr_gateways) {
+
+                ge = list_entry(e, kpr_gateway_entry_t, kpge_list);
+                if ((gateway_nalid != 0 &&
+                     ge->kpge_nalid != gateway_nalid) ||
+                    ge->kpge_nid != gateway_nid)
+                        continue;
+
+                rc = 0;
+                break;
+        }
+
+        if (rc != 0) {
+                /* gateway not found */
+                write_unlock_irqrestore(&kpr_rwlock, flags);
+                CERROR ("Gateway not found\n");
+                return (rc);
+        }
+        
+        if (when < ge->kpge_timestamp) {
+                /* out of date information */
+                write_unlock_irqrestore (&kpr_rwlock, flags);
+                CERROR ("Out of date\n");
+                return (0);
+        }
+
+        /* update timestamp */
+        ge->kpge_timestamp = when;
+
+        if ((!ge->kpge_alive) == (!alive)) {
+                /* new date for old news */
+                write_unlock_irqrestore (&kpr_rwlock, flags);
+                CERROR ("Old news\n");
+                return (0);
+        }
+
+        ge->kpge_alive = alive;
+        CDEBUG(D_NET, "set "LPX64" [%p] %d\n", gateway_nid, ge, alive);
+
+        if (alive) {
+                /* Reset all gateway weights so the newly-enabled gateway
+                 * doesn't have to play catch-up */
+                list_for_each_safe (e, n, &kpr_gateways) {
+                        kpr_gateway_entry_t *ge = list_entry(e, kpr_gateway_entry_t,
+                                                             kpge_list);
+                        atomic_set (&ge->kpge_weight, 0);
+                }
+        }
+
+        if (!byNal) {
+                /* userland notified me: notify NAL? */
+                ne = kpr_find_nal_entry_locked (ge->kpge_nalid);
+                if (ne != NULL) {
+                        if (ne->kpne_shutdown ||
+                            ne->kpne_interface.kprni_notify == NULL) {
+                                /* no need to notify */
+                                ne = NULL;
+                        } else {
+                                /* take a ref on this NAL until notifying
+                                 * it has completed... */
+                                atomic_inc (&ne->kpne_refcount);
+                        }
+                }
+        }
+
+        write_unlock_irqrestore(&kpr_rwlock, flags);
+
+        if (ne != NULL) {
+                ne->kpne_interface.kprni_notify (ne->kpne_interface.kprni_arg,
+                                                 gateway_nid, alive);
+                /* 'ne' can disappear now... */
+                atomic_dec (&ne->kpne_refcount);
+        }
+        
+        if (byNal) {
+                /* It wasn't userland that notified me... */
+                CERROR ("Doing upcall\n");
+                kpr_upcall (gateway_nalid, gateway_nid, alive, when);
+        } else {
+                CERROR (" NOT Doing upcall\n");
+        }
+        
+        return (0);
+}
+
+void
+kpr_nal_notify (void *arg, ptl_nid_t peer, int alive, time_t when)
+{
+        kpr_nal_entry_t *ne = (kpr_nal_entry_t *)arg;
+        
+        kpr_do_notify (1, ne->kpne_interface.kprni_nalid, peer, alive, when);
+}
+
+void
 kpr_shutdown_nal (void *arg)
 {
        unsigned long    flags;
        kpr_nal_entry_t *ne = (kpr_nal_entry_t *)arg;
 
-        CDEBUG (D_OTHER, "Shutting down NAL %d\n", ne->kpne_interface.kprni_nalid);
+        CDEBUG (D_NET, "Shutting down NAL %d\n", ne->kpne_interface.kprni_nalid);
 
        LASSERT (!ne->kpne_shutdown);
        LASSERT (!in_interrupt());
@@ -126,7 +298,7 @@ kpr_deregister_nal (void *arg)
        unsigned long     flags;
        kpr_nal_entry_t  *ne = (kpr_nal_entry_t *)arg;
 
-        CDEBUG (D_OTHER, "Deregister NAL %d\n", ne->kpne_interface.kprni_nalid);
+        CDEBUG (D_NET, "Deregister NAL %d\n", ne->kpne_interface.kprni_nalid);
 
        LASSERT (ne->kpne_shutdown);            /* caller must have issued shutdown already */
        LASSERT (atomic_read (&ne->kpne_refcount) == 0); /* can't be busy */
@@ -142,15 +314,58 @@ kpr_deregister_nal (void *arg)
         PORTAL_MODULE_UNUSE;
 }
 
+int
+kpr_ge_isbetter (kpr_gateway_entry_t *ge1, kpr_gateway_entry_t *ge2)
+{
+        const int significant_bits = 0x00ffffff;
+        /* We use atomic_t to record/compare route weights for
+         * load-balancing.  Here we limit ourselves to only using
+         * 'significant_bits' when we do an 'after' comparison */
+
+        int    diff = (atomic_read (&ge1->kpge_weight) -
+                       atomic_read (&ge2->kpge_weight)) & significant_bits;
+        int    rc = (diff > (significant_bits >> 1));
+
+        CDEBUG(D_NET, "[%p]"LPX64"=%d %s [%p]"LPX64"=%d\n",
+               ge1, ge1->kpge_nid, atomic_read (&ge1->kpge_weight),
+               rc ? ">" : "<",
+               ge2, ge2->kpge_nid, atomic_read (&ge2->kpge_weight));
+
+        return (rc);
+}
+
+void
+kpr_update_weight (kpr_gateway_entry_t *ge, int nob)
+{
+        int weight = 1 + (nob + sizeof (ptl_hdr_t)/2)/sizeof (ptl_hdr_t);
+
+        /* We've chosen this route entry (i.e. gateway) to forward payload
+         * of length 'nob'; update the route's weight to make it less
+         * favoured.  Note that the weight is 1 plus the payload size
+         * rounded and scaled to the portals header size, so we get better
+         * use of the significant bits in kpge_weight. */
+
+        CDEBUG(D_NET, "gateway [%p]"LPX64" += %d\n", ge,
+               ge->kpge_nid, weight);
+        
+        atomic_add (weight, &ge->kpge_weight);
+}
 
 int
-kpr_lookup_target (void *arg, ptl_nid_t target_nid, ptl_nid_t *gateway_nidp)
+kpr_lookup_target (void *arg, ptl_nid_t target_nid, int nob,
+                   ptl_nid_t *gateway_nidp)
 {
-       kpr_nal_entry_t  *ne = (kpr_nal_entry_t *)arg;
-       struct list_head *e;
-       int               rc = -ENOENT;
+       kpr_nal_entry_t     *ne = (kpr_nal_entry_t *)arg;
+       struct list_head    *e;
+        kpr_route_entry_t   *re;
+        kpr_gateway_entry_t *ge = NULL;
+       int                  rc = -ENOENT;
 
-        CDEBUG (D_OTHER, "lookup "LPX64" from NAL %d\n", target_nid, ne->kpne_interface.kprni_nalid);
+        /* Caller wants to know if 'target_nid' can be reached via a gateway
+         * ON HER OWN NETWORK */
+
+        CDEBUG (D_NET, "lookup "LPX64" from NAL %d\n", target_nid, 
+                ne->kpne_interface.kprni_nalid);
 
        if (ne->kpne_shutdown)          /* caller is shutting down */
                return (-ENOENT);
@@ -159,9 +374,8 @@ kpr_lookup_target (void *arg, ptl_nid_t target_nid, ptl_nid_t *gateway_nidp)
 
        /* Search routes for one that has a gateway to target_nid on the callers network */
 
-       for (e = kpr_routes.next; e != &kpr_routes; e = e->next)
-       {
-               kpr_route_entry_t *re = list_entry (e, kpr_route_entry_t, kpre_list);
+        list_for_each (e, &kpr_routes) {
+               re = list_entry (e, kpr_route_entry_t, kpre_list);
 
                if (re->kpre_lo_nid > target_nid ||
                     re->kpre_hi_nid < target_nid)
@@ -169,33 +383,66 @@ kpr_lookup_target (void *arg, ptl_nid_t target_nid, ptl_nid_t *gateway_nidp)
 
                /* found table entry */
 
-               if (re->kpre_gateway_nalid != ne->kpne_interface.kprni_nalid) /* different NAL */
-                       rc = -EHOSTUNREACH;
-               else
-               {
-                       rc = 0;
-                       *gateway_nidp = re->kpre_gateway_nid;
-               }
-               break;
+               if (re->kpre_gateway->kpge_nalid != ne->kpne_interface.kprni_nalid ||
+                    !re->kpre_gateway->kpge_alive) {
+                        /* different NAL or gateway down */
+                        rc = -EHOSTUNREACH;
+                        continue;
+                }
+                
+                if (ge == NULL ||
+                    kpr_ge_isbetter (re->kpre_gateway, ge))
+                    ge = re->kpre_gateway;
        }
 
+        if (ge != NULL) {
+                kpr_update_weight (ge, nob);
+                *gateway_nidp = ge->kpge_nid;
+                rc = 0;
+        }
+        
        read_unlock (&kpr_rwlock);
 
-        CDEBUG (D_OTHER, "lookup "LPX64" from NAL %d: %d ("LPX64")\n",
+        /* NB can't deref 're' now; it might have been removed! */
+
+        CDEBUG (D_NET, "lookup "LPX64" from NAL %d: %d ("LPX64")\n",
                 target_nid, ne->kpne_interface.kprni_nalid, rc,
                 (rc == 0) ? *gateway_nidp : (ptl_nid_t)0);
        return (rc);
 }
 
+kpr_nal_entry_t *
+kpr_find_nal_entry_locked (int nal_id)
+{
+        struct list_head    *e;
+        
+        /* Called with kpr_rwlock held */
+
+        list_for_each (e, &kpr_nals) {
+                kpr_nal_entry_t *ne = list_entry (e, kpr_nal_entry_t, kpne_list);
+
+                if (nal_id != ne->kpne_interface.kprni_nalid) /* no match */
+                        continue;
+
+                return (ne);
+        }
+        
+        return (NULL);
+}
+
 void
 kpr_forward_packet (void *arg, kpr_fwd_desc_t *fwd)
 {
-       kpr_nal_entry_t  *src_ne = (kpr_nal_entry_t *)arg;
-       ptl_nid_t         target_nid = fwd->kprfd_target_nid;
-        int               nob = fwd->kprfd_nob;
-       struct list_head *e;
-
-        CDEBUG (D_OTHER, "forward [%p] "LPX64" from NAL %d\n", fwd,
+       kpr_nal_entry_t     *src_ne = (kpr_nal_entry_t *)arg;
+       ptl_nid_t            target_nid = fwd->kprfd_target_nid;
+        int                  nob = fwd->kprfd_nob;
+        kpr_gateway_entry_t *ge = NULL;
+        kpr_nal_entry_t     *dst_ne = NULL;
+       struct list_head    *e;
+        kpr_route_entry_t   *re;
+        kpr_nal_entry_t     *tmp_ne;
+
+        CDEBUG (D_NET, "forward [%p] "LPX64" from NAL %d\n", fwd,
                 target_nid, src_ne->kpne_interface.kprni_nalid);
 
         LASSERT (nob >= sizeof (ptl_hdr_t)); /* at least got a packet header */
@@ -216,53 +463,58 @@ kpr_forward_packet (void *arg, kpr_fwd_desc_t *fwd)
 
        /* Search routes for one that has a gateway to target_nid NOT on the caller's network */
 
-       for (e = kpr_routes.next; e != &kpr_routes; e = e->next)
-       {
-               kpr_route_entry_t *re = list_entry (e, kpr_route_entry_t, kpre_list);
+        list_for_each (e, &kpr_routes) {
+               re = list_entry (e, kpr_route_entry_t, kpre_list);
 
                if (re->kpre_lo_nid > target_nid || /* no match */
                     re->kpre_hi_nid < target_nid)
                        continue;
 
-                CDEBUG (D_OTHER, "forward [%p] "LPX64" from NAL %d: match "LPX64" on NAL %d\n", fwd,
-                        target_nid, src_ne->kpne_interface.kprni_nalid,
-                        re->kpre_gateway_nid, re->kpre_gateway_nalid);
-
-               if (re->kpre_gateway_nalid == src_ne->kpne_interface.kprni_nalid)
-                       break;                  /* don't route to same NAL */
+               if (re->kpre_gateway->kpge_nalid == src_ne->kpne_interface.kprni_nalid)
+                       continue;               /* don't route to same NAL */
 
-               /* Search for gateway's NAL's entry */
-
-               for (e = kpr_nals.next; e != &kpr_nals; e = e->next)
-               {
-                       kpr_nal_entry_t *dst_ne = list_entry (e, kpr_nal_entry_t, kpne_list);
+                if (!re->kpre_gateway->kpge_alive)
+                        continue;               /* gateway is dead */
+                
+                tmp_ne = kpr_find_nal_entry_locked (re->kpre_gateway->kpge_nalid);
 
-                       if (re->kpre_gateway_nalid != dst_ne->kpne_interface.kprni_nalid) /* no match */
-                               continue;
+                if (tmp_ne == NULL ||
+                    tmp_ne->kpne_shutdown) {
+                        /* NAL must be registered and not shutting down */
+                        continue;
+                }
 
-                       if (dst_ne->kpne_shutdown) /* don't route if NAL is shutting down */
-                               break;
+                if (ge == NULL ||
+                    kpr_ge_isbetter (re->kpre_gateway, ge)) {
+                        ge = re->kpre_gateway;
+                        dst_ne = tmp_ne;
+                }
+        }
+        
+        if (ge != NULL) {
+                LASSERT (dst_ne != NULL);
+                
+                kpr_update_weight (ge, nob);
 
-                       fwd->kprfd_gateway_nid = re->kpre_gateway_nid;
-                       atomic_inc (&dst_ne->kpne_refcount); /* dest nal is busy until fwd completes */
+                fwd->kprfd_gateway_nid = ge->kpge_nid;
+                atomic_inc (&dst_ne->kpne_refcount); /* dest nal is busy until fwd completes */
 
-                       read_unlock (&kpr_rwlock);
+                read_unlock (&kpr_rwlock);
 
-                        CDEBUG (D_OTHER, "forward [%p] "LPX64" from NAL %d: "LPX64" on NAL %d\n", fwd,
-                                target_nid, src_ne->kpne_interface.kprni_nalid,
-                                fwd->kprfd_gateway_nid, dst_ne->kpne_interface.kprni_nalid);
+                CDEBUG (D_NET, "forward [%p] "LPX64" from NAL %d: "
+                        "to "LPX64" on NAL %d\n", 
+                        fwd, target_nid, src_ne->kpne_interface.kprni_nalid,
+                        fwd->kprfd_gateway_nid, dst_ne->kpne_interface.kprni_nalid);
 
-                       dst_ne->kpne_interface.kprni_fwd (dst_ne->kpne_interface.kprni_arg, fwd);
-                       return;
-               }
-               break;
+                dst_ne->kpne_interface.kprni_fwd (dst_ne->kpne_interface.kprni_arg, fwd);
+                return;
        }
 
-       read_unlock (&kpr_rwlock);
+        read_unlock (&kpr_rwlock);
  out:
         kpr_fwd_errors++;
 
-        CDEBUG (D_OTHER, "Failed to forward [%p] "LPX64" from NAL %d\n", fwd,
+        CDEBUG (D_NET, "Failed to forward [%p] "LPX64" from NAL %d\n", fwd,
                 target_nid, src_ne->kpne_interface.kprni_nalid);
 
        /* Can't find anywhere to forward to */
@@ -278,14 +530,14 @@ kpr_complete_packet (void *arg, kpr_fwd_desc_t *fwd, int error)
        kpr_nal_entry_t *dst_ne = (kpr_nal_entry_t *)arg;
        kpr_nal_entry_t *src_ne = (kpr_nal_entry_t *)fwd->kprfd_router_arg;
 
-        CDEBUG (D_OTHER, "complete(1) [%p] from NAL %d to NAL %d: %d\n", fwd,
+        CDEBUG (D_NET, "complete(1) [%p] from NAL %d to NAL %d: %d\n", fwd,
                 src_ne->kpne_interface.kprni_nalid, dst_ne->kpne_interface.kprni_nalid, error);
 
        atomic_dec (&dst_ne->kpne_refcount);    /* CAVEAT EMPTOR dst_ne can disappear now!!! */
 
        (fwd->kprfd_callback)(fwd->kprfd_callback_arg, error);
 
-        CDEBUG (D_OTHER, "complete(2) [%p] from NAL %d: %d\n", fwd,
+        CDEBUG (D_NET, "complete(2) [%p] from NAL %d: %d\n", fwd,
                 src_ne->kpne_interface.kprni_nalid, error);
 
         atomic_dec (&kpr_queue_depth);
@@ -293,49 +545,76 @@ kpr_complete_packet (void *arg, kpr_fwd_desc_t *fwd, int error)
 }
 
 int
-kpr_add_route (int gateway_nalid, ptl_nid_t gateway_nid, ptl_nid_t lo_nid,
-               ptl_nid_t hi_nid)
+kpr_add_route (int gateway_nalid, ptl_nid_t gateway_nid, 
+               ptl_nid_t lo_nid, ptl_nid_t hi_nid)
 {
-       unsigned long      flags;
-       struct list_head  *e;
-       kpr_route_entry_t *re;
+       unsigned long        flags;
+       struct list_head    *e;
+       kpr_route_entry_t   *re;
+        kpr_gateway_entry_t *ge;
+        int                  dup = 0;
 
-        CDEBUG(D_OTHER, "Add route: %d "LPX64" : "LPX64" - "LPX64"\n",
+        CDEBUG(D_NET, "Add route: %d "LPX64" : "LPX64" - "LPX64"\n",
                gateway_nalid, gateway_nid, lo_nid, hi_nid);
 
-        LASSERT(lo_nid <= hi_nid);
+        if (gateway_nalid == PTL_NID_ANY ||
+            lo_nid == PTL_NID_ANY ||
+            hi_nid == PTL_NID_ANY ||
+            lo_nid > hi_nid)
+                return (-EINVAL);
+
+        PORTAL_ALLOC (ge, sizeof (*ge));
+        if (ge == NULL)
+                return (-ENOMEM);
+
+        ge->kpge_nalid = gateway_nalid;
+        ge->kpge_nid   = gateway_nid;
+        ge->kpge_alive = 1;
+        ge->kpge_timestamp = 0;
+        ge->kpge_refcount = 0;
+        atomic_set (&ge->kpge_weight, 0);
 
         PORTAL_ALLOC (re, sizeof (*re));
         if (re == NULL)
                 return (-ENOMEM);
 
-        re->kpre_gateway_nalid = gateway_nalid;
-        re->kpre_gateway_nid = gateway_nid;
         re->kpre_lo_nid = lo_nid;
         re->kpre_hi_nid = hi_nid;
 
         LASSERT(!in_interrupt());
        write_lock_irqsave (&kpr_rwlock, flags);
 
-        for (e = kpr_routes.next; e != &kpr_routes; e = e->next) {
-                kpr_route_entry_t *re2 = list_entry(e, kpr_route_entry_t,
-                                                    kpre_list);
-
-                if (re->kpre_lo_nid > re2->kpre_hi_nid ||
-                    re->kpre_hi_nid < re2->kpre_lo_nid)
-                        continue;
+        list_for_each (e, &kpr_gateways) {
+                kpr_gateway_entry_t *ge2 = list_entry(e, kpr_gateway_entry_t,
+                                                      kpge_list);
+                
+                if (ge2->kpge_nalid == gateway_nalid &&
+                    ge2->kpge_nid == gateway_nid) {
+                        PORTAL_FREE (ge, sizeof (*ge));
+                        ge = ge2;
+                        dup = 1;
+                        break;
+                }
+        }
 
-                CERROR ("Attempt to add duplicate routes ["LPX64" - "LPX64"]"
-                        "to ["LPX64" - "LPX64"]\n",
-                        re->kpre_lo_nid, re->kpre_hi_nid,
-                        re2->kpre_lo_nid, re2->kpre_hi_nid);
+        if (!dup) {
+                /* Adding a new gateway... */
+                list_add (&ge->kpge_list, &kpr_gateways);
 
-                write_unlock_irqrestore (&kpr_rwlock, flags);
+                /* ...zero all gateway weights so this one doesn't have to
+                 * play catch-up */
 
-                PORTAL_FREE (re, sizeof (*re));
-                return (-EINVAL);
+                list_for_each (e, &kpr_gateways) {
+                        kpr_gateway_entry_t *ge2 = list_entry(e, kpr_gateway_entry_t,
+                                                              kpge_list);
+                        atomic_set (&ge2->kpge_weight, 0);
+                }
+                
         }
 
+        re->kpre_gateway = ge;
+        ge->kpge_refcount++;
         list_add (&re->kpre_list, &kpr_routes);
 
         write_unlock_irqrestore (&kpr_rwlock, flags);
@@ -343,49 +622,82 @@ kpr_add_route (int gateway_nalid, ptl_nid_t gateway_nid, ptl_nid_t lo_nid,
 }
 
 int
-kpr_del_route (ptl_nid_t nid)
+kpr_sys_notify (int gateway_nalid, ptl_nid_t gateway_nid,
+            int alive, time_t when)
+{
+        return (kpr_do_notify (0, gateway_nalid, gateway_nid, alive, when));
+}
+
+int
+kpr_del_route (int gw_nalid, ptl_nid_t gw_nid,
+               ptl_nid_t lo, ptl_nid_t hi)
 {
+        int                specific = (lo != PTL_NID_ANY);
        unsigned long      flags;
+        int                rc = -ENOENT;
        struct list_head  *e;
+       struct list_head  *n;
 
-        CDEBUG(D_OTHER, "Del route "LPX64"\n", nid);
+        CDEBUG(D_NET, "Del route [%d] "LPX64" : "LPX64" - "LPX64"\n", 
+               gw_nalid, gw_nid, lo, hi);
 
         LASSERT(!in_interrupt());
+
+        /* NB Caller may specify either all routes via the given gateway
+         * (lo/hi == PTL_NID_ANY) or a specific route entry (lo/hi are
+         * actual NIDs) */
+        
+        if (specific ? (hi == PTL_NID_ANY || hi < lo) : (hi != PTL_NID_ANY))
+                return (-EINVAL);
+
        write_lock_irqsave(&kpr_rwlock, flags);
 
-        for (e = kpr_routes.next; e != &kpr_routes; e = e->next) {
-                kpr_route_entry_t *re = list_entry(e, kpr_route_entry_t,
+        list_for_each_safe (e, n, &kpr_routes) {
+                kpr_route_entry_t   *re = list_entry(e, kpr_route_entry_t,
                                                    kpre_list);
-
-                if (re->kpre_lo_nid > nid || re->kpre_hi_nid < nid)
+                kpr_gateway_entry_t *ge = re->kpre_gateway;
+                
+                if (ge->kpge_nalid != gw_nalid ||
+                    ge->kpge_nid != gw_nid ||
+                    (specific && 
+                     (lo != re->kpre_lo_nid || hi != re->kpre_hi_nid)))
                         continue;
 
-                list_del (&re->kpre_list);
-                write_unlock_irqrestore(&kpr_rwlock, flags);
+                rc = 0;
 
+                if (--ge->kpge_refcount == 0) {
+                        list_del (&ge->kpge_list);
+                        PORTAL_FREE (ge, sizeof (*ge));
+                }
+
+                list_del (&re->kpre_list);
                 PORTAL_FREE(re, sizeof (*re));
-                return (0);
+
+                if (specific)
+                        break;
         }
 
         write_unlock_irqrestore(&kpr_rwlock, flags);
-        return (-ENOENT);
+        return (rc);
 }
 
 int
-kpr_get_route(int idx, int *gateway_nalid, ptl_nid_t *gateway_nid,
-              ptl_nid_t *lo_nid, ptl_nid_t *hi_nid)
+kpr_get_route (int idx, int *gateway_nalid, ptl_nid_t *gateway_nid,
+               ptl_nid_t *lo_nid, ptl_nid_t *hi_nid, int *alive)
 {
        struct list_head  *e;
 
        read_lock(&kpr_rwlock);
 
         for (e = kpr_routes.next; e != &kpr_routes; e = e->next) {
-                kpr_route_entry_t *re = list_entry(e, kpr_route_entry_t,
-                                                   kpre_list);
-
+                kpr_route_entry_t   *re = list_entry(e, kpr_route_entry_t,
+                                                     kpre_list);
+                kpr_gateway_entry_t *ge = re->kpre_gateway;
+                
                 if (idx-- == 0) {
-                        *gateway_nalid = re->kpre_gateway_nalid;
-                        *gateway_nid = re->kpre_gateway_nid;
+                        *gateway_nalid = ge->kpge_nalid;
+                        *gateway_nid = ge->kpge_nid;
+                        *alive = ge->kpge_alive;
                         *lo_nid = re->kpre_lo_nid;
                         *hi_nid = re->kpre_hi_nid;
 
index b8c3bec..c5cc1d3 100644 (file)
@@ -50,17 +50,40 @@ typedef struct
 
 typedef struct
 {
+        struct list_head        kpge_list;
+        atomic_t                kpge_weight;
+        time_t                  kpge_timestamp;
+        int                     kpge_alive;
+        int                     kpge_nalid;
+        int                     kpge_refcount;
+        ptl_nid_t               kpge_nid;
+} kpr_gateway_entry_t;
+
+typedef struct
+{
        struct list_head        kpre_list;
-       int                     kpre_gateway_nalid;
-       ptl_nid_t               kpre_gateway_nid;
+        kpr_gateway_entry_t    *kpre_gateway;
        ptl_nid_t               kpre_lo_nid;
         ptl_nid_t               kpre_hi_nid;
 } kpr_route_entry_t;
 
+typedef struct
+{
+        struct tq_struct        kpru_tq;
+        int                     kpru_nal_id;
+        ptl_nid_t               kpru_nid;
+        int                     kpru_alive;
+        time_t                  kpru_when;
+} kpr_upcall_t;
+
 extern int kpr_register_nal (kpr_nal_interface_t *nalif, void **argp);
-extern int kpr_lookup_target (void *arg, ptl_nid_t target_nid, ptl_nid_t *gateway_nidp);
+extern int kpr_lookup_target (void *arg, ptl_nid_t target_nid, int nob, 
+                              ptl_nid_t *gateway_nidp);
+extern kpr_nal_entry_t *kpr_find_nal_entry_locked (int nal_id);
 extern void kpr_forward_packet (void *arg, kpr_fwd_desc_t *fwd);
 extern void kpr_complete_packet (void *arg, kpr_fwd_desc_t *fwd, int error);
+extern void kpr_nal_notify (void *arg, ptl_nid_t peer,
+                            int alive, time_t when);
 extern void kpr_shutdown_nal (void *arg);
 extern void kpr_deregister_nal (void *arg);
 
@@ -69,9 +92,12 @@ extern void kpr_proc_fini (void);
 
 extern int kpr_add_route (int gateway_nal, ptl_nid_t gateway_nid, 
                           ptl_nid_t lo_nid, ptl_nid_t hi_nid);
-extern int kpr_del_route (ptl_nid_t nid);
+extern int kpr_del_route (int gw_nal, ptl_nid_t gw_nid,
+                          ptl_nid_t lo, ptl_nid_t hi);
 extern int kpr_get_route (int idx, int *gateway_nal, ptl_nid_t *gateway_nid, 
-                          ptl_nid_t *lo_nid, ptl_nid_t *hi_nid);
+                          ptl_nid_t *lo_nid, ptl_nid_t *hi_nid, int *alive);
+extern int kpr_sys_notify (int gw_nalid, ptl_nid_t gw_nid,
+                           int alive, time_t when);
 
 extern unsigned long long kpr_fwd_bytes;
 extern unsigned long      kpr_fwd_packets;
index 4d93645..eccf507 100644 (file)
@@ -676,6 +676,7 @@ int Parser_bool (int *b, char *str) {
         if (!strcasecmp (str, "no") ||
             !strcasecmp (str, "n") ||
             !strcasecmp (str, "off") ||
+            !strcasecmp (str, "down") ||
             !strcasecmp (str, "disable"))
         {
                 *b = 0;
@@ -685,6 +686,7 @@ int Parser_bool (int *b, char *str) {
         if (!strcasecmp (str, "yes") ||
             !strcasecmp (str, "y") ||
             !strcasecmp (str, "on") ||
+            !strcasecmp (str, "up") ||
             !strcasecmp (str, "enable"))
         {
                 *b = 1;
index 4a05234..5fe1777 100644 (file)
@@ -60,6 +60,7 @@ typedef struct
 } name2num_t;
 
 static name2num_t nalnames[] = {
+        {"any",         0},
         {"tcp",                SOCKNAL},
         {"toe",                TOENAL},
         {"elan",       QSWNAL},
@@ -95,7 +96,7 @@ ptl_name2nal (char *str)
 {
         name2num_t *e = name2num_lookup_name (nalnames, str);
 
-        return ((e == NULL) ? 0 : e->num);
+        return ((e == NULL) ? -1 : e->num);
 }
 
 static char *
@@ -128,6 +129,49 @@ ptl_gethostbyname(char * hname) {
 }
 
 int
+ptl_parse_port (int *port, char *str)
+{
+        char      *end;
+        
+        *port = strtol (str, &end, 0);
+
+        if (*end == 0 &&                        /* parsed whole string */
+            *port > 0 && *port < 65536)         /* minimal sanity check */
+                return (0);
+        
+        return (-1);
+}
+
+int
+ptl_parse_time (time_t *t, char *str) 
+{
+        char          *end;
+        int            n;
+        struct tm      tm;
+        
+        *t = strtol (str, &end, 0);
+        if (*end == 0) /* parsed whole string */
+                return (0);
+        
+        memset (&tm, 0, sizeof (tm));
+        n = sscanf (str, "%d-%d-%d %d:%d:%d",
+                    &tm.tm_year, &tm.tm_mon, &tm.tm_mday, 
+                    &tm.tm_hour, &tm.tm_min, &tm.tm_sec);
+        if (n != 6)
+                return (-1);
+        
+        tm.tm_mon--;                    /* convert to 0 == Jan */
+        tm.tm_year -= 1900;             /* y2k quirk */
+        tm.tm_isdst = -1;               /* dunno if it's daylight savings... */
+        
+        *t = mktime (&tm);
+        if (*t == (time_t)-1)
+                return (-1);
+                        
+        return (0);
+}
+
+int
 ptl_parse_ipaddr (__u32 *ipaddrp, char *str)
 {
         struct hostent *he;
@@ -220,21 +264,29 @@ ptl_nid2str (char *buffer, ptl_nid_t nid)
         if (he != NULL)
                 strcpy (buffer, he->h_name);
         else
-                sprintf (buffer, "0x"LPX64, nid);
+                sprintf (buffer, LPX64, nid);
         
         return (buffer);
 }
 
-int g_nal_is_compatible (char *cmd, ...)
+int g_nal_is_set () 
 {
-        va_list       ap;
-        int           nal;
-        
         if (g_nal == 0) {
                 fprintf (stderr, "Error: you must run the 'network' command first.\n");
                 return (0);
         }
-        
+
+        return (1);
+}
+
+int g_nal_is_compatible (char *cmd, ...)
+{
+        va_list       ap;
+        int           nal;
+
+        if (!g_nal_is_set ())
+                return (0);
+
         va_start (ap, cmd);
 
         do {
@@ -320,7 +372,7 @@ int jt_ptl_network(int argc, char **argv)
         int         nal;
         
         if (argc == 2 &&
-            (nal = ptl_name2nal (argv[1])) != 0) {
+            (nal = ptl_name2nal (argv[1])) >= 0) {
                 g_nal = nal;
                 return (0);
         }
@@ -353,12 +405,14 @@ jt_ptl_print_autoconnects (int argc, char **argv)
                 if (rc != 0)
                         break;
 
-                printf (LPX64"@%s:%d #%d buffer %d nonagle %s xchg %s affinity %s share %d\n",
+                printf (LPX64"@%s:%d #%d buffer %d nonagle %s xchg %s "
+                        "affinity %s eager %s share %d\n",
                         data.ioc_nid, ptl_ipaddr_2_str (data.ioc_id, buffer),
                         data.ioc_misc, data.ioc_count, data.ioc_size, 
                         (data.ioc_flags & 1) ? "on" : "off",
                         (data.ioc_flags & 2) ? "on" : "off",
                         (data.ioc_flags & 4) ? "on" : "off",
+                        (data.ioc_flags & 8) ? "on" : "off",
                         data.ioc_wait);
         }
 
@@ -377,10 +431,11 @@ jt_ptl_add_autoconnect (int argc, char **argv)
         int                      xchange_nids = 0;
         int                      irq_affinity = 0;
         int                      share = 0;
+        int                      eager = 0;
         int                      rc;
 
         if (argc < 4 || argc > 5) {
-                fprintf (stderr, "usage: %s nid ipaddr port [ixs]\n", argv[0]);
+                fprintf (stderr, "usage: %s nid ipaddr port [ixse]\n", argv[0]);
                 return 0;
         }
 
@@ -398,8 +453,11 @@ jt_ptl_add_autoconnect (int argc, char **argv)
                 return -1;
         }
 
-        port = atol (argv[3]);
-        
+        if (ptl_parse_port (&port, argv[3]) != 0) {
+                fprintf (stderr, "Can't parse port: %s\n", argv[3]);
+                return -1;
+        }
+
         if (argc > 4) {
                 char *opts = argv[4];
                 
@@ -414,6 +472,9 @@ jt_ptl_add_autoconnect (int argc, char **argv)
                         case 's':
                                 share = 1;
                                 break;
+                        case 'e':
+                                eager = 1;
+                                break;
                         default:
                                 fprintf (stderr, "Can't parse options: %s\n",
                                          argv[4]);
@@ -429,10 +490,11 @@ jt_ptl_add_autoconnect (int argc, char **argv)
         data.ioc_misc    = port;
         /* only passing one buffer size! */
         data.ioc_size    = MAX (g_socket_rxmem, g_socket_txmem);
-        data.ioc_flags   = (g_socket_nonagle ? 1 : 0) |
-                           (xchange_nids     ? 2 : 0) |
-                           (irq_affinity     ? 4 : 0) |
-                           (share            ? 8 : 0);
+        data.ioc_flags   = (g_socket_nonagle ? 0x01 : 0) |
+                           (xchange_nids     ? 0x02 : 0) |
+                           (irq_affinity     ? 0x04 : 0) |
+                           (share            ? 0x08 : 0) |
+                           (eager            ? 0x10 : 0);
 
         rc = l_ioctl (PORTALS_DEV_ID, IOC_PORTAL_NAL_CMD, &data);
         if (rc != 0) {
@@ -532,7 +594,7 @@ jt_ptl_print_connections (int argc, char **argv)
                 if (rc != 0)
                         break;
 
-                printf (LPD64"@%s:%d\n",
+                printf (LPX64"@%s:%d\n",
                         data.ioc_nid, 
                         ptl_ipaddr_2_str (data.ioc_id, buffer),
                         data.ioc_misc);
@@ -646,7 +708,11 @@ int jt_ptl_connect(int argc, char **argv)
                 return -1;
         }
 
-        port = atol(argv[2]);
+        if (ptl_parse_port (&port, argv[2]) != 0) {
+                fprintf (stderr, "Can't parse port: %s\n", argv[2]);
+                return -1;
+        }
+
         if (argc > 3)
                 for (flag = argv[3]; *flag != 0; flag++)
                         switch (*flag)
@@ -902,11 +968,8 @@ int jt_ptl_ping(int argc, char **argv)
                 return 0;
         }
 
-        if (g_nal == 0) {
-                fprintf(stderr, "Error: you must run the 'network' command "
-                        "first.\n");
+        if (!g_nal_is_set())
                 return -1;
-        }
 
         if (ptl_parse_nid (&nid, argv[1]) != 0)
         {
@@ -957,11 +1020,9 @@ int jt_ptl_shownid(int argc, char **argv)
                 return 0;
         }
         
-        if (g_nal == 0) {
-                fprintf(stderr, "Error: you must run the 'network' command first\n");
+        if (!g_nal_is_set())
                 return -1;
-        }
-        
+
         PORTAL_IOC_INIT (data);
         data.ioc_nal = g_nal;
         rc = l_ioctl(PORTALS_DEV_ID, IOC_PORTAL_GET_NID, &data);
@@ -987,11 +1048,8 @@ int jt_ptl_mynid(int argc, char **argv)
                 return 0;
         }
 
-        if (g_nal == 0) {
-                fprintf(stderr, "Error: you must run the 'network' command "
-                        "first.\n");
+        if (!g_nal_is_set())
                 return -1;
-        }
 
         if (argc >= 2)
                 nidstr = argv[1];
@@ -1037,11 +1095,8 @@ jt_ptl_fail_nid (int argc, char **argv)
                 return (0);
         }
         
-        if (g_nal == 0) {
-                fprintf(stderr, "Error: you must run the 'network' command "
-                        "first.\n");
+        if (!g_nal_is_set())
                 return (-1);
-        }
 
         if (!strcmp (argv[1], "_all_"))
                 nid = PTL_NID_ANY;
@@ -1120,7 +1175,7 @@ jt_ptl_nagle (int argc, char **argv)
                 if (Parser_bool (&enable, argv[1]) != 0)
                 {
                         fprintf (stderr, "Can't parse boolean %s\n", argv[1]);
-                        return (0);
+                        return (-1);
                 }
                 g_socket_nonagle = !enable;
         }
@@ -1143,11 +1198,8 @@ jt_ptl_add_route (int argc, char **argv)
                 return (0);
         }
 
-        if (g_nal == 0) {
-                fprintf(stderr, "Error: you must run the 'network' command "
-                        "first.\n");
+        if (!g_nal_is_set())
                 return (-1);
-        }
 
         if (ptl_parse_nid (&gateway_nid, argv[1]) != 0)
         {
@@ -1190,6 +1242,8 @@ jt_ptl_del_route (int argc, char **argv)
 {
         struct portal_ioctl_data data;
         ptl_nid_t                nid;
+        ptl_nid_t                nid1 = PTL_NID_ANY;
+        ptl_nid_t                nid2 = PTL_NID_ANY;
         int                      rc;
         
         if (argc < 2)
@@ -1198,14 +1252,43 @@ jt_ptl_del_route (int argc, char **argv)
                 return (0);
         }
 
+        if (!g_nal_is_set())
+                return (-1);
+
         if (ptl_parse_nid (&nid, argv[1]) != 0)
         {
-                fprintf (stderr, "Can't parse target NID \"%s\"\n", argv[1]);
+                fprintf (stderr, "Can't parse gateway NID \"%s\"\n", argv[1]);
                 return (-1);
         }
 
+        if (argc >= 3 &&
+            ptl_parse_nid (&nid1, argv[2]) != 0)
+        {
+                fprintf (stderr, "Can't parse target NID \"%s\"\n", argv[2]);
+                return (-1);
+        }
+
+        if (argc < 4) {
+                nid2 = nid1;
+        } else {
+                if (ptl_parse_nid (&nid2, argv[3]) != 0) {
+                        fprintf (stderr, "Can't parse target NID \"%s\"\n", argv[3]);
+                        return (-1);
+                }
+
+                if (nid1 > nid2) {
+                        ptl_nid_t tmp = nid1;
+                        
+                        nid1 = nid2;
+                        nid2 = tmp;
+                }
+        }
+        
         PORTAL_IOC_INIT(data);
+        data.ioc_nal = g_nal;
         data.ioc_nid = nid;
+        data.ioc_nid2 = nid1;
+        data.ioc_nid3 = nid2;
 
         rc = l_ioctl(PORTALS_DEV_ID, IOC_PORTAL_DEL_ROUTE, &data);
         if (rc != 0) 
@@ -1218,6 +1301,67 @@ jt_ptl_del_route (int argc, char **argv)
 }
 
 int
+jt_ptl_notify_router (int argc, char **argv)
+{
+        struct portal_ioctl_data data;
+        int                      enable;
+        ptl_nid_t                nid;
+        int                      rc;
+        struct timeval           now;
+        time_t                   when;
+
+        if (argc < 3)
+        {
+                fprintf (stderr, "usage: %s targetNID <up/down> [<time>]\n", 
+                         argv[0]);
+                return (0);
+        }
+
+        if (ptl_parse_nid (&nid, argv[1]) != 0)
+        {
+                fprintf (stderr, "Can't parse target NID \"%s\"\n", argv[1]);
+                return (-1);
+        }
+
+        if (Parser_bool (&enable, argv[2]) != 0) {
+                fprintf (stderr, "Can't parse boolean %s\n", argv[2]);
+                return (-1);
+        }
+
+        gettimeofday(&now, NULL);
+        
+        if (argc < 4) {
+                when = now.tv_sec;
+        } else if (ptl_parse_time (&when, argv[3]) != 0) {
+                fprintf(stderr, "Can't parse time %s\n"
+                        "Please specify either 'YYYY-MM-DD HH:MM:SS'\n"
+                        "or an absolute unix time in seconds\n", argv[3]);
+                return (-1);
+        } else if (when > now.tv_sec) {
+                fprintf (stderr, "%s specifies a time in the future\n",
+                         argv[3]);
+                return (-1);
+        }
+
+        PORTAL_IOC_INIT(data);
+        data.ioc_nal = g_nal;
+        data.ioc_nid = nid;
+        data.ioc_flags = enable;
+        /* Yeuch; 'cept I need a __u64 on 64 bit machines... */
+        data.ioc_nid3 = (__u64)when;
+        
+        rc = l_ioctl(PORTALS_DEV_ID, IOC_PORTAL_NOTIFY_ROUTER, &data);
+        if (rc != 0) 
+        {
+                fprintf (stderr, "IOC_PORTAL_NOTIFY_ROUTER ("LPX64") failed: %s\n",
+                         nid, strerror (errno));
+                return (-1);
+        }
+        
+        return (0);
+}
+
+int
 jt_ptl_print_routes (int argc, char **argv)
 {
         char                      buffer[3][128];
@@ -1228,8 +1372,8 @@ jt_ptl_print_routes (int argc, char **argv)
         ptl_nid_t                gateway_nid;
         ptl_nid_t                nid1;
         ptl_nid_t                nid2;
-        
-        
+        int                       alive;
+
         for (index = 0;;index++)
         {
                 PORTAL_IOC_INIT(data);
@@ -1243,13 +1387,332 @@ jt_ptl_print_routes (int argc, char **argv)
                 gateway_nid = data.ioc_nid;
                 nid1 = data.ioc_nid2;
                 nid2 = data.ioc_nid3;
-                
-                printf ("%8s %18s : %s - %s\n", 
+                alive = data.ioc_flags;
+
+                printf ("%8s %18s : %s - %s, %s\n", 
                         nal2name (gateway_nal), 
                         ptl_nid2str (buffer[0], gateway_nid),
                         ptl_nid2str (buffer[1], nid1),
-                        ptl_nid2str (buffer[2], nid2));
+                        ptl_nid2str (buffer[2], nid2),
+                        alive ? "up" : "down");
         }
         return (0);
 }
 
+static int
+lwt_control(int enable, int clear)
+{
+        struct portal_ioctl_data data;
+        int                      rc;
+
+        PORTAL_IOC_INIT(data);
+        data.ioc_flags = enable;
+        data.ioc_misc = clear;
+
+        rc = l_ioctl(PORTALS_DEV_ID, IOC_PORTAL_LWT_CONTROL, &data);
+        if (rc == 0)
+                return (0);
+
+        fprintf(stderr, "IOC_PORTAL_LWT_CONTROL failed: %s\n",
+                strerror(errno));
+        return (-1);
+}
+
+static int
+lwt_snapshot(int *ncpu, int *totalsize, lwt_event_t *events, int size)
+{
+        struct portal_ioctl_data data;
+        int                      rc;
+
+        PORTAL_IOC_INIT(data);
+        data.ioc_pbuf1 = (char *)events;
+        data.ioc_plen1 = size;
+
+        rc = l_ioctl(PORTALS_DEV_ID, IOC_PORTAL_LWT_SNAPSHOT, &data);
+        if (rc != 0) {
+                fprintf(stderr, "IOC_PORTAL_LWT_SNAPSHOT failed: %s\n",
+                        strerror(errno));
+                return (-1);
+        }
+
+        LASSERT (data.ioc_count != 0);
+        LASSERT (data.ioc_misc != 0);
+        
+        if (ncpu != NULL)
+                *ncpu = data.ioc_count;
+
+        if (totalsize != NULL)
+                *totalsize = data.ioc_misc;
+
+        return (0);
+}
+
+static char *
+lwt_get_string(char *kstr)
+{
+        char                     *ustr;
+        struct portal_ioctl_data  data;
+        int                       size;
+        int                       rc;
+
+        /* FIXME: this could maintain a symbol table since we expect to be
+         * looking up the same strings all the time... */
+
+        PORTAL_IOC_INIT(data);
+        data.ioc_pbuf1 = kstr;
+        data.ioc_plen1 = 1;        /* non-zero just to fool portal_ioctl_is_invalid() */
+        data.ioc_pbuf2 = NULL;
+        data.ioc_plen2 = 0;
+
+        rc = l_ioctl(PORTALS_DEV_ID, IOC_PORTAL_LWT_LOOKUP_STRING, &data);
+        if (rc != 0) {
+                fprintf(stderr, "IOC_PORTAL_LWT_LOOKUP_STRING failed: %s\n",
+                        strerror(errno));
+                return (NULL);
+        }
+
+        size = data.ioc_count;
+        ustr = (char *)malloc(size);
+        if (ustr == NULL) {
+                fprintf(stderr, "Can't allocate string storage of size %d\n",
+                        size);
+                return (NULL);
+        }
+
+        PORTAL_IOC_INIT(data);
+        data.ioc_pbuf1 = kstr;
+        data.ioc_plen1 = 1;        /* non-zero just to fool portal_ioctl_is_invalid() */
+        data.ioc_pbuf2 = ustr;
+        data.ioc_plen2 = size;
+
+        rc = l_ioctl(PORTALS_DEV_ID, IOC_PORTAL_LWT_LOOKUP_STRING, &data);
+        if (rc != 0) {
+                fprintf(stderr, "IOC_PORTAL_LWT_LOOKUP_STRING failed: %s\n",
+                        strerror(errno));
+                return (NULL);
+        }
+
+        LASSERT(strlen(ustr) == size - 1);
+        return (ustr);
+}
+
+static void
+lwt_put_string(char *ustr)
+{
+        free(ustr);
+}
+
+static int
+lwt_print(FILE *f, cycles_t t0, cycles_t tlast, double mhz, int cpu, lwt_event_t *e)
+{
+        char            whenstr[32];
+        char           *where = lwt_get_string(e->lwte_where);
+
+        if (where == NULL)
+                return (-1);
+
+        sprintf(whenstr, LPD64, e->lwte_when - t0);
+
+        fprintf(f, "%#010lx %#010lx %#010lx %#010lx: %#010lx %1d %10.6f %10.2f %s\n",
+                e->lwte_p1, e->lwte_p2, e->lwte_p3, e->lwte_p4,
+                (long)e->lwte_task, cpu, (e->lwte_when - t0) / (mhz * 1000000.0),
+                (t0 == e->lwte_when) ? 0.0 : (e->lwte_when - tlast) / mhz,
+                where);
+
+        lwt_put_string(where);
+
+        return (0);
+}
+
+double
+get_cycles_per_usec ()
+{
+        FILE      *f = fopen ("/proc/cpuinfo", "r");
+        double     mhz;
+        char      line[64];
+        
+        if (f != NULL) {
+                while (fgets (line, sizeof (line), f) != NULL)
+                        if (sscanf (line, "cpu MHz : %lf", &mhz) == 1) {
+                                fclose (f);
+                                return (mhz);
+                        }
+                fclose (f);
+        }
+
+        fprintf (stderr, "Can't read/parse /proc/cpuinfo\n");
+        return (1000.0);
+}
+
+int
+jt_ptl_lwt(int argc, char **argv)
+{
+#define MAX_CPUS 8
+        int             ncpus;
+        int             totalspace;
+        int             nevents_per_cpu;
+        lwt_event_t    *events;
+        lwt_event_t    *cpu_event[MAX_CPUS + 1];
+        lwt_event_t    *next_event[MAX_CPUS];
+        lwt_event_t    *first_event[MAX_CPUS];
+        int             cpu;
+        lwt_event_t    *e;
+        int             rc;
+        int             i;
+        double          mhz;
+        cycles_t        t0;
+        cycles_t        tlast;
+        FILE           *f = stdout;
+
+        if (argc < 2 ||
+            (strcmp(argv[1], "start") &&
+             strcmp(argv[1], "stop"))) {
+                fprintf(stderr, 
+                        "usage:  %s start\n"
+                        "        %s stop [fname]\n", argv[0], argv[0]);
+                return (-1);
+        }
+        
+        if (!strcmp(argv[1], "start")) {
+                /* disable */
+                if (lwt_control(0, 0) != 0)
+                        return (-1);
+
+                /* clear */
+                if (lwt_control(0, 1) != 0)
+                        return (-1);
+
+                /* enable */
+                if (lwt_control(1, 0) != 0)
+                        return (-1);
+
+                return (0);
+        }
+                
+        if (lwt_snapshot(&ncpus, &totalspace, NULL, 0) != 0)
+                return (-1);
+
+        if (ncpus > MAX_CPUS) {
+                fprintf(stderr, "Too many cpus: %d (%d)\n", ncpus, MAX_CPUS);
+                return (-1);
+        }
+
+        events = (lwt_event_t *)malloc(totalspace);
+        if (events == NULL) {
+                fprintf(stderr, "Can't allocate %d\n", totalspace);
+                return (-1);
+        }
+
+        if (lwt_control(0, 0) != 0) {           /* disable */
+                free(events);
+                return (-1);
+        }
+
+        if (lwt_snapshot(NULL, NULL, events, totalspace)) {
+                free(events);
+                return (-1);
+        }
+
+        if (argc > 2) {
+                f = fopen (argv[2], "w");
+                if (f == NULL) {
+                        fprintf(stderr, "Can't open %s for writing: %s\n", argv[2], strerror (errno));
+                        free(events);
+                        return (-1);
+                }
+        }
+
+        mhz = get_cycles_per_usec();
+        
+        /* carve events into per-cpu slices */
+        nevents_per_cpu = totalspace / (ncpus * sizeof(lwt_event_t));
+        for (cpu = 0; cpu <= ncpus; cpu++)
+                cpu_event[cpu] = &events[cpu * nevents_per_cpu];
+
+        /* find the earliest event on each cpu */
+        for (cpu = 0; cpu < ncpus; cpu++) {
+                first_event[cpu] = NULL;
+
+                for (e = cpu_event[cpu]; e < cpu_event[cpu + 1]; e++) {
+
+                        if (e->lwte_where == NULL) /* not an event */
+                                continue;
+
+                        if (first_event[cpu] == NULL ||
+                            first_event[cpu]->lwte_when > e->lwte_when)
+                                first_event[cpu] = e;
+                }
+
+                next_event[cpu] = first_event[cpu];
+        }
+
+        t0 = tlast = 0;
+        for (cpu = 0; cpu < ncpus; cpu++) {
+                e = first_event[cpu];
+                if (e == NULL)                  /* no events this cpu */
+                        continue;
+                
+                if (e == cpu_event[cpu])
+                        e = cpu_event[cpu + 1] - 1;
+                else 
+                        e = e - 1;
+                
+                /* If there's an event immediately before the first one, this
+                 * cpu wrapped its event buffer */
+                if (e->lwte_where == NULL)
+                        continue;
+         
+                /* We should only start outputting events from the most recent
+                 * first event in any wrapped cpu.  Events before this time on
+                 * other cpus won't have any events from this CPU to interleave
+                 * with. */
+                if (t0 < first_event[cpu]->lwte_when)
+                        t0 = first_event[cpu]->lwte_when;
+        }
+
+        for (;;) {
+                /* find which cpu has the next event */
+                cpu = -1;
+                for (i = 0; i < ncpus; i++) {
+
+                        if (next_event[i] == NULL) /* this cpu exhausted */
+                                continue;
+
+                        if (cpu < 0 ||
+                            next_event[i]->lwte_when < next_event[cpu]->lwte_when)
+                                cpu = i;
+                }
+
+                if (cpu < 0)                    /* all cpus exhausted */
+                        break;
+
+                if (t0 == 0) {
+                        /* no wrapped cpus and this is he first ever event */
+                        t0 = next_event[cpu]->lwte_when;
+                }
+                
+                if (t0 <= next_event[cpu]->lwte_when) {
+                        /* on or after the first event */
+                        rc = lwt_print(f, t0, tlast, mhz, cpu, next_event[cpu]);
+                        if (rc != 0)
+                                break;
+                }
+
+                tlast = next_event[cpu]->lwte_when;
+                
+                next_event[cpu]++;
+                if (next_event[cpu] == cpu_event[cpu + 1])
+                        next_event[cpu] = cpu_event[cpu];
+
+                if (next_event[cpu]->lwte_where == NULL ||
+                    next_event[cpu] == first_event[cpu])
+                        next_event[cpu] = NULL;
+        }
+
+        if (f != stdout)
+                fclose(f);
+
+        free(events);
+        return (0);
+#undef MAX_CPUS
+}
index c083e48..1a8e637 100644 (file)
@@ -31,7 +31,7 @@
 command_t list[] = {
         {"network", jt_ptl_network, 0,"setup the NAL (args: nal name)"},
         {"print_autoconns", jt_ptl_print_autoconnects, 0, "print autoconnect entries (no args)"},
-        {"add_autoconn", jt_ptl_add_autoconnect, 0, "add autoconnect entry (args: nid host [ixs])"},
+        {"add_autoconn", jt_ptl_add_autoconnect, 0, "add autoconnect entry (args: nid host [ixse])"},
         {"del_autoconn", jt_ptl_del_autoconnect, 0, "delete autoconnect entry (args: [nid] [host] [ks])"},
         {"print_conns", jt_ptl_print_connections, 0, "print connections (no args)"},
         {"connect", jt_ptl_connect, 0, "connect to a remote nid (args: host port [xi])"},
@@ -41,8 +41,12 @@ command_t list[] = {
         {"ping", jt_ptl_ping, 0, "do a ping test (args: nid [count] [size] [timeout])"},
         {"shownid", jt_ptl_shownid, 0, "print the local NID"},
         {"mynid", jt_ptl_mynid, 0, "inform the socknal of the local NID (args: [hostname])"},
-        {"add_route", jt_ptl_add_route, 0, "add an entry to the routing table (args: gatewayNID targetNID [targetNID])"},
-        {"del_route", jt_ptl_del_route, 0, "delete an entry from the routing table (args: targetNID"},
+        {"add_route", jt_ptl_add_route, 0, 
+         "add an entry to the routing table (args: gatewayNID targetNID [targetNID])"},
+        {"del_route", jt_ptl_del_route, 0, 
+         "delete all routes via a gateway from the routing table (args: gatewayNID"},
+        {"set_route", jt_ptl_notify_router, 0, 
+         "enable/disable a route in the routing table (args: gatewayNID up/down [time]"},
         {"print_routes", jt_ptl_print_routes, 0, "print the routing table (args: none)"},
         {"recv_mem", jt_ptl_rxmem, 0, "Set socket receive buffer size (args: [size])"},
         {"send_mem", jt_ptl_txmem, 0, "Set socket send buffer size (args: [size])"},
index 750d16c..0a1cc57 100644 (file)
@@ -371,12 +371,14 @@ typedef struct {
 } kpr_fwd_desc_t;
 
 typedef void  (*kpr_fwd_t)(void *arg, kpr_fwd_desc_t *fwd);
+typedef void  (*kpr_notify_t)(void *arg, ptl_nid_t peer, int alive);
 
 /* NAL's routing interface (Kernel Portals Routing Nal Interface) */
 typedef const struct {
         int             kprni_nalid;    /* NAL's id */
         void           *kprni_arg;      /* Arg to pass when calling into NAL */
         kpr_fwd_t       kprni_fwd;      /* NAL's forwarding entrypoint */
+        kpr_notify_t    kprni_notify;   /* NAL's notification entrypoint */
 } kpr_nal_interface_t;
 
 /* Router's routing interface (Kernel Portals Routing Router Interface) */
@@ -386,9 +388,10 @@ typedef const struct {
         int     (*kprri_register) (kpr_nal_interface_t *nal_interface,
                                    void **router_arg);
 
-        /* ask the router to find a gateway that forwards to 'nid' and is a peer
-         * of the calling NAL */
-        int     (*kprri_lookup) (void *router_arg, ptl_nid_t nid,
+        /* ask the router to find a gateway that forwards to 'nid' and is a
+         * peer of the calling NAL; assume caller will send 'nob' bytes of
+         * payload there */
+        int     (*kprri_lookup) (void *router_arg, ptl_nid_t nid, int nob,
                                  ptl_nid_t *gateway_nid);
 
         /* hand a packet over to the router for forwarding */
@@ -398,6 +401,10 @@ typedef const struct {
         void    (*kprri_fwd_done) (void *router_arg, kpr_fwd_desc_t *fwd,
                                    int error);
 
+        /* notify the router about peer state */
+        void    (*kprri_notify) (void *router_arg, ptl_nid_t peer,
+                                 int alive, time_t when);
+
         /* the calling NAL is shutting down */
         void    (*kprri_shutdown) (void *router_arg);
 
@@ -416,10 +423,14 @@ typedef struct {
 typedef const struct {
         int     (*kprci_add_route)(int gateway_nal, ptl_nid_t gateway_nid,
                                    ptl_nid_t lo_nid, ptl_nid_t hi_nid);
-        int     (*kprci_del_route)(ptl_nid_t nid);
+        int     (*kprci_del_route)(int gateway_nal, ptl_nid_t gateway_nid,
+                                   ptl_nid_t lo_nid, ptl_nid_t hi_nid);
         int     (*kprci_get_route)(int index, int *gateway_nal,
-                                   ptl_nid_t *gateway, ptl_nid_t *lo_nid,
-                                   ptl_nid_t *hi_nid);
+                                   ptl_nid_t *gateway,
+                                   ptl_nid_t *lo_nid, ptl_nid_t *hi_nid,
+                                   int *alive);
+        int     (*kprci_notify)(int gateway_nal, ptl_nid_t gateway_nid, 
+                                int alive, time_t when);
 } kpr_control_interface_t;
 
 extern kpr_control_interface_t  kpr_control_interface;
@@ -449,12 +460,12 @@ kpr_routing (kpr_router_t *router)
 }
 
 static inline int
-kpr_lookup (kpr_router_t *router, ptl_nid_t nid, ptl_nid_t *gateway_nid)
+kpr_lookup (kpr_router_t *router, ptl_nid_t nid, int nob, ptl_nid_t *gateway_nid)
 {
         if (!kpr_routing (router))
-                return (-EHOSTUNREACH);
+                return (-ENETUNREACH);
 
-        return (router->kpr_interface->kprri_lookup(router->kpr_arg, nid,
+        return (router->kpr_interface->kprri_lookup(router->kpr_arg, nid, nob,
                                                     gateway_nid));
 }
 
@@ -476,7 +487,7 @@ static inline void
 kpr_fwd_start (kpr_router_t *router, kpr_fwd_desc_t *fwd)
 {
         if (!kpr_routing (router))
-                fwd->kprfd_callback (fwd->kprfd_callback_arg, -EHOSTUNREACH);
+                fwd->kprfd_callback (fwd->kprfd_callback_arg, -ENETUNREACH);
         else
                 router->kpr_interface->kprri_fwd_start (router->kpr_arg, fwd);
 }
@@ -489,6 +500,16 @@ kpr_fwd_done (kpr_router_t *router, kpr_fwd_desc_t *fwd, int error)
 }
 
 static inline void
+kpr_notify (kpr_router_t *router, 
+            ptl_nid_t peer, int alive, time_t when)
+{
+        if (!kpr_routing (router))
+                return;
+        
+        router->kpr_interface->kprri_notify(router->kpr_arg, peer, alive, when);
+}
+
+static inline void
 kpr_shutdown (kpr_router_t *router)
 {
         if (kpr_routing (router))
@@ -554,6 +575,7 @@ extern struct prof_ent prof_ents[MAX_PROFS];
 #endif /* PORTALS_PROFILING */
 
 /* debug.c */
+void portals_run_upcall(char **argv);
 void portals_run_lbug_upcall(char * file, const char *fn, const int line);
 void portals_debug_dumplog(void);
 int portals_debug_init(unsigned long bufsize);
@@ -622,6 +644,92 @@ extern void kportal_blockallsigs (void);
 # define CURRENT_TIME time(0)
 #endif
 
+/******************************************************************************/
+/* Light-weight trace 
+ * Support for temporary event tracing with minimal Heisenberg effect. */
+#define LWT_SUPPORT  1
+
+typedef struct {
+        cycles_t    lwte_when;
+        char       *lwte_where;
+        void       *lwte_task;
+        long        lwte_p1;
+        long        lwte_p2;
+        long        lwte_p3;
+        long        lwte_p4;
+} lwt_event_t;
+
+#if LWT_SUPPORT
+#ifdef __KERNEL__
+#define LWT_EVENTS_PER_PAGE (PAGE_SIZE / sizeof (lwt_event_t))
+
+typedef struct _lwt_page {
+        struct list_head     lwtp_list;
+        struct page         *lwtp_page;
+        lwt_event_t         *lwtp_events;
+} lwt_page_t;
+
+typedef struct {
+        int                lwtc_current_index;
+        lwt_page_t        *lwtc_current_page;
+} lwt_cpu_t;
+
+extern int       lwt_enabled;
+extern lwt_cpu_t lwt_cpus[];
+
+extern int  lwt_init (void);
+extern void lwt_fini (void);
+extern int  lwt_lookup_string (int *size, char *knlptr,
+                               char *usrptr, int usrsize);
+extern int  lwt_control (int enable, int clear);
+extern int  lwt_snapshot (int *ncpu, int *total_size,
+                          void *user_ptr, int user_size);
+
+/* Note that we _don't_ define LWT_EVENT at all if LWT_SUPPORT isn't set.
+ * This stuff is meant for finding specific problems; it never stays in
+ * production code... */
+
+#define LWTSTR(n)      #n
+#define LWTWHERE(f,l)   f ":" LWTSTR(l)
+
+#define LWT_EVENT(p1, p2, p3, p4)                                       \
+do {                                                                    \
+        unsigned long    flags;                                         \
+        lwt_cpu_t       *cpu;                                           \
+        lwt_page_t      *p;                                             \
+        lwt_event_t     *e;                                             \
+                                                                        \
+        local_irq_save (flags);                                         \
+                                                                        \
+        if (lwt_enabled) {                                              \
+                cpu = &lwt_cpus[smp_processor_id()];                    \
+                p = cpu->lwtc_current_page;                             \
+                e = &p->lwtp_events[cpu->lwtc_current_index++];         \
+                                                                        \
+                if (cpu->lwtc_current_index >= LWT_EVENTS_PER_PAGE) {   \
+                        cpu->lwtc_current_page =                        \
+                                list_entry (p->lwtp_list.next,          \
+                                            lwt_page_t, lwtp_list);     \
+                        cpu->lwtc_current_index = 0;                    \
+                }                                                       \
+                                                                        \
+                e->lwte_when  = get_cycles();                           \
+                e->lwte_where = LWTWHERE(__FILE__,__LINE__);            \
+                e->lwte_task  = current;                                \
+                e->lwte_p1    = (long)(p1);                             \
+                e->lwte_p2    = (long)(p2);                             \
+                e->lwte_p3    = (long)(p3);                             \
+                e->lwte_p4    = (long)(p4);                             \
+        }                                                               \
+                                                                        \
+        local_irq_restore (flags);                                      \
+} while (0)
+#else  /* __KERNEL__ */
+#define LWT_EVENT(p1,p2,p3,p4)     /* no userland implementation yet */
+#endif /* __KERNEL__ */
+#endif /* LWT_SUPPORT */
+
+
 #include <linux/portals_lib.h>
 
 /*
@@ -856,8 +964,11 @@ static inline int portal_ioctl_getdata(char *buf, char *end, void *arg)
 #define IOC_PORTAL_GET_NID                 _IOWR('e', 39, long)
 #define IOC_PORTAL_FAIL_NID                _IOWR('e', 40, long)
 #define IOC_PORTAL_SET_DAEMON              _IOWR('e', 41, long)
-
-#define IOC_PORTAL_MAX_NR               41
+#define IOC_PORTAL_NOTIFY_ROUTER           _IOWR('e', 42, long)
+#define IOC_PORTAL_LWT_CONTROL             _IOWR('e', 43, long)
+#define IOC_PORTAL_LWT_SNAPSHOT            _IOWR('e', 44, long)
+#define IOC_PORTAL_LWT_LOOKUP_STRING       _IOWR('e', 45, long)
+#define IOC_PORTAL_MAX_NR                             45
 
 enum {
         QSWNAL  =  1,
index 8278111..7763f1b 100644 (file)
@@ -54,8 +54,10 @@ int jt_ptl_txmem (int argc, char **argv);
 int jt_ptl_nagle (int argc, char **argv);
 int jt_ptl_add_route (int argc, char **argv);
 int jt_ptl_del_route (int argc, char **argv);
+int jt_ptl_notify_router (int argc, char **argv);
 int jt_ptl_print_routes (int argc, char **argv);
 int jt_ptl_fail_nid (int argc, char **argv);
+int jt_ptl_lwt(int argc, char **argv);
 
 int dbg_initialize(int argc, char **argv);
 int jt_dbg_filter(int argc, char **argv);
index b5e1e39..0841d64 100644 (file)
@@ -32,6 +32,7 @@ kpr_nal_interface_t kqswnal_router_interface = {
        kprni_nalid:    QSWNAL,
        kprni_arg:      NULL,
        kprni_fwd:      kqswnal_fwd_packet,
+       kprni_notify:   NULL,                   /* we're connectionless */
 };
 
 
index 294f3e5..0d8e2fa 100644 (file)
@@ -164,6 +164,7 @@ typedef struct
         void             *ktx_args[2];          /* completion passthru */
         E3_Addr           ktx_ebuffer;          /* elan address of ktx_buffer */
         char             *ktx_buffer;           /* pre-allocated contiguous buffer for hdr + small payloads */
+        unsigned long     ktx_launchtime;       /* when (in jiffies) the transmit was launched */
 
         /* debug/info fields */
         pid_t             ktx_launcher;         /* pid of launching process */
index 8d5f70b..99f299f 100644 (file)
@@ -421,7 +421,9 @@ static void
 kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
 {
         kqswnal_tx_t      *ktx = (kqswnal_tx_t *)arg;
-
+        struct timeval     now;
+        time_t             then;
+        
         LASSERT (txd != NULL);
         LASSERT (ktx != NULL);
 
@@ -432,7 +434,15 @@ kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
 
         if (status != EP_SUCCESS)
         {
-                CERROR ("kqswnal: Transmit failed with %d\n", status);
+                CERROR ("Tx completion to "LPX64" failed: %d\n", 
+                        ktx->ktx_nid, status);
+
+                do_gettimeofday (&now);
+                then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;
+        
+                kpr_notify (&kqswnal_data.kqn_router, 
+                            ktx->ktx_nid, 0, then);
+
                 status = -EIO;
         }
 
@@ -447,33 +457,40 @@ kqswnal_launch (kqswnal_tx_t *ktx)
         int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
         long  flags;
         int   rc;
-        
+
+        ktx->ktx_launchtime = jiffies;
+
         LASSERT (dest >= 0);                    /* must be a peer */
         rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
                                ktx->ktx_port, attr, kqswnal_txhandler,
                                ktx, ktx->ktx_iov, ktx->ktx_niov);
-        if (rc == 0)
+        switch (rc) {
+        case 0: /* success */
                 atomic_inc (&kqswnal_packets_launched);
+                return (0);
 
-        if (rc != ENOMEM)
-                return (rc);
+        case ENOMEM: /* can't allocate ep txd => queue for later */
+                LASSERT (in_interrupt());
 
-        /* can't allocate ep txd => queue for later */
+                spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
 
-        LASSERT (in_interrupt());      /* not called by thread (not looping) */
+                list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
+                if (waitqueue_active (&kqswnal_data.kqn_sched_waitq))
+                        wake_up (&kqswnal_data.kqn_sched_waitq);
 
-        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
+                spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+                return (0);
 
-        list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
-        if (waitqueue_active (&kqswnal_data.kqn_sched_waitq))
-                wake_up (&kqswnal_data.kqn_sched_waitq);
-
-        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
+        default: /* fatal error */
+                CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
 
-        return (0);
+                /* Tell router I think a node is down */
+                kpr_notify (&kqswnal_data.kqn_router, ktx->ktx_nid,
+                            0, ktx->ktx_launchtime);
+                return (rc);
+        }
 }
 
-
 static char *
 hdr_type_string (ptl_hdr_t *hdr)
 {
@@ -584,7 +601,8 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         }
 
         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
-                rc = kpr_lookup (&kqswnal_data.kqn_router, nid, &gatewaynid);
+                rc = kpr_lookup (&kqswnal_data.kqn_router, nid, 
+                                 sizeof (ptl_hdr_t) + payload_nob, &gatewaynid);
                 if (rc != 0) {
                         CERROR("Can't route to "LPX64": router error %d\n",
                                nid, rc);
@@ -678,7 +696,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 return (PTL_FAIL);
         }
 
-        CDEBUG(D_NET, "send to "LPSZ" bytes to "LPX64"\n", payload_nob, nid);
+        CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64"\n", payload_nob, nid);
         return (PTL_OK);
 }
 
index e7232a0..c844dd6 100644 (file)
@@ -37,8 +37,34 @@ kpr_nal_interface_t ksocknal_router_interface = {
         kprni_nalid:      SOCKNAL,
         kprni_arg:        &ksocknal_data,
         kprni_fwd:        ksocknal_fwd_packet,
+        kprni_notify:     ksocknal_notify,
 };
 
+#define SOCKNAL_SYSCTL 200
+
+#define SOCKNAL_SYSCTL_TIMEOUT     1
+#define SOCKNAL_SYSCTL_EAGER_ACK   2
+#define SOCKNAL_SYSCTL_ZERO_COPY   3
+
+static ctl_table ksocknal_ctl_table[] = {
+        {SOCKNAL_SYSCTL_TIMEOUT, "timeout", 
+         &ksocknal_data.ksnd_io_timeout, sizeof (int),
+         0644, NULL, &proc_dointvec},
+        {SOCKNAL_SYSCTL_EAGER_ACK, "eager_ack", 
+         &ksocknal_data.ksnd_eager_ack, sizeof (int),
+         0644, NULL, &proc_dointvec},
+#if SOCKNAL_ZC
+        {SOCKNAL_SYSCTL_EAGER_ACK, "zero_copy", 
+         &ksocknal_data.ksnd_zc_min_frag, sizeof (int),
+         0644, NULL, &proc_dointvec},
+#endif
+        { 0 }
+};
+
+static ctl_table ksocknal_top_ctl_table[] = {
+        {SOCKNAL_SYSCTL, "socknal", NULL, 0, 0555, ksocknal_ctl_table},
+        { 0 }
+};
 
 int
 ksocknal_api_forward(nal_t *nal, int id, void *args, size_t args_len,
@@ -172,7 +198,7 @@ ksocknal_bind_irq (unsigned int irq)
 
 ksock_route_t *
 ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
-                       int irq_affinity, int xchange_nids, int nonagel)
+                       int nonagel, int xchange_nids, int irq_affinity, int eager)
 {
         ksock_route_t *route;
 
@@ -183,7 +209,7 @@ ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
         atomic_set (&route->ksnr_refcount, 1);
         route->ksnr_sharecount = 0;
         route->ksnr_peer = NULL;
-        route->ksnr_timeout = jiffies_64;
+        route->ksnr_timeout = jiffies;
         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
         route->ksnr_ipaddr = ipaddr;
         route->ksnr_port = port;
@@ -191,6 +217,7 @@ ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
         route->ksnr_irq_affinity = irq_affinity;
         route->ksnr_xchange_nids = xchange_nids;
         route->ksnr_nonagel = nonagel;
+        route->ksnr_eager = eager;
         route->ksnr_connecting = 0;
         route->ksnr_deleted = 0;
         route->ksnr_generation = 0;
@@ -371,7 +398,8 @@ ksocknal_get_route_by_idx (int index)
 
 int
 ksocknal_add_route (ptl_nid_t nid, __u32 ipaddr, int port, int bufnob,
-                    int nonagle, int xchange_nids, int bind_irq, int share)
+                    int nonagle, int xchange_nids, int bind_irq, 
+                    int share, int eager)
 {
         unsigned long      flags;
         ksock_peer_t      *peer;
@@ -388,8 +416,8 @@ ksocknal_add_route (ptl_nid_t nid, __u32 ipaddr, int port, int bufnob,
         if (peer == NULL)
                 return (-ENOMEM);
 
-        route = ksocknal_create_route (ipaddr, port, bufnob,
-                                       nonagle, xchange_nids, bind_irq);
+        route = ksocknal_create_route (ipaddr, port, bufnob, nonagle,
+                                       xchange_nids, bind_irq, eager);
         if (route == NULL) {
                 ksocknal_put_peer (peer);
                 return (-ENOMEM);
@@ -454,7 +482,7 @@ ksocknal_del_route_locked (ksock_route_t *route, int share, int keep_conn)
 
         if (conn != NULL) {
                 if (!keep_conn)
-                        ksocknal_close_conn_locked (conn);
+                        ksocknal_close_conn_locked (conn, 0);
                 else {
                         /* keeping the conn; just dissociate it and route... */
                         conn->ksnc_route = NULL;
@@ -568,14 +596,12 @@ ksocknal_get_peer_addr (ksock_conn_t *conn)
         struct sockaddr_in sin;
         int                len = sizeof (sin);
         int                rc;
-
-        rc = ksocknal_getconnsock (conn);
-        LASSERT (rc == 0);
         
         rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
                                             (struct sockaddr *)&sin, &len, 2);
+        /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
+        LASSERT (!conn->ksnc_closing);
         LASSERT (len <= sizeof (sin));
-        ksocknal_putconnsock (conn);
 
         if (rc != 0) {
                 CERROR ("Error %d getting sock peer IP\n", rc);
@@ -590,12 +616,8 @@ unsigned int
 ksocknal_conn_irq (ksock_conn_t *conn)
 {
         int                irq = 0;
-        int                rc;
         struct dst_entry  *dst;
 
-        rc = ksocknal_getconnsock (conn);
-        LASSERT (rc == 0);
-        
         dst = sk_dst_get (conn->ksnc_sock->sk);
         if (dst != NULL) {
                 if (dst->dev != NULL) {
@@ -608,7 +630,8 @@ ksocknal_conn_irq (ksock_conn_t *conn)
                 dst_release (dst);
         }
         
-        ksocknal_putconnsock (conn);
+        /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
+        LASSERT (!conn->ksnc_closing);
         return (irq);
 }
 
@@ -660,11 +683,13 @@ ksocknal_create_conn (ptl_nid_t nid, ksock_route_t *route,
         int                rc;
 
         /* NB, sock has an associated file since (a) this connection might
-         * have been created in userland and (b) we need the refcounting so
-         * that we don't close the socket while I/O is being done on it. */
+         * have been created in userland and (b) we need to refcount the
+         * socket so that we don't close it while I/O is being done on
+         * it, and sock->file has that pre-cooked... */
         LASSERT (sock->file != NULL);
+        LASSERT (file_count(sock->file) > 0);
 
-        rc = ksocknal_set_linger (sock);
+        rc = ksocknal_setup_sock (sock);
         if (rc != 0)
                 return (rc);
 
@@ -696,9 +721,6 @@ ksocknal_create_conn (ptl_nid_t nid, ksock_route_t *route,
         ksocknal_new_packet (conn, 0);
 
         INIT_LIST_HEAD (&conn->ksnc_tx_queue);
-#if SOCKNAL_ZC
-        INIT_LIST_HEAD (&conn->ksnc_tx_pending);
-#endif
         conn->ksnc_tx_ready = 0;
         conn->ksnc_tx_scheduled = 0;
         atomic_set (&conn->ksnc_tx_nob, 0);
@@ -753,6 +775,8 @@ ksocknal_create_conn (ptl_nid_t nid, ksock_route_t *route,
 
         conn->ksnc_peer = peer;
         atomic_inc (&peer->ksnp_refcount);
+        peer->ksnp_last_alive = jiffies;
+        peer->ksnp_error = 0;
 
         list_add (&conn->ksnc_list, &peer->ksnp_conns);
         atomic_inc (&conn->ksnc_refcount);
@@ -797,7 +821,7 @@ ksocknal_create_conn (ptl_nid_t nid, ksock_route_t *route,
 }
 
 void
-ksocknal_close_conn_locked (ksock_conn_t *conn)
+ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
 {
         /* This just does the immmediate housekeeping, and queues the
          * connection for the reaper to terminate. 
@@ -805,6 +829,7 @@ ksocknal_close_conn_locked (ksock_conn_t *conn)
         ksock_peer_t   *peer = conn->ksnc_peer;
         ksock_route_t  *route;
 
+        LASSERT (peer->ksnp_error == 0);
         LASSERT (!conn->ksnc_closing);
         conn->ksnc_closing = 1;
         atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
@@ -825,11 +850,16 @@ ksocknal_close_conn_locked (ksock_conn_t *conn)
         /* ksnd_deathrow_conns takes over peer's ref */
         list_del (&conn->ksnc_list);
 
-        if (list_empty (&peer->ksnp_conns) &&
-            list_empty (&peer->ksnp_routes)) {
-                /* I've just closed last conn belonging to a
-                 * non-autoconnecting peer */
-                ksocknal_unlink_peer_locked (peer);
+        if (list_empty (&peer->ksnp_conns)) {
+                /* No more connections to this peer */
+
+                peer->ksnp_error = error;       /* stash last conn close reason */
+
+                if (list_empty (&peer->ksnp_routes)) {
+                        /* I've just closed last conn belonging to a
+                         * non-autoconnecting peer */
+                        ksocknal_unlink_peer_locked (peer);
+                }
         }
 
         spin_lock (&ksocknal_data.ksnd_reaper_lock);
@@ -842,7 +872,7 @@ ksocknal_close_conn_locked (ksock_conn_t *conn)
 }
 
 int
-ksocknal_close_conn_unlocked (ksock_conn_t *conn) 
+ksocknal_close_conn_unlocked (ksock_conn_t *conn, int why
 {
         unsigned long flags;
         int           did_it = 0;
@@ -851,7 +881,7 @@ ksocknal_close_conn_unlocked (ksock_conn_t *conn)
                 
         if (!conn->ksnc_closing) {
                 did_it = 1;
-                ksocknal_close_conn_locked (conn);
+                ksocknal_close_conn_locked (conn, why);
         }
         
         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
@@ -867,6 +897,10 @@ ksocknal_terminate_conn (ksock_conn_t *conn)
          * ksnc_refcount will eventually hit zero, and then the reaper will
          * destroy it. */
         unsigned long   flags;
+        ksock_peer_t   *peer = conn->ksnc_peer;
+        struct timeval  now;
+        time_t          then = 0;
+        int             notify = 0;
 
         /* serialise with callbacks */
         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
@@ -886,6 +920,16 @@ ksocknal_terminate_conn (ksock_conn_t *conn)
 
         conn->ksnc_scheduler->kss_nconns--;
 
+        if (peer->ksnp_error != 0) {
+                /* peer's last conn closed in error */
+                LASSERT (list_empty (&peer->ksnp_conns));
+                
+                /* convert peer's last-known-alive timestamp from jiffies */
+                do_gettimeofday (&now);
+                then = now.tv_sec - (jiffies - peer->ksnp_last_alive)/HZ;
+                notify = 1;
+        }
+        
         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
 
         /* The socket is closed on the final put; either here, or in
@@ -894,6 +938,10 @@ ksocknal_terminate_conn (ksock_conn_t *conn)
          * immediately, aborting anything buffered in it. Any hung
          * zero-copy transmits will therefore complete in finite time. */
         ksocknal_putconnsock (conn);
+
+        if (notify)
+                kpr_notify (&ksocknal_data.ksnd_router, peer->ksnp_nid,
+                            0, then);
 }
 
 void
@@ -906,9 +954,7 @@ ksocknal_destroy_conn (ksock_conn_t *conn)
         LASSERT (conn->ksnc_route == NULL);
         LASSERT (!conn->ksnc_tx_scheduled);
         LASSERT (!conn->ksnc_rx_scheduled);
-#if SOCKNAL_ZC
-        LASSERT (list_empty (&conn->ksnc_tx_pending));
-#endif
+
         /* complete queued packets */
         while (!list_empty (&conn->ksnc_tx_queue)) {
                 ksock_tx_t *tx = list_entry (conn->ksnc_tx_queue.next,
@@ -1010,7 +1056,7 @@ ksocknal_close_conn (ptl_nid_t nid, __u32 ipaddr)
                                         continue;
 
                                 rc = 0;
-                                ksocknal_close_conn_locked (conn);
+                                ksocknal_close_conn_locked (conn, 0);
                         }
                 }
         }
@@ -1020,6 +1066,24 @@ ksocknal_close_conn (ptl_nid_t nid, __u32 ipaddr)
         return (rc);
 }
 
+void
+ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive)
+{
+        /* The router is telling me she's been notified of a change in
+         * gateway state.... */
+
+        CDEBUG (D_NET, "gw "LPX64" %s\n", gw_nid, alive ? "up" : "down");
+
+        if (!alive) {
+                /* If the gateway crashed, close all open connections... */
+                ksocknal_close_conn (gw_nid, 0);
+                return;
+        }
+        
+        /* ...otherwise do nothing.  We can only establish new connections
+         * if we have autroutes, and these connect on demand. */
+}
+
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
 struct tcp_opt *sock2tcp_opt(struct sock *sk)
 {
@@ -1177,7 +1241,8 @@ ksocknal_cmd(struct portal_ioctl_data * data, void * private)
                         data->ioc_wait  = route->ksnr_sharecount;
                         data->ioc_flags = (route->ksnr_nonagel      ? 1 : 0) |
                                           (route->ksnr_xchange_nids ? 2 : 0) |
-                                          (route->ksnr_irq_affinity ? 4 : 0);
+                                          (route->ksnr_irq_affinity ? 4 : 0) |
+                                          (route->ksnr_eager        ? 8 : 0);
                         ksocknal_put_route (route);
                 }
                 break;
@@ -1185,10 +1250,11 @@ ksocknal_cmd(struct portal_ioctl_data * data, void * private)
         case NAL_CMD_ADD_AUTOCONN: {
                 rc = ksocknal_add_route (data->ioc_nid, data->ioc_id,
                                          data->ioc_misc, data->ioc_size,
-                                         (data->ioc_flags & 1) != 0,
-                                         (data->ioc_flags & 2) != 0,
-                                         (data->ioc_flags & 4) != 0,
-                                         (data->ioc_flags & 8) != 0);
+                                         (data->ioc_flags & 0x01) != 0,
+                                         (data->ioc_flags & 0x02) != 0,
+                                         (data->ioc_flags & 0x04) != 0,
+                                         (data->ioc_flags & 0x08) != 0,
+                                         (data->ioc_flags & 0x10) != 0);
                 break;
         }
         case NAL_CMD_DEL_AUTOCONN: {
@@ -1287,6 +1353,10 @@ ksocknal_module_fini (void)
                 LASSERT (0);
 
         case SOCKNAL_INIT_ALL:
+#if CONFIG_SYSCTL
+                if (ksocknal_data.ksnd_sysctl != NULL)
+                        unregister_sysctl_table (ksocknal_data.ksnd_sysctl);
+#endif
                 kportal_nal_unregister(SOCKNAL);
                 PORTAL_SYMBOL_UNREGISTER (ksocknal_ni);
                 /* fall through */
@@ -1364,6 +1434,9 @@ ksocknal_module_init (void)
 
         /* packet descriptor must fit in a router descriptor's scratchpad */
         LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
+        /* the following must be sizeof(int) for proc_dointvec() */
+        LASSERT(sizeof (ksocknal_data.ksnd_io_timeout) == sizeof (int));
+        LASSERT(sizeof (ksocknal_data.ksnd_eager_ack) == sizeof (int));
 
         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
 
@@ -1379,6 +1452,12 @@ ksocknal_module_init (void)
 
         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
 
+        ksocknal_data.ksnd_io_timeout = SOCKNAL_IO_TIMEOUT;
+        ksocknal_data.ksnd_eager_ack  = SOCKNAL_EAGER_ACK;
+#if SOCKNAL_ZC
+        ksocknal_data.ksnd_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;
+#endif
+
         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
         PORTAL_ALLOC (ksocknal_data.ksnd_peers,
                       sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
@@ -1561,9 +1640,13 @@ ksocknal_module_init (void)
 
         PORTAL_SYMBOL_REGISTER(ksocknal_ni);
 
+#ifdef CONFIG_SYSCTL
+        /* Press on regardless even if registering sysctl doesn't work */
+        ksocknal_data.ksnd_sysctl = register_sysctl_table (ksocknal_top_ctl_table, 0);
+#endif
         /* flag everything initialised */
         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
-
+        
         printk(KERN_INFO "Routing socket NAL loaded (Routing %s, initial "
                "mem %d)\n",
                kpr_routing (&ksocknal_data.ksnd_router) ?
index 69daa02..abd8e7b 100644 (file)
@@ -48,6 +48,7 @@
 #include <linux/stat.h>
 #include <linux/list.h>
 #include <linux/kmod.h>
+#include <linux/sysctl.h>
 #include <asm/uaccess.h>
 #include <asm/segment.h>
 #include <asm/div64.h>
 #define SOCKNAL_MIN_RECONNECT_INTERVAL HZ      /* first failed connection retry... */
 #define SOCKNAL_MAX_RECONNECT_INTERVAL (60*HZ) /* ...exponentially increasing to this */
 
-#define SOCKNAL_IO_TIMEOUT      (60*HZ)         /* default comms timeout */
+/* default vals for runtime tunables */
+#define SOCKNAL_IO_TIMEOUT       50             /* default comms timeout (seconds) */
+#define SOCKNAL_EAGER_ACK        1              /* default eager ack (boolean) */
+#define SOCKNAL_ZC_MIN_FRAG     (2<<10)         /* default smallest zerocopy fragment */
+
+#define SOCKNAL_USE_KEEPALIVES   0              /* use tcp/ip keepalive? */
 
 #define SOCKNAL_PEER_HASH_SIZE   101            /* # peer lists */
 
@@ -78,8 +84,6 @@
 # define SOCKNAL_MAX_FWD_PAYLOAD (64<<10)       /* biggest payload I can forward */
 #endif
 
-#define SOCKNAL_ZC_MIN_FRAG    (2<<10)          /* default smallest zerocopy fragment */
-
 #define SOCKNAL_NLTXS           128             /* # normal transmit messages */
 #define SOCKNAL_NNBLK_LTXS     128             /* # transmit messages reserved if can't block */
 
 
 #define SOCKNAL_TX_LOW_WATER(sk) (((sk)->sk_sndbuf*8)/10)
 
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-# define jiffies_64  jiffies
-#endif
-
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,72))
 # define sk_data_ready data_ready
 # define sk_write_space write_space
@@ -136,6 +136,12 @@ typedef struct {
 
 typedef struct {
         int               ksnd_init;            /* initialisation state */
+        int               ksnd_io_timeout;      /* "stuck" socket timeout (seconds) */
+        int               ksnd_eager_ack;       /* make TCP ack eagerly? */
+#if SOCKNAL_ZC
+        unsigned int      ksnd_zc_min_frag;     /* minimum zero copy frag size */
+#endif
+        struct ctl_table_header *ksnd_sysctl;   /* sysctl interface */
         
         rwlock_t          ksnd_global_lock;     /* stabilize peer/conn ops */
         struct list_head *ksnd_peers;           /* hash table of all my known peers */
@@ -205,7 +211,6 @@ struct ksock_route;                             /* forward ref */
 typedef struct                                  /* transmit packet */
 {
         struct list_head        tx_list;        /* queue on conn for transmission etc */
-        __u64                   tx_deadline;    /* when (in jiffies) tx times out */
         char                    tx_isfwd;       /* forwarding / sourced here */
         int                     tx_nob;         /* # packet bytes */
         int                     tx_resid;       /* residual bytes */
@@ -291,10 +296,11 @@ typedef struct ksock_conn
         __u32               ksnc_ipaddr;        /* peer's IP */
         int                 ksnc_port;          /* peer's port */
         int                 ksnc_closing;       /* being shut down */
-
+        
         /* READER */
         struct list_head    ksnc_rx_list;       /* where I enq waiting input or a forwarding descriptor */
-        __u64               ksnc_rx_deadline;   /* when receive times out */
+        unsigned long       ksnc_rx_deadline;   /* when (in jiffies) receive times out */
+        int                 ksnc_rx_started;    /* started receiving a message */
         int                 ksnc_rx_ready;      /* data ready to read */
         int                 ksnc_rx_scheduled;  /* being progressed */
         int                 ksnc_rx_state;      /* what is being read */
@@ -311,9 +317,7 @@ typedef struct ksock_conn
         /* WRITER */
         struct list_head    ksnc_tx_list;       /* where I enq waiting for output space */
         struct list_head    ksnc_tx_queue;      /* packets waiting to be sent */
-#if SOCKNAL_ZC
-        struct list_head    ksnc_tx_pending;    /* zc packets pending callback */
-#endif
+        unsigned long       ksnc_tx_deadline;   /* when (in jiffies) tx times out */
         atomic_t            ksnc_tx_nob;        /* # bytes queued */
         int                 ksnc_tx_ready;      /* write space */
         int                 ksnc_tx_scheduled;  /* being progressed */
@@ -326,7 +330,7 @@ typedef struct ksock_route
         struct ksock_peer  *ksnr_peer;          /* owning peer */
         atomic_t            ksnr_refcount;      /* # users */
         int                 ksnr_sharecount;    /* lconf usage counter */
-        __u64               ksnr_timeout;       /* when reconnection can happen next */
+        unsigned long       ksnr_timeout;       /* when (in jiffies) reconnection can happen next */
         unsigned int        ksnr_retry_interval; /* how long between retries */
         __u32               ksnr_ipaddr;        /* an IP address for this peer */
         int                 ksnr_port;          /* port to connect to */
@@ -334,8 +338,9 @@ typedef struct ksock_route
         unsigned int        ksnr_irq_affinity:1; /* set affinity? */
         unsigned int        ksnr_xchange_nids:1; /* do hello protocol? */
         unsigned int        ksnr_nonagel:1;     /* disable nagle? */
-        unsigned int        ksnr_connecting;    /* autoconnect in progress? */
-        unsigned int        ksnr_deleted;       /* been removed from peer? */
+        unsigned int        ksnr_eager:1;       /* connect eagery? */
+        unsigned int        ksnr_connecting:1;  /* autoconnect in progress? */
+        unsigned int        ksnr_deleted:1;     /* been removed from peer? */
         int                 ksnr_generation;    /* connection incarnation # */
         ksock_conn_t       *ksnr_conn;          /* NULL/active connection */
 } ksock_route_t;
@@ -346,13 +351,14 @@ typedef struct ksock_peer
         ptl_nid_t           ksnp_nid;           /* who's on the other end(s) */
         atomic_t            ksnp_refcount;      /* # users */
         int                 ksnp_closing;       /* being closed */
+        int                 ksnp_error;         /* errno on closing last conn */
         struct list_head    ksnp_conns;         /* all active connections */
         struct list_head    ksnp_routes;        /* routes */
         struct list_head    ksnp_tx_queue;      /* waiting packets */
+        unsigned long       ksnp_last_alive;    /* when (in jiffies) I was last alive */
 } ksock_peer_t;
 
 
-
 extern nal_cb_t         ksocknal_lib;
 extern ksock_nal_data_t ksocknal_data;
 
@@ -393,8 +399,8 @@ extern int ksocknal_del_route (ptl_nid_t nid, __u32 ipaddr,
                                int single, int keep_conn);
 extern int ksocknal_create_conn (ptl_nid_t nid, ksock_route_t *route,
                                  struct socket *sock, int bind_irq);
-extern void ksocknal_close_conn_locked (ksock_conn_t *conn);
-extern int ksocknal_close_conn_unlocked (ksock_conn_t *conn);
+extern void ksocknal_close_conn_locked (ksock_conn_t *conn, int why);
+extern int ksocknal_close_conn_unlocked (ksock_conn_t *conn, int why);
 extern void ksocknal_terminate_conn (ksock_conn_t *conn);
 extern void ksocknal_destroy_conn (ksock_conn_t *conn);
 extern void ksocknal_put_conn (ksock_conn_t *conn);
@@ -404,6 +410,7 @@ extern void ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn);
 extern void ksocknal_tx_done (ksock_tx_t *tx, int asynch);
 extern void ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd);
 extern void ksocknal_fmb_callback (void *arg, int error);
+extern void ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive);
 extern int ksocknal_thread_start (int (*fn)(void *arg), void *arg);
 extern int ksocknal_new_packet (ksock_conn_t *conn, int skip);
 extern int ksocknal_scheduler (void *arg);
@@ -411,4 +418,4 @@ extern void ksocknal_data_ready(struct sock *sk, int n);
 extern void ksocknal_write_space(struct sock *sk);
 extern int ksocknal_autoconnectd (void *arg);
 extern int ksocknal_reaper (void *arg);
-extern int ksocknal_set_linger (struct socket *sock);
+extern int ksocknal_setup_sock (struct socket *sock);
index 656a0c5..f946d87 100644 (file)
 
 #include "socknal.h"
 
-int        ksocknal_io_timeout = SOCKNAL_IO_TIMEOUT;
-#if SOCKNAL_ZC
-int        ksocknal_do_zc = 1;
-int        ksocknal_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;
-#endif
-
 /*
  *  LIB functions follow
  *
@@ -225,7 +219,7 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
         struct iovec  *iov = tx->tx_iov;
         int            fragsize = iov->iov_len;
         unsigned long  vaddr = (unsigned long)iov->iov_base;
-        int            more = !list_empty (&conn->ksnc_tx_queue) |
+        int            more = (!list_empty (&conn->ksnc_tx_queue)) |
                               (tx->tx_niov > 1) |
                               (tx->tx_nkiov > 1);
 #if SOCKNAL_ZC
@@ -241,10 +235,9 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
         LASSERT (tx->tx_niov > 0);
         
 #if SOCKNAL_ZC
-        if (ksocknal_do_zc &&
+        if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
             (sock->sk->route_caps & NETIF_F_SG) &&
             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
-            zcsize >= ksocknal_zc_min_frag &&
             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
                 
                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
@@ -306,7 +299,7 @@ ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
         int            fragsize = kiov->kiov_len;
         struct page   *page = kiov->kiov_page;
         int            offset = kiov->kiov_offset;
-        int            more = !list_empty (&conn->ksnc_tx_queue) |
+        int            more = (!list_empty (&conn->ksnc_tx_queue)) |
                               (tx->tx_nkiov > 1);
         int            rc;
 
@@ -318,10 +311,9 @@ ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
         LASSERT (tx->tx_nkiov > 0);
 
 #if SOCKNAL_ZC
-        if (ksocknal_do_zc &&
+        if (fragsize >= ksocknal_data.ksnd_zc_min_frag &&
             (sock->sk->route_caps & NETIF_F_SG) &&
-            (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
-            fragsize >= ksocknal_zc_min_frag) {
+            (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
 
                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
                                page, offset, fragsize);
@@ -349,6 +341,7 @@ ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
                 set_fs (KERNEL_DS);
                 rc = sock_sendmsg(sock, &msg, fragsize);
                 set_fs (oldmm);
+
                 kunmap (page);
         }
 
@@ -410,6 +403,16 @@ ksocknal_sendmsg (ksock_conn_t *conn, ksock_tx_t *tx)
                         break;
                 }
 
+                /* Consider the connection alive since we managed to chuck
+                 * more data into it.  Really, we'd like to consider it
+                 * alive only when the peer ACKs something, but
+                 * write_space() only gets called back while SOCK_NOSPACE
+                 * is set.  Instead, we presume peer death has occurred if
+                 * the socket doesn't drain within a timout */
+                conn->ksnc_tx_deadline = jiffies + 
+                                         ksocknal_data.ksnd_io_timeout * HZ;
+                conn->ksnc_peer->ksnp_last_alive = jiffies;
+
                 if (tx->tx_resid == 0) {        /* sent everything */
                         rc = 0;
                         break;
@@ -420,6 +423,24 @@ ksocknal_sendmsg (ksock_conn_t *conn, ksock_tx_t *tx)
         RETURN (rc);
 }
 
+void
+ksocknal_eager_ack (ksock_conn_t *conn)
+{
+        int            opt = 1;
+        mm_segment_t   oldmm = get_fs();
+        struct socket *sock = conn->ksnc_sock;
+        
+        /* Remind the socket to ACK eagerly.  If I don't, the socket might
+         * think I'm about to send something it could piggy-back the ACK
+         * on, introducing delay in completing zero-copy sends in my
+         * peer. */
+
+        set_fs(KERNEL_DS);
+        sock->ops->setsockopt (sock, SOL_TCP, TCP_QUICKACK,
+                               (char *)&opt, sizeof (opt));
+        set_fs(oldmm);
+}
+
 int
 ksocknal_recv_iov (ksock_conn_t *conn)
 {
@@ -453,6 +474,13 @@ ksocknal_recv_iov (ksock_conn_t *conn)
         if (rc <= 0)
                 return (rc);
 
+        /* received something... */
+        conn->ksnc_peer->ksnp_last_alive = jiffies;
+        conn->ksnc_rx_deadline = jiffies + 
+                                 ksocknal_data.ksnd_io_timeout * HZ;
+        mb();                           /* order with setting rx_started */
+        conn->ksnc_rx_started = 1;
+
         conn->ksnc_rx_nob_wanted -= rc;
         conn->ksnc_rx_nob_left -= rc;
                 
@@ -499,11 +527,19 @@ ksocknal_recv_kiov (ksock_conn_t *conn)
         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
         /* NB this is just a boolean............................^ */
         set_fs (oldmm);
+
         kunmap (page);
         
         if (rc <= 0)
                 return (rc);
         
+        /* received something... */
+        conn->ksnc_peer->ksnp_last_alive = jiffies;
+        conn->ksnc_rx_deadline = jiffies + 
+                                 ksocknal_data.ksnd_io_timeout * HZ;
+        mb();                           /* order with setting rx_started */
+        conn->ksnc_rx_started = 1;
+
         conn->ksnc_rx_nob_wanted -= rc;
         conn->ksnc_rx_nob_left -= rc;
                 
@@ -541,20 +577,33 @@ ksocknal_recvmsg (ksock_conn_t *conn)
                         rc = -ESHUTDOWN;
                         break;
                 }
-                
+
                 if (conn->ksnc_rx_niov != 0)
                         rc = ksocknal_recv_iov (conn);
                 else
                         rc = ksocknal_recv_kiov (conn);
-                
+
                 if (rc <= 0) {
                         /* error/EOF or partial receive */
-                        if (rc == -EAGAIN)
+                        if (rc == -EAGAIN) {
                                 rc = 1;
+                        } else if (rc == 0 && conn->ksnc_rx_started) {
+                                /* EOF in the middle of a message */
+                                rc = -EPROTO;
+                        }
                         break;
                 }
 
+                /* Completed a fragment */
+
                 if (conn->ksnc_rx_nob_wanted == 0) {
+                        /* Completed a message segment (header or payload) */
+                        if (ksocknal_data.ksnd_eager_ack &&
+                            (conn->ksnc_rx_state ==  SOCKNAL_RX_BODY ||
+                             conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD)) {
+                                /* Remind the socket to ack eagerly... */
+                                ksocknal_eager_ack(conn);
+                        }
                         rc = 1;
                         break;
                 }
@@ -577,7 +626,6 @@ ksocknal_zc_callback (zccd_t *zcd)
 
         spin_lock_irqsave (&sched->kss_lock, flags);
 
-        list_del (&tx->tx_list); /* remove from kss_zctxpending_list */
         list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
         if (waitqueue_active (&sched->kss_waitq))
                 wake_up (&sched->kss_waitq);
@@ -628,23 +676,12 @@ ksocknal_tx_launched (ksock_tx_t *tx)
 {
 #if SOCKNAL_ZC
         if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
-                unsigned long  flags;
                 ksock_conn_t  *conn = tx->tx_conn;
-                ksock_sched_t *sched = conn->ksnc_scheduler;
                 
                 /* zccd skbufs are still in-flight.  First take a ref on
                  * conn, so it hangs about for ksocknal_tx_done... */
                 atomic_inc (&conn->ksnc_refcount);
 
-                /* Stash it for timeout...
-                 * NB We have to hold a lock to stash the tx, and we have
-                 * stash it before we zcc_put(), but we have to _not_ hold
-                 * this lock when we zcc_put(), otherwise we could deadlock
-                 * if it turns out to be the last put. Aaaaarrrrggghhh! */
-                spin_lock_irqsave (&sched->kss_lock, flags);
-                list_add_tail (&tx->tx_list, &conn->ksnc_tx_pending);
-                spin_unlock_irqrestore (&sched->kss_lock, flags);
-                
                 /* ...then drop the initial ref on zccd, so the zero copy
                  * callback can occur */
                 zccd_put (&tx->tx_zccd);
@@ -659,10 +696,10 @@ ksocknal_tx_launched (ksock_tx_t *tx)
 void
 ksocknal_process_transmit (ksock_sched_t *sched, unsigned long *irq_flags)
 {
-        ksock_conn_t *conn;
-        ksock_tx_t *tx;
-        int         rc;
-
+        ksock_conn_t  *conn;
+        ksock_tx_t    *tx;
+        int            rc;
+        
         LASSERT (!list_empty (&sched->kss_tx_conns));
         conn = list_entry(sched->kss_tx_conns.next, ksock_conn_t, ksnc_tx_list);
         list_del (&conn->ksnc_tx_list);
@@ -686,7 +723,7 @@ ksocknal_process_transmit (ksock_sched_t *sched, unsigned long *irq_flags)
         CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
 
         if (rc != 0) {
-                if (ksocknal_close_conn_unlocked (conn)) {
+                if (ksocknal_close_conn_unlocked (conn, rc)) {
                         /* I'm the first to close */
                         CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n",
                                 conn, rc, conn->ksnc_peer->ksnp_nid,
@@ -696,7 +733,6 @@ ksocknal_process_transmit (ksock_sched_t *sched, unsigned long *irq_flags)
                 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
 
         } else if (tx->tx_resid == 0) {  
-
                 /* everything went; assume more can go, and avoid
                  * write_space locking */
                 conn->ksnc_tx_ready = 1;
@@ -763,7 +799,8 @@ ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
                 return (NULL);
         }
         
-        rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, &target_nid);
+        rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, tx->tx_nob,
+                         &target_nid);
         if (rc != 0) {
                 CERROR ("Can't route to "LPX64": router error %d\n", nid, rc);
                 return (NULL);
@@ -820,8 +857,7 @@ ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
 #endif
 
         spin_lock_irqsave (&sched->kss_lock, flags);
-                
-        tx->tx_deadline = jiffies_64 + ksocknal_io_timeout;
+
         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
                 
         if (conn->ksnc_tx_ready &&      /* able to send */
@@ -839,7 +875,7 @@ ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
 }
 
 ksock_route_t *
-ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
+ksocknal_find_connectable_route_locked (ksock_peer_t *peer, int eager_only)
 {
         struct list_head  *tmp;
         ksock_route_t     *route;
@@ -849,7 +885,8 @@ ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
                 
                 if (route->ksnr_conn == NULL && /* not connected */
                     !route->ksnr_connecting &&  /* not connecting */
-                    route->ksnr_timeout <= jiffies_64) /* OK to retry */
+                    (!eager_only || route->ksnr_eager) && /* wants to connect */
+                    time_after_eq (jiffies, route->ksnr_timeout)) /* OK to retry */
                         return (route);
         }
         
@@ -880,7 +917,7 @@ ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
         ksock_conn_t     *conn;
         ksock_route_t    *route;
         rwlock_t         *g_lock;
-        
+
         /* Ensure the frags we've been given EXACTLY match the number of
          * bytes we want to send.  Many TCP/IP stacks disregard any total
          * size parameters passed to them and just look at the frags. 
@@ -907,17 +944,19 @@ ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
                 return (PTL_FAIL);
         }
 
-        /* Any routes need to be connected? (need write lock if so) */
-        if (ksocknal_find_connectable_route_locked (peer) == NULL) {
+        if (ksocknal_find_connectable_route_locked(peer, 1) == NULL) {
                 conn = ksocknal_find_conn_locked (tx, peer);
                 if (conn != NULL) {
+                        /* I've got no unconnected autoconnect routes that
+                         * need to be connected, and I do have an actual
+                         * connection... */
                         ksocknal_queue_tx_locked (tx, conn);
                         read_unlock (g_lock);
                         return (PTL_OK);
                 }
         }
         
-        /* need a write lock now to change peer state... */
+        /* Making one or more connections; I'll need a write lock... */
 
         atomic_inc (&peer->ksnp_refcount);      /* +1 ref for me while I unlock */
         read_unlock (g_lock);
@@ -930,23 +969,37 @@ ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
         }
         ksocknal_put_peer (peer);               /* drop ref I got above */
 
-        /* I may launch autoconnects, now we're write locked... */
-        while ((route = ksocknal_find_connectable_route_locked (peer)) != NULL)
+
+        for (;;) {
+                /* launch all eager autoconnections */
+                route = ksocknal_find_connectable_route_locked (peer, 1);
+                if (route == NULL)
+                        break;
+
                 ksocknal_launch_autoconnect_locked (route);
+        }
 
         conn = ksocknal_find_conn_locked (tx, peer);
         if (conn != NULL) {
+                /* Connection exists; queue message on it */
                 ksocknal_queue_tx_locked (tx, conn);
                 write_unlock_irqrestore (g_lock, flags);
                 return (PTL_OK);
         }
-                
+
         if (ksocknal_find_connecting_route_locked (peer) == NULL) {
-                /* no routes actually connecting now */
-                write_unlock_irqrestore (g_lock, flags);
-                return (PTL_FAIL);
+                /* no autoconnect routes actually connecting now.  Scrape
+                 * the barrel for non-eager autoconnects */
+                route = ksocknal_find_connectable_route_locked (peer, 0);
+                if (route != NULL) {
+                        ksocknal_launch_autoconnect_locked (route);
+                } else {
+                        write_unlock_irqrestore (g_lock, flags);
+                        return (PTL_FAIL);
+                }
         }
 
+        /* At least 1 connection is being established; queue the message... */
         list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
 
         write_unlock_irqrestore (g_lock, flags);
@@ -1279,7 +1332,6 @@ ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
 
         conn->ksnc_cookie = fmb;                /* stash fmb for later */
         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
-        conn->ksnc_rx_deadline = jiffies_64 + ksocknal_io_timeout; /* start timeout */
         
         /* payload is desc's iov-ed buffer, but skipping the hdr */
         LASSERT (niov <= sizeof (conn->ksnc_rx_iov_space) /
@@ -1323,7 +1375,7 @@ ksocknal_fwd_parse (ksock_conn_t *conn)
                        dest_nid, body_len);
 
                 ksocknal_new_packet (conn, 0);  /* on to new packet */
-                ksocknal_close_conn_unlocked (conn); /* give up on conn */
+                ksocknal_close_conn_unlocked (conn, -EINVAL); /* give up on conn */
                 return;
         }
 
@@ -1373,6 +1425,9 @@ ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
         int   skipped;
 
         if (nob_to_skip == 0) {         /* right at next packet boundary now */
+                conn->ksnc_rx_started = 0;
+                mb ();                          /* racing with timeout thread */
+                
                 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
                 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
                 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
@@ -1464,7 +1519,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
         rc = ksocknal_recvmsg(conn);
 
         if (rc <= 0) {
-                if (ksocknal_close_conn_unlocked (conn)) {
+                if (ksocknal_close_conn_unlocked (conn, rc)) {
                         /* I'm the first to close */
                         if (rc < 0)
                                 CERROR ("[%p] Error %d on read from "LPX64" ip %08x:%d\n",
@@ -1478,6 +1533,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
                 goto out;
         }
 
+        
         if (conn->ksnc_rx_nob_wanted != 0)      /* short read */
                 goto out;                       /* try again later */
 
@@ -1506,9 +1562,6 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
                 /* sets wanted_len, iovs etc */
                 lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
 
-                /* start timeout (lib is waiting for finalize) */
-                conn->ksnc_rx_deadline = jiffies_64 + ksocknal_io_timeout;
-
                 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
                         conn->ksnc_rx_state = SOCKNAL_RX_BODY;
                         goto try_read;          /* go read the payload */
@@ -1517,7 +1570,6 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
 
         case SOCKNAL_RX_BODY:
                 /* payload all received */
-                conn->ksnc_rx_deadline = 0;     /* cancel timeout */
                 lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie);
                 /* Fall through */
 
@@ -1534,9 +1586,6 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
                         NTOH__u64 (conn->ksnc_hdr.dest_nid),
                         conn->ksnc_rx_nob_left);
 
-                /* cancel timeout (only needed it while fmb allocated) */
-                conn->ksnc_rx_deadline = 0;
-
                 /* forward the packet. NB ksocknal_init_fmb() put fmb into
                  * conn->ksnc_cookie */
                 fmb = (ksock_fmb_t *)conn->ksnc_cookie;
@@ -1631,15 +1680,20 @@ int ksocknal_scheduler (void *arg)
         int                id = sched - ksocknal_data.ksnd_schedulers;
         char               name[16];
 
-        snprintf (name, sizeof (name),"ksocknald[%d]", id);
+        snprintf (name, sizeof (name),"ksocknald_%02d", id);
         kportal_daemonize (name);
         kportal_blockallsigs ();
 
 #if (CONFIG_SMP && CPU_AFFINITY)
-        if ((cpu_online_map & (1 << id)) != 0)
+        if ((cpu_online_map & (1 << id)) != 0) {
+#if 0
                 current->cpus_allowed = (1 << id);
-        else
+#else
+                set_cpus_allowed (current, 1<<id);
+#endif
+        } else {
                 CERROR ("Can't set CPU affinity for %s\n", name);
+        }
 #endif /* CONFIG_SMP && CPU_AFFINITY */
         
         spin_lock_irqsave (&sched->kss_lock, flags);
@@ -1722,7 +1776,10 @@ ksocknal_data_ready (struct sock *sk, int n)
         if (conn == NULL) {             /* raced with ksocknal_close_sock */
                 LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
                 sk->sk_data_ready (sk, n);
-        } else if (!conn->ksnc_rx_ready) {        /* new news */
+                goto out;
+        }
+
+        if (!conn->ksnc_rx_ready) {        /* new news */
                 /* Set ASAP in case of concurrent calls to me */
                 conn->ksnc_rx_ready = 1;
 
@@ -1747,6 +1804,7 @@ ksocknal_data_ready (struct sock *sk, int n)
                 spin_unlock_irqrestore (&sched->kss_lock, flags);
         }
 
+ out:
         read_unlock (&ksocknal_data.ksnd_global_lock);
 
         EXIT;
@@ -1776,7 +1834,12 @@ ksocknal_write_space (struct sock *sk)
         if (conn == NULL) {             /* raced with ksocknal_close_sock */
                 LASSERT (sk->sk_write_space != &ksocknal_write_space);
                 sk->sk_write_space (sk);
-        } else if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
+
+                read_unlock (&ksocknal_data.ksnd_global_lock);
+                return;
+        }
+
+        if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
                 clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
 
                 if (!conn->ksnc_tx_ready) {      /* new news */
@@ -1967,7 +2030,7 @@ ksocknal_exchange_nids (struct socket *sock, ptl_nid_t nid)
 }
 
 int
-ksocknal_set_linger (struct socket *sock) 
+ksocknal_setup_sock (struct socket *sock) 
 {
         mm_segment_t    oldmm = get_fs ();
         int             rc;
@@ -1998,7 +2061,51 @@ ksocknal_set_linger (struct socket *sock)
                 CERROR ("Can't set SO_LINGER2: %d\n", rc);
                 return (rc);
         }
+
+#if SOCKNAL_USE_KEEPALIVES
+        /* Keepalives: If 3/4 of the timeout elapses, start probing every
+         * second until the timeout elapses. */
+
+        option = (ksocknal_data.ksnd_io_timeout * 3) / 4;
+        set_fs (KERNEL_DS);
+        rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPIDLE,
+                                    (char *)&option, sizeof (option));
+        set_fs (oldmm);
+        if (rc != 0) {
+                CERROR ("Can't set TCP_KEEPIDLE: %d\n", rc);
+                return (rc);
+        }
         
+        option = 1;
+        set_fs (KERNEL_DS);
+        rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPINTVL,
+                                    (char *)&option, sizeof (option));
+        set_fs (oldmm);
+        if (rc != 0) {
+                CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
+                return (rc);
+        }
+        
+        option = ksocknal_data.ksnd_io_timeout / 4;
+        set_fs (KERNEL_DS);
+        rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPCNT,
+                                    (char *)&option, sizeof (option));
+        set_fs (oldmm);
+        if (rc != 0) {
+                CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
+                return (rc);
+        }
+
+        option = 1;
+        set_fs (KERNEL_DS);
+        rc = sock_setsockopt (sock, SOL_SOCKET, SO_KEEPALIVE, 
+                              (char *)&option, sizeof (option));
+        set_fs (oldmm);
+        if (rc != 0) {
+                CERROR ("Can't set SO_KEEPALIVE: %d\n", rc);
+                return (rc);
+        }
+#endif
         return (0);
 }
 
@@ -2007,7 +2114,6 @@ ksocknal_connect_peer (ksock_route_t *route)
 {
         struct sockaddr_in  peer_addr;
         mm_segment_t        oldmm = get_fs();
-        __u64               n;
         struct timeval      tv;
         int                 fd;
         struct socket      *sock;
@@ -2020,8 +2126,8 @@ ksocknal_connect_peer (ksock_route_t *route)
         }
 
         /* Ugh; have to map_fd for compatibility with sockets passed in
-         * from userspace.  And we actually need the refcounting that
-         * this gives you :) */
+         * from userspace.  And we actually need the sock->file refcounting
+         * that this gives you :) */
 
         fd = sock_map_fd (sock);
         if (fd < 0) {
@@ -2033,22 +2139,19 @@ ksocknal_connect_peer (ksock_route_t *route)
         /* NB the fd now owns the ref on sock->file */
         LASSERT (sock->file != NULL);
         LASSERT (file_count(sock->file) == 1);
-        
+
         /* Set the socket timeouts, so our connection attempt completes in
          * finite time */
-        tv.tv_sec = ksocknal_io_timeout / HZ;
-        n = ksocknal_io_timeout % HZ;
-        n = n * 1000000 + HZ - 1;
-        do_div (n, HZ);
-        tv.tv_usec = n;
+        tv.tv_sec = ksocknal_data.ksnd_io_timeout;
+        tv.tv_usec = 0;
 
         set_fs (KERNEL_DS);
         rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO,
                               (char *)&tv, sizeof (tv));
         set_fs (oldmm);
         if (rc != 0) {
-                CERROR ("Can't set send timeout %d (in HZ): %d\n", 
-                        ksocknal_io_timeout, rc);
+                CERROR ("Can't set send timeout %d: %d\n", 
+                        ksocknal_data.ksnd_io_timeout, rc);
                 goto out;
         }
         
@@ -2057,8 +2160,8 @@ ksocknal_connect_peer (ksock_route_t *route)
                               (char *)&tv, sizeof (tv));
         set_fs (oldmm);
         if (rc != 0) {
-                CERROR ("Can't set receive timeout %d (in HZ): %d\n",
-                        ksocknal_io_timeout, rc);
+                CERROR ("Can't set receive timeout %d: %d\n",
+                        ksocknal_data.ksnd_io_timeout, rc);
                 goto out;
         }
 
@@ -2154,7 +2257,7 @@ ksocknal_autoconnect (ksock_route_t *route)
         route->ksnr_connecting = 0;
 
         LASSERT (route->ksnr_retry_interval != 0);
-        route->ksnr_timeout = jiffies_64 + route->ksnr_retry_interval;
+        route->ksnr_timeout = jiffies + route->ksnr_retry_interval;
         route->ksnr_retry_interval = MIN (route->ksnr_retry_interval * 2,
                                           SOCKNAL_MAX_RECONNECT_INTERVAL);
 
@@ -2198,7 +2301,7 @@ ksocknal_autoconnectd (void *arg)
         ksock_route_t     *route;
         int                rc;
 
-        snprintf (name, sizeof (name), "ksocknal_ad[%ld]", id);
+        snprintf (name, sizeof (name), "ksocknal_ad%02ld", id);
         kportal_daemonize (name);
         kportal_blockallsigs ();
 
@@ -2239,55 +2342,47 @@ ksock_conn_t *
 ksocknal_find_timed_out_conn (ksock_peer_t *peer) 
 {
         /* We're called with a shared lock on ksnd_global_lock */
-        unsigned long      flags;
         ksock_conn_t      *conn;
         struct list_head  *ctmp;
-        ksock_tx_t        *tx;
-        struct list_head  *ttmp;
         ksock_sched_t     *sched;
 
         list_for_each (ctmp, &peer->ksnp_conns) {
                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
                 sched = conn->ksnc_scheduler;
-                
-                if (conn->ksnc_rx_deadline != 0 &&
-                    conn->ksnc_rx_deadline <= jiffies_64)
-                        goto timed_out;
 
-                spin_lock_irqsave (&sched->kss_lock, flags);
+                /* Don't need the {get,put}connsock dance to deref ksnc_sock... */
+                LASSERT (!conn->ksnc_closing);
                 
-                list_for_each (ttmp, &conn->ksnc_tx_queue) {
-                        tx = list_entry (ttmp, ksock_tx_t, tx_list);
-                        LASSERT (tx->tx_deadline != 0);
-                        
-                        if (tx->tx_deadline <= jiffies_64)
-                                goto timed_out_locked;
+                if (conn->ksnc_rx_started &&
+                    time_after_eq (jiffies, conn->ksnc_rx_deadline)) {
+                        /* Timed out incomplete incoming message */
+                        atomic_inc (&conn->ksnc_refcount);
+                        CERROR ("Timed out RX from "LPX64" %p\n", 
+                                peer->ksnp_nid, conn);
+                        return (conn);
                 }
-#if SOCKNAL_ZC
-                list_for_each (ttmp, &conn->ksnc_tx_pending) {
-                        tx = list_entry (ttmp, ksock_tx_t, tx_list);
-                        LASSERT (tx->tx_deadline != 0);
-
-                        if (tx->tx_deadline <= jiffies_64)
-                                goto timed_out_locked;
+                
+                if ((!list_empty (&conn->ksnc_tx_queue) ||
+                     conn->ksnc_sock->sk->wmem_queued != 0) &&
+                    time_after_eq (jiffies, conn->ksnc_tx_deadline)) {
+                        /* Timed out messages queued for sending, or
+                         * messages buffered in the socket's send buffer */
+                        atomic_inc (&conn->ksnc_refcount);
+                        CERROR ("Timed out TX to "LPX64" %s%d %p\n", 
+                                peer->ksnp_nid, 
+                                list_empty (&conn->ksnc_tx_queue) ? "" : "Q ",
+                                conn->ksnc_sock->sk->wmem_queued, conn);
+                        return (conn);
                 }
-#endif                
-                spin_unlock_irqrestore (&sched->kss_lock, flags);
-                continue;
-
-        timed_out_locked:
-                spin_unlock_irqrestore (&sched->kss_lock, flags);
-        timed_out:
-                atomic_inc (&conn->ksnc_refcount);
-                return (conn);
         }
 
         return (NULL);
 }
 
 void
-ksocknal_check_peer_timeouts (struct list_head *peers)
+ksocknal_check_peer_timeouts (int idx)
 {
+        struct list_head *peers = &ksocknal_data.ksnd_peers[idx];
         struct list_head *ptmp;
         ksock_peer_t     *peer;
         ksock_conn_t     *conn;
@@ -2305,7 +2400,7 @@ ksocknal_check_peer_timeouts (struct list_head *peers)
                 if (conn != NULL) {
                         read_unlock (&ksocknal_data.ksnd_global_lock);
 
-                        if (ksocknal_close_conn_unlocked (conn)) {
+                        if (ksocknal_close_conn_unlocked (conn, -ETIMEDOUT)) {
                                 /* I actually closed... */
                                 CERROR ("Timeout out conn->"LPX64" ip %x:%d\n",
                                         peer->ksnp_nid, conn->ksnc_ipaddr,
@@ -2330,8 +2425,9 @@ ksocknal_reaper (void *arg)
         unsigned long      flags;
         ksock_conn_t      *conn;
         int                timeout;
+        int                i;
         int                peer_index = 0;
-        __u64              deadline = jiffies_64;
+        unsigned long      deadline = jiffies;
         
         kportal_daemonize ("ksocknal_reaper");
         kportal_blockallsigs ();
@@ -2371,12 +2467,32 @@ ksocknal_reaper (void *arg)
                 
                 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
 
-                while ((timeout = deadline - jiffies_64) <= 0) {
-                        /* Time to check for timeouts on a few more peers */
-                        ksocknal_check_peer_timeouts (&ksocknal_data.ksnd_peers[peer_index]);
+                /* careful with the jiffy wrap... */
+                while ((timeout = ((int)deadline - (int)jiffies)) <= 0) {
+                        const int n = 4;
+                        const int p = 1;
+                        int       chunk = ksocknal_data.ksnd_peer_hash_size;
+                        
+                        /* Time to check for timeouts on a few more peers: I do
+                         * checks every 'p' seconds on a proportion of the peer
+                         * table and I need to check every connection 'n' times
+                         * within a timeout interval, to ensure I detect a
+                         * timeout on any connection within (n+1)/n times the
+                         * timeout interval. */
+
+                        if (ksocknal_data.ksnd_io_timeout > n * p)
+                                chunk = (chunk * n * p) / 
+                                        ksocknal_data.ksnd_io_timeout;
+                        if (chunk == 0)
+                                chunk = 1;
+
+                        for (i = 0; i < chunk; i++) {
+                                ksocknal_check_peer_timeouts (peer_index);
+                                peer_index = (peer_index + 1) % 
+                                             ksocknal_data.ksnd_peer_hash_size;
+                        }
 
-                        peer_index = (peer_index + 1) % SOCKNAL_PEER_HASH_SIZE;
-                        deadline += HZ;
+                        deadline += p * HZ;
                 }
 
                 add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
index abd0731..478f3c1 100644 (file)
@@ -431,7 +431,8 @@ ktoenal_send(nal_cb_t *nal, void *private, lib_msg_t *cookie,
         if ((conn = ktoenal_get_conn (nid)) == NULL)
         {
                 /* It's not a peer; try to find a gateway */
-                rc = kpr_lookup (&ktoenal_data.ksnd_router, nid, &gatewaynid);
+                rc = kpr_lookup (&ktoenal_data.ksnd_router, nid, payload_niov,
+                                 &gatewaynid);
                 if (rc != 0)
                 {
                         CERROR ("Can't route to "LPX64": router error %d\n", nid, rc);
index 20d7fbd..cf9220b 100644 (file)
@@ -20,7 +20,7 @@ link-stamp:
        echo timestamp > link-stamp
 
 DEFS =
-portals_SOURCES = $(LINKS) module.c proc.c debug.c
+portals_SOURCES = $(LINKS) module.c proc.c debug.c lwt.c
 
 # Don't distribute any patched files.
 dist-hook:
index 63e71ee..90eb185 100644 (file)
@@ -790,41 +790,61 @@ void portals_debug_set_level(unsigned int debug_level)
         portal_debug = debug_level;
 }
 
+void portals_run_upcall(char **argv)
+{
+        int   rc;
+        int   argc;
+        char *envp[] = {
+                "HOME=/",
+                "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
+                NULL};
+        ENTRY;
+
+        argv[0] = portals_upcall;
+        argc = 1;
+        while (argv[argc] != NULL)
+                argc++;
+
+        LASSERT(argc >= 2);
+        
+        rc = call_usermodehelper(argv[0], argv, envp);
+        if (rc < 0) {
+                CERROR("Error %d invoking portals upcall %s %s%s%s%s%s%s%s%s; "
+                       "check /proc/sys/portals/upcall\n",
+                       rc, argv[0], argv[1],
+                       argc < 3 ? "" : ",", argc < 3 ? "" : argv[2],
+                       argc < 4 ? "" : ",", argc < 4 ? "" : argv[3],
+                       argc < 5 ? "" : ",", argc < 5 ? "" : argv[4],
+                       argc < 6 ? "" : ",...");
+        } else {
+                CERROR("Invoked portals upcall %s %s%s%s%s%s%s%s%s\n",
+                       argv[0], argv[1],
+                       argc < 3 ? "" : ",", argc < 3 ? "" : argv[2],
+                       argc < 4 ? "" : ",", argc < 4 ? "" : argv[3],
+                       argc < 5 ? "" : ",", argc < 5 ? "" : argv[4],
+                       argc < 6 ? "" : ",...");
+        }
+}
+
 void portals_run_lbug_upcall(char *file, const char *fn, const int line)
 {
         char *argv[6];
-        char *envp[3];
         char buf[32];
-        int rc;
 
         ENTRY;
         snprintf (buf, sizeof buf, "%d", line);
 
-        argv[0] = portals_upcall;
         argv[1] = "LBUG";
         argv[2] = file;
         argv[3] = (char *)fn;
         argv[4] = buf;
         argv[5] = NULL;
 
-        envp[0] = "HOME=/";
-        envp[1] = "PATH=/sbin:/bin:/usr/sbin:/usr/bin";
-        envp[2] = NULL;
-
-        rc = call_usermodehelper(argv[0], argv, envp);
-        if (rc < 0) {
-                CERROR("Error invoking lbug upcall %s %s %s %s %s: %d; check "
-                       "/proc/sys/portals/upcall\n",                
-                       argv[0], argv[1], argv[2], argv[3], argv[4], rc);
-                
-        } else {
-                CERROR("Invoked upcall %s %s %s %s %s\n",
-                       argv[0], argv[1], argv[2], argv[3], argv[4]);
-        }
+        portals_run_upcall (argv);
 }
 
-
 EXPORT_SYMBOL(portals_debug_dumplog);
 EXPORT_SYMBOL(portals_debug_msg);
 EXPORT_SYMBOL(portals_debug_set_level);
+EXPORT_SYMBOL(portals_run_upcall);
 EXPORT_SYMBOL(portals_run_lbug_upcall);
diff --git a/lustre/portals/libcfs/lwt.c b/lustre/portals/libcfs/lwt.c
new file mode 100644 (file)
index 0000000..a40a7ed
--- /dev/null
@@ -0,0 +1,235 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2003 Cluster File Systems, Inc.
+ *   Author: Eric Barton <eeb@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define EXPORT_SYMTAB
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/kmod.h>
+#include <linux/kernel.h>
+#include <linux/kernel.h>
+#include <linux/mm.h>
+#include <linux/string.h>
+#include <linux/stat.h>
+#include <linux/errno.h>
+#include <linux/smp_lock.h>
+#include <linux/unistd.h>
+#include <linux/interrupt.h>
+#include <asm/system.h>
+#include <asm/uaccess.h>
+
+#define DEBUG_SUBSYSTEM S_PORTALS
+
+#include <linux/kp30.h>
+
+#if LWT_SUPPORT
+
+#define LWT_MEMORY              (1<<20)         /* 1Mb of trace memory */
+#define LWT_MAX_CPUS             4
+
+int         lwt_enabled;
+int         lwt_pages_per_cpu;
+lwt_cpu_t   lwt_cpus[LWT_MAX_CPUS];
+
+/* NB only root is allowed to retrieve LWT info; it's an open door into the
+ * kernel... */
+
+int
+lwt_lookup_string (int *size, char *knl_ptr,
+                   char *user_ptr, int user_size)
+{
+        /* knl_ptr was retrieved from an LWT snapshot and the caller wants to
+         * turn it into a string.  NB we can crash with an access violation
+         * trying to determine the string length, so we're trusting our
+         * caller... */
+
+        if (!capable(CAP_SYS_ADMIN))
+                return (-EPERM);
+
+        *size = strlen (knl_ptr) + 1;
+        
+        if (user_ptr != NULL &&
+            copy_to_user (user_ptr, knl_ptr, *size))
+                return (-EFAULT);
+        
+        return (0);
+}
+
+int
+lwt_control (int enable, int clear)
+{
+        lwt_page_t  *p;
+        int          i;
+        int          j;
+
+        if (!capable(CAP_SYS_ADMIN))
+                return (-EPERM);
+
+        if (clear)
+                for (i = 0; i < num_online_cpus(); i++) {
+                        p = lwt_cpus[i].lwtc_current_page;
+                        
+                        for (j = 0; j < lwt_pages_per_cpu; j++) {
+                                
+                                memset (p->lwtp_events, 0, PAGE_SIZE);
+                                
+                                p = list_entry (p->lwtp_list.next,
+                                                lwt_page_t, lwtp_list);
+                        }
+        }
+
+        lwt_enabled = enable;
+        mb();
+        if (!enable) {
+                /* give people some time to stop adding traces */
+                schedule_timeout(10);
+        }
+
+        return (0);
+}
+
+int
+lwt_snapshot (int *ncpu, int *total_size, 
+              void *user_ptr, int user_size) 
+{
+        const int    events_per_page = PAGE_SIZE / sizeof(lwt_event_t);
+        const int    bytes_per_page = events_per_page * sizeof(lwt_event_t);
+        lwt_page_t  *p;
+        int          i;
+        int          j;
+
+        if (!capable(CAP_SYS_ADMIN))
+                return (-EPERM);
+
+        *ncpu = num_online_cpus();
+        *total_size = num_online_cpus() * lwt_pages_per_cpu * bytes_per_page;
+
+        if (user_ptr == NULL)
+                return (0);
+
+        for (i = 0; i < num_online_cpus(); i++) {
+                p = lwt_cpus[i].lwtc_current_page;
+                
+                for (j = 0; j < lwt_pages_per_cpu; j++) {
+                        if (copy_to_user(user_ptr, p->lwtp_events,
+                                         bytes_per_page))
+                                return (-EFAULT);
+
+                        user_ptr = ((char *)user_ptr) + bytes_per_page;
+                        p = list_entry(p->lwtp_list.next,
+                                       lwt_page_t, lwtp_list);
+                        
+                }
+        }
+
+        return (0);
+}
+
+int
+lwt_init () 
+{
+       int     i;
+        int     j;
+        
+        if (num_online_cpus() > LWT_MAX_CPUS) {
+                CERROR ("Too many CPUs\n");
+                return (-EINVAL);
+        }
+
+       /* NULL pointers, zero scalars */
+       memset (lwt_cpus, 0, sizeof (lwt_cpus));
+        lwt_pages_per_cpu = LWT_MEMORY / (num_online_cpus() * PAGE_SIZE);
+
+       for (i = 0; i < num_online_cpus(); i++)
+               for (j = 0; j < lwt_pages_per_cpu; j++) {
+                       struct page *page = alloc_page (GFP_KERNEL);
+                       lwt_page_t  *lwtp;
+
+                       if (page == NULL) {
+                               CERROR ("Can't allocate page\n");
+                                lwt_fini ();
+                               return (-ENOMEM);
+                       }
+
+                        PORTAL_ALLOC(lwtp, sizeof (*lwtp));
+                       if (lwtp == NULL) {
+                               CERROR ("Can't allocate lwtp\n");
+                                __free_page(page);
+                               lwt_fini ();
+                               return (-ENOMEM);
+                       }
+
+                        lwtp->lwtp_page = page;
+                        lwtp->lwtp_events = page_address(page);
+                       memset (lwtp->lwtp_events, 0, PAGE_SIZE);
+
+                       if (j == 0) {
+                               INIT_LIST_HEAD (&lwtp->lwtp_list);
+                               lwt_cpus[i].lwtc_current_page = lwtp;
+                       } else {
+                               list_add (&lwtp->lwtp_list,
+                                   &lwt_cpus[i].lwtc_current_page->lwtp_list);
+                       }
+                }
+
+        lwt_enabled = 1;
+        mb();
+
+        return (0);
+}
+
+void
+lwt_fini () 
+{
+        int    i;
+        
+        if (num_online_cpus() > LWT_MAX_CPUS)
+                return;
+
+        for (i = 0; i < num_online_cpus(); i++)
+                while (lwt_cpus[i].lwtc_current_page != NULL) {
+                        lwt_page_t *lwtp = lwt_cpus[i].lwtc_current_page;
+                        
+                        if (list_empty (&lwtp->lwtp_list)) {
+                                lwt_cpus[i].lwtc_current_page = NULL;
+                        } else {
+                                lwt_cpus[i].lwtc_current_page =
+                                        list_entry (lwtp->lwtp_list.next,
+                                                    lwt_page_t, lwtp_list);
+
+                                list_del (&lwtp->lwtp_list);
+                        }
+                        
+                        __free_page (lwtp->lwtp_page);
+                        PORTAL_FREE (lwtp, sizeof (*lwtp));
+                }
+}
+
+EXPORT_SYMBOL(lwt_enabled);
+EXPORT_SYMBOL(lwt_cpus);
+
+EXPORT_SYMBOL(lwt_init);
+EXPORT_SYMBOL(lwt_fini);
+EXPORT_SYMBOL(lwt_lookup_string);
+EXPORT_SYMBOL(lwt_control);
+EXPORT_SYMBOL(lwt_snapshot);
+#endif
index c72dffc..11e4f4c 100644 (file)
@@ -122,8 +122,8 @@ static inline void freedata(void *data, int len)
 }
 
 static int
-kportal_add_route(int gateway_nalid, ptl_nid_t gateway_nid, ptl_nid_t lo_nid,
-                  ptl_nid_t hi_nid)
+kportal_add_route(int gateway_nalid, ptl_nid_t gateway_nid, 
+                  ptl_nid_t lo_nid, ptl_nid_t hi_nid)
 {
         int rc;
         kpr_control_interface_t *ci;
@@ -139,7 +139,8 @@ kportal_add_route(int gateway_nalid, ptl_nid_t gateway_nid, ptl_nid_t lo_nid,
 }
 
 static int
-kportal_del_route(ptl_nid_t target)
+kportal_del_route(int gw_nalid, ptl_nid_t gw_nid, 
+                  ptl_nid_t lo, ptl_nid_t hi)
 {
         int rc;
         kpr_control_interface_t *ci;
@@ -148,7 +149,24 @@ kportal_del_route(ptl_nid_t target)
         if (ci == NULL)
                 return (-ENODEV);
 
-        rc = ci->kprci_del_route (target);
+        rc = ci->kprci_del_route (gw_nalid, gw_nid, lo, hi);
+
+        PORTAL_SYMBOL_PUT(kpr_control_interface);
+        return (rc);
+}
+
+static int
+kportal_notify_router (int gw_nalid, ptl_nid_t gw_nid,
+                       int alive, time_t when)
+{
+        int rc;
+        kpr_control_interface_t *ci;
+
+        ci = (kpr_control_interface_t *)PORTAL_SYMBOL_GET(kpr_control_interface);
+        if (ci == NULL)
+                return (-ENODEV);
+
+        rc = ci->kprci_notify (gw_nalid, gw_nid, alive, when);
 
         PORTAL_SYMBOL_PUT(kpr_control_interface);
         return (rc);
@@ -156,12 +174,13 @@ kportal_del_route(ptl_nid_t target)
 
 static int
 kportal_get_route(int index, __u32 *gateway_nalidp, ptl_nid_t *gateway_nidp,
-                  ptl_nid_t *lo_nidp, ptl_nid_t *hi_nidp)
+                  ptl_nid_t *lo_nidp, ptl_nid_t *hi_nidp, int *alivep)
 {
         int       gateway_nalid;
         ptl_nid_t gateway_nid;
         ptl_nid_t lo_nid;
         ptl_nid_t hi_nid;
+        int       alive;
         int       rc;
         kpr_control_interface_t *ci;
 
@@ -169,17 +188,19 @@ kportal_get_route(int index, __u32 *gateway_nalidp, ptl_nid_t *gateway_nidp,
         if (ci == NULL)
                 return (-ENODEV);
 
-        rc = ci->kprci_get_route(index, &gateway_nalid, &gateway_nid, &lo_nid,
-                                 &hi_nid);
+        rc = ci->kprci_get_route(index, &gateway_nalid, &gateway_nid,
+                                 &lo_nid, &hi_nid, &alive);
 
         if (rc == 0) {
-                CDEBUG(D_IOCTL, "got route [%d] %d "LPX64":"LPX64" - "LPX64"\n",
-                       index, gateway_nalid, gateway_nid, lo_nid, hi_nid);
+                CDEBUG(D_IOCTL, "got route [%d] %d "LPX64":"LPX64" - "LPX64", %s\n",
+                       index, gateway_nalid, gateway_nid, lo_nid, hi_nid,
+                       alive ? "up" : "down");
 
                 *gateway_nalidp = (__u32)gateway_nalid;
-                *gateway_nidp   = (__u32)gateway_nid;
-                *lo_nidp        = (__u32)lo_nid;
-                *hi_nidp        = (__u32)hi_nid;
+                *gateway_nidp   = gateway_nid;
+                *lo_nidp        = lo_nid;
+                *hi_nidp        = hi_nid;
+                *alivep         = alive;
         }
 
         PORTAL_SYMBOL_PUT (kpr_control_interface);
@@ -367,23 +388,38 @@ static int kportal_ioctl(struct inode *inode, struct file *file,
 
         case IOC_PORTAL_ADD_ROUTE:
                 CDEBUG(D_IOCTL, "Adding route: [%d] "LPU64" : "LPU64" - "LPU64"\n",
-                       data->ioc_nal, data->ioc_nid, data->ioc_nid2,
-                       data->ioc_nid3);
+                       data->ioc_nal, data->ioc_nid, 
+                       data->ioc_nid2, data->ioc_nid3);
                 err = kportal_add_route(data->ioc_nal, data->ioc_nid,
-                                        MIN (data->ioc_nid2, data->ioc_nid3),
-                                        MAX (data->ioc_nid2, data->ioc_nid3));
+                                        data->ioc_nid2, data->ioc_nid3);
                 break;
 
         case IOC_PORTAL_DEL_ROUTE:
-                CDEBUG (D_IOCTL, "Removing route to "LPU64"\n", data->ioc_nid);
-                err = kportal_del_route (data->ioc_nid);
+                CDEBUG (D_IOCTL, "Removing routes via [%d] "LPU64" : "LPU64" - "LPU64"\n",
+                        data->ioc_nal, data->ioc_nid, 
+                        data->ioc_nid2, data->ioc_nid3);
+                err = kportal_del_route (data->ioc_nal, data->ioc_nid,
+                                         data->ioc_nid2, data->ioc_nid3);
                 break;
 
+        case IOC_PORTAL_NOTIFY_ROUTER: {
+                CDEBUG (D_IOCTL, "Notifying peer [%d] "LPU64" %s @ %ld\n",
+                        data->ioc_nal, data->ioc_nid,
+                        data->ioc_flags ? "Enabling" : "Disabling",
+                        (time_t)data->ioc_nid3);
+                
+                err = kportal_notify_router (data->ioc_nal, data->ioc_nid,
+                                             data->ioc_flags, 
+                                             (time_t)data->ioc_nid3);
+                break;
+        }
+                
         case IOC_PORTAL_GET_ROUTE:
                 CDEBUG (D_IOCTL, "Getting route [%d]\n", data->ioc_count);
                 err = kportal_get_route(data->ioc_count, &data->ioc_nal,
-                                        &data->ioc_nid, &data->ioc_nid2,
-                                        &data->ioc_nid3);
+                                        &data->ioc_nid, 
+                                        &data->ioc_nid2, &data->ioc_nid3,
+                                        &data->ioc_flags);
                 if (err == 0)
                         if (copy_to_user((char *)arg, data, sizeof (*data)))
                                 err = -EFAULT;
@@ -432,7 +468,27 @@ static int kportal_ioctl(struct inode *inode, struct file *file,
                 kportal_put_ni (data->ioc_nal);
                 break;
         }
-
+#if LWT_SUPPORT
+        case IOC_PORTAL_LWT_CONTROL: 
+                err = lwt_control (data->ioc_flags, data->ioc_misc);
+                break;
+                
+        case IOC_PORTAL_LWT_SNAPSHOT:
+                err = lwt_snapshot (&data->ioc_count, &data->ioc_misc,
+                                    data->ioc_pbuf1, data->ioc_plen1);
+                if (err == 0 &&
+                    copy_to_user((char *)arg, data, sizeof (*data)))
+                        err = -EFAULT;
+                break;
+                
+        case IOC_PORTAL_LWT_LOOKUP_STRING:
+                err = lwt_lookup_string (&data->ioc_count, data->ioc_pbuf1,
+                                         data->ioc_pbuf2, data->ioc_plen2);
+                if (err == 0 &&
+                    copy_to_user((char *)arg, data, sizeof (*data)))
+                        err = -EFAULT;
+                break;
+#endif                        
         default:
                 err = -EINVAL;
                 break;
@@ -471,12 +527,19 @@ static int init_kportals_module(void)
                 return (rc);
         }
 
+#if LWT_SUPPORT
+        rc = lwt_init();
+        if (rc != 0) {
+                CERROR("lwt_init: error %d\n", rc);
+                goto cleanup_debug;
+        }
+#endif
         sema_init(&nal_cmd_sem, 1);
 
         rc = misc_register(&portal_dev);
         if (rc) {
                 CERROR("misc_register: error %d\n", rc);
-                goto cleanup_debug;
+                goto cleanup_lwt;
         }
 
         rc = PtlInit();
@@ -498,6 +561,10 @@ static int init_kportals_module(void)
         PtlFini();
  cleanup_deregister:
         misc_deregister(&portal_dev);
+ cleanup_lwt:
+#if LWT_SUPPORT
+        lwt_fini();
+#endif
  cleanup_debug:
         portals_debug_cleanup();
         return rc;
@@ -518,6 +585,10 @@ static void exit_kportals_module(void)
         if (rc)
                 CERROR("misc_deregister error %d\n", rc);
 
+#if LWT_SUPPORT
+        lwt_fini();
+#endif
+
         if (atomic_read(&portal_kmemory) != 0)
                 CERROR("Portals memory leaked: %d bytes\n",
                        atomic_read(&portal_kmemory));
index 27a7fba..a03fb42 100644 (file)
@@ -24,6 +24,7 @@
 #include "router.h"
 
 LIST_HEAD(kpr_routes);
+LIST_HEAD(kpr_gateways);
 LIST_HEAD(kpr_nals);
 
 unsigned long long kpr_fwd_bytes;
@@ -42,6 +43,7 @@ kpr_router_interface_t kpr_router_interface = {
        kprri_lookup:           kpr_lookup_target,
        kprri_fwd_start:        kpr_forward_packet,
        kprri_fwd_done:         kpr_complete_packet,
+        kprri_notify:           kpr_nal_notify,
        kprri_shutdown:         kpr_shutdown_nal,
        kprri_deregister:       kpr_deregister_nal,
 };
@@ -50,6 +52,7 @@ kpr_control_interface_t kpr_control_interface = {
        kprci_add_route:        kpr_add_route,
        kprci_del_route:        kpr_del_route,
        kprci_get_route:        kpr_get_route,
+       kprci_notify:           kpr_sys_notify,
 };
 
 int
@@ -59,7 +62,7 @@ kpr_register_nal (kpr_nal_interface_t *nalif, void **argp)
        struct list_head  *e;
        kpr_nal_entry_t   *ne;
 
-        CDEBUG (D_OTHER, "Registering NAL %d\n", nalif->kprni_nalid);
+        CDEBUG (D_NET, "Registering NAL %d\n", nalif->kprni_nalid);
 
        PORTAL_ALLOC (ne, sizeof (*ne));
        if (ne == NULL)
@@ -96,12 +99,181 @@ kpr_register_nal (kpr_nal_interface_t *nalif, void **argp)
 }
 
 void
+kpr_do_upcall (void *arg)
+{
+        kpr_upcall_t *u = (kpr_upcall_t *)arg;
+        char          nalstr[10];
+        char          nidstr[36];
+        char          whenstr[36];
+        char         *argv[] = {
+                NULL,
+                "ROUTER_NOTIFY",
+                nalstr,
+                nidstr,
+                u->kpru_alive ? "up" : "down",
+                whenstr,
+                NULL};
+        
+        snprintf (nalstr, sizeof(nalstr), "%d", u->kpru_nal_id);
+        snprintf (nidstr, sizeof(nidstr), LPX64, u->kpru_nid);
+        snprintf (whenstr, sizeof(whenstr), "%ld", u->kpru_when);
+
+        portals_run_upcall (argv);
+}
+
+void
+kpr_upcall (int gw_nalid, ptl_nid_t gw_nid, int alive, time_t when)
+{
+        /* May be in arbitrary context */
+        kpr_upcall_t  *u = kmalloc (sizeof (kpr_upcall_t), GFP_ATOMIC);
+
+        if (u == NULL) {
+                CERROR ("Upcall out of memory: nal %d nid "LPX64" %s\n",
+                        gw_nalid, gw_nid, alive ? "up" : "down");
+                return;
+        }
+
+        u->kpru_nal_id     = gw_nalid;
+        u->kpru_nid        = gw_nid;
+        u->kpru_alive      = alive;
+        u->kpru_when       = when;
+
+        prepare_work (&u->kpru_tq, kpr_do_upcall, u);
+        schedule_work (&u->kpru_tq);
+}
+
+int
+kpr_do_notify (int byNal, int gateway_nalid, ptl_nid_t gateway_nid,
+               int alive, time_t when)
+{
+       unsigned long        flags;
+        int                  rc = -ENOENT;
+        kpr_nal_entry_t     *ne = NULL;
+        kpr_gateway_entry_t *ge = NULL;
+        struct timeval       now;
+       struct list_head    *e;
+       struct list_head    *n;
+
+        CDEBUG (D_ERROR, "%s notifying [%d] "LPX64": %s\n", 
+                byNal ? "NAL" : "userspace", 
+                gateway_nalid, gateway_nid, alive ? "up" : "down");
+
+        /* can't do predictions... */
+        do_gettimeofday (&now);
+        if (when > now.tv_sec) {
+                CERROR ("Ignoring prediction from %s of [%d] "LPX64" %s "
+                        "%ld seconds in the future\n", 
+                byNal ? "NAL" : "userspace", 
+                gateway_nalid, gateway_nid, alive ? "up" : "down",
+                        when - now.tv_sec);
+                return (EINVAL);
+        }
+
+        LASSERT (when <= now.tv_sec);
+
+        /* Serialise with lookups (i.e. write lock) */
+       write_lock_irqsave(&kpr_rwlock, flags);
+
+        list_for_each_safe (e, n, &kpr_gateways) {
+
+                ge = list_entry(e, kpr_gateway_entry_t, kpge_list);
+                if ((gateway_nalid != 0 &&
+                     ge->kpge_nalid != gateway_nalid) ||
+                    ge->kpge_nid != gateway_nid)
+                        continue;
+
+                rc = 0;
+                break;
+        }
+
+        if (rc != 0) {
+                /* gateway not found */
+                write_unlock_irqrestore(&kpr_rwlock, flags);
+                CERROR ("Gateway not found\n");
+                return (rc);
+        }
+        
+        if (when < ge->kpge_timestamp) {
+                /* out of date information */
+                write_unlock_irqrestore (&kpr_rwlock, flags);
+                CERROR ("Out of date\n");
+                return (0);
+        }
+
+        /* update timestamp */
+        ge->kpge_timestamp = when;
+
+        if ((!ge->kpge_alive) == (!alive)) {
+                /* new date for old news */
+                write_unlock_irqrestore (&kpr_rwlock, flags);
+                CERROR ("Old news\n");
+                return (0);
+        }
+
+        ge->kpge_alive = alive;
+        CDEBUG(D_NET, "set "LPX64" [%p] %d\n", gateway_nid, ge, alive);
+
+        if (alive) {
+                /* Reset all gateway weights so the newly-enabled gateway
+                 * doesn't have to play catch-up */
+                list_for_each_safe (e, n, &kpr_gateways) {
+                        kpr_gateway_entry_t *ge = list_entry(e, kpr_gateway_entry_t,
+                                                             kpge_list);
+                        atomic_set (&ge->kpge_weight, 0);
+                }
+        }
+
+        if (!byNal) {
+                /* userland notified me: notify NAL? */
+                ne = kpr_find_nal_entry_locked (ge->kpge_nalid);
+                if (ne != NULL) {
+                        if (ne->kpne_shutdown ||
+                            ne->kpne_interface.kprni_notify == NULL) {
+                                /* no need to notify */
+                                ne = NULL;
+                        } else {
+                                /* take a ref on this NAL until notifying
+                                 * it has completed... */
+                                atomic_inc (&ne->kpne_refcount);
+