Whamcloud - gitweb
LU-3008 lnet: Update support for Cray's interconnects
author James R. Shimek <jshimek@cray.com>
Thu, 21 Mar 2013 22:41:24 +0000 (17:41 -0500)
committer Oleg Drokin <oleg.drokin@intel.com>
Tue, 23 Apr 2013 05:46:40 +0000 (01:46 -0400)
This patch updates gnilnd to include all of Cray's
patches for the last year since the initial push.

Included changes

----------------------------------------------------------------------
Subject
Reverse rdma kgnilnd fixes
Description
When an LNET_PUT is matched and parsed on the receiving side, it
can call kgnilnd_recv with mlen == 0. Previously the reverse_rdma
code for kgnilnd did not handle this and asserted. This mod adds
handling for the case where mlen is 0, and also for the case where
an LNET_GET's lnetmsg is NULL, which the non reverse_rdma path
handles but the reverse_rdma path did not.

----------------------------------------------------------------------
Subject
Gnilnd refcount changes
Description
This mod adjusts connection refcount handling to bring the
adding and removing of references in line with what was expected.
This was brought up during the Whamcloud review but left undone on
their end.

----------------------------------------------------------------------
Subject
kgnilnd peer_timeout enhancement for peer_health
Description
Currently kgnilnd peer_health is enabled on router nodes. When
peer_health is enabled it sets a default timeout factor of
kgn_timeout+kgn_timeout/8. This value currently cannot be adjusted
except by adjusting kgn_timeout. This mod allows the user to
increase the value by setting the module parameter peer_timeout in
conjunction with peer_health.

When peer_timeout is set and peer_health is enabled, the timeout
passed to lnet will be what the user has specified as long as it is
greater than the previous fudge calculation. If the user specifies a
value less than fudge, kgnilnd will fail to load and log an error
to the console.

Changes
1. Added module parameter peer_timeout. When peer_health is enabled,
   this allows manipulation of the ni_peertimeout value passed to
   lnet.
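
The rule described above can be sketched as follows. This is only an
illustration under stated assumptions: the helper name
sketch_peer_timeout is hypothetical, a value of 0 or less is assumed to
mean "parameter not set", a value exactly equal to the fudge is assumed
to be accepted, and -EINVAL is an assumed error code. The real module
may differ on each of these points.

```c
#include <errno.h>

/* Hypothetical helper, not part of kgnilnd. Returns the peer timeout
 * that would be handed to LNet, or -EINVAL (assumed error code) when
 * the user-supplied value is below the computed floor. */
int sketch_peer_timeout(int timeout, int peer_timeout)
{
	/* default described in the text: kgn_timeout + kgn_timeout/8 */
	int fudge = timeout + timeout / 8;

	if (peer_timeout <= 0)
		return fudge;        /* assumed "not set": keep the default */
	if (peer_timeout < fudge)
		return -EINVAL;      /* too small: module load would fail */
	return peer_timeout;         /* user value is used */
}
```

With a timeout of 60 the floor is 67, so a peer_timeout of 100 would be
used as given while a peer_timeout of 50 would be rejected.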

----------------------------------------------------------------------
Subject
kgnilnd conn double free refcount fix
Description
Currently kgnilnd has a possible race condition on service nodes
between two scheduler threads. When a connection is scheduled another
scheduler can act upon the conn before the first has decremented its
reference.
Currently kgnilnd_conn_decref uses a separate atomic_read after it
decrefs to decide what to do next. Two threads calling
kgnilnd_conn_decref could therefore both see a value of zero, even
though one thread brought the refcount to one and the other to
zero. The same issue can occur with kgnilnd_peer_decref.

This mod introduces changes to the scheduler to prevent two decrefs
at the same time in different scheduler threads. Also it updates
kgnilnd_conn_decref to utilize the value that is returned by
atomic_dec_return instead of doing a second atomic_read to verify
the reference count.

Changes
1. Changed kgnilnd_conn_decref to use the val returned by
   atomic_sub_return instead of doing atomic_reads to get the value.
2. Changed kgnilnd_peer_decref to use the val returned by
   atomic_sub_return instead of doing atomic_reads to get the value.
3. Updated kgnilnd_schedule_conn and kgnilnd_schedule_process_conn
   so that when a connection is scheduled from within a scheduler
   thread it carries the reference forward instead of removing it.
   This in addition to the kgnilnd_conn_decref change should remove
   the double free problem.
4. Changed assertions in kgnilnd_peer_addref, kgnilnd_conn_addref so
   they catch when the value is incremented up from 0 to 1.
5. Use magic value to verify conn is not being freed twice.
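
A minimal user-space sketch of the decref idea in changes 1 and 2,
written with C11 atomics rather than the kernel's atomic_t API. The
struct and function names are hypothetical. The point it illustrates is
that only the value produced by the decrement itself decides whether
this caller dropped the last reference. A second, separate read could
return zero to two callers.

```c
#include <stdatomic.h>

/* Hypothetical stand-in for a refcounted connection. */
struct sketch_ref_conn {
	atomic_int refcount;
	int        destroyed;   /* how many times the "destroy" path ran */
};

/* Drop one reference. atomic_fetch_sub returns the value held before
 * the subtraction, so (old - 1) is the count this caller produced.
 * Exactly one caller can observe that result as zero. */
void sketch_decref(struct sketch_ref_conn *c)
{
	int val = atomic_fetch_sub(&c->refcount, 1) - 1;

	if (val == 0)
		c->destroyed++;   /* stand-in for scheduling the destroy */
}
```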

----------------------------------------------------------------------
Subject
Debug for mailbox corruption.
Description
We have two peers (routers) writing to the same mailbox of a compute
node.

Add more debug to identify the cause of two peers getting the same
mailbox information.
- Store both the previous nid and the previous purgatory nid for this
  mailbox.
- Store the dgram type in the conn so we can tell if the conn
  resulted from a matched wildcard or a direct connection request.
- Keep track of the total allocations of a mailbox and the current
  number of allocations.
- Add a proc file peer_conns with information containing the peer's
  connection information.
  - writing a nid value (echo 1234 > /proc/kgnilnd/peer_conns) will
    allow the read (cat /proc/kgnilnd/peer_conns) to produce a list
    of conns associated with the specified nid.

----------------------------------------------------------------------
Subject
Ignore events generated from 'xtcli set/clr_reserve'
Description
'xtcli set_reserve' and 'xtcli clr_reserve' operations overload the
ec_node_unavailable event as described in bug 785850.  Since gnilnd
uses ec_node_unavailable events, we need to ignore them when they
originate from those commands.

----------------------------------------------------------------------
Subject
Close connection upon receipt of RCA unavailable event.
Description
When a blade is powered down, messages sent to its nodes will
cause ORB timeouts, which cause a quiesce and ORB scrub. The quiesce
causes gnilnd to bump its timeouts, so we continue sending traffic,
causing more ORB timeouts.

----------------------------------------------------------------------
Subject
kgnilnd_dgram_mover thread runtime deadline
Description
Currently there is no deadline associated with starting outbound
dgrams within the kgnilnd_dgram_mover thread. The thread will loop
while the list is not empty. When there are many network problems
the thread could run for a very long time. This mod adds a
deadline check to make sure the dgram thread stops attempting to post
dgrams after the deadline passes. The thread will schedule itself and
be woken up normally after time has passed to continue its work.

Changes
1. Added deadline to kgnilnd_dgram_mover so
   kgnilnd_start_outbound_dgrams is bounded in runtime by size of
   list and by a maximum runtime deadline.
2. Added error injection to verify dgram deadline.
3. Added module parameter to adjust deadline of dgram thread.
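
The bounded loop described above might look roughly like this in user
space. It is a sketch under assumptions: the list type and function name
are hypothetical, wall-clock seconds from time() stand in for whatever
clock the kernel thread uses, and "posting" a dgram is reduced to
unlinking it from the list.

```c
#include <stddef.h>
#include <time.h>

/* Hypothetical queue entry for a pending outbound datagram. */
struct sketch_dgram {
	struct sketch_dgram *next;
};

/* Post queued datagrams until the list is empty or the deadline has
 * passed. Returns the number posted. *head is left pointing at the
 * unposted remainder so a later wakeup can continue the work. */
size_t sketch_start_outbound(struct sketch_dgram **head, time_t deadline)
{
	size_t posted = 0;

	while (*head != NULL) {
		if (time(NULL) > deadline)
			break;               /* out of time: yield, resume later */
		*head = (*head)->next;       /* stand-in for posting one dgram */
		posted++;
	}
	return posted;
}
```

The runtime is therefore bounded both by the list length and by the
deadline, which matches item 1 in the list of changes.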

----------------------------------------------------------------------
Subject
fix peer_conn_lock deadlock
Description
kgnilnd_tx_done() called with lock held.
There is an error case whereby kgnilnd_tx_done will be called by
kgnilnd_queue_tx(). This can cause a deadlock if lnet calls back
needing the write lock.

Remove call to kgnilnd_tx_done since the tx will be processed by
kgnilnd_process_fmaq() (like the EAGAIN case).

----------------------------------------------------------------------
Subject
Make kgnilnd_bump_timeouts aware of DONE connections
Description
Currently when kgnilnd comes out of quiesce, all connections' timeouts
are bumped so they don't close because of the period they were paused.
kgnilnd_bump_timeouts schedules all the connections on a peer
including ones that are in purgatory in the GNILND_CONN_DONE state.
These connections are not supposed to be put through the scheduler
once they are in the DONE state.

An LBUG can occur if, after the quiesce, the scheduler thread
does not push the newly scheduled conns through the state machine
fast enough. This can leave DONE conns on the scheduled list when
stack reset is triggered. Stack reset then puts any scheduled conns
through kgnilnd_complete_closed_conn, which asserts when it sees a
conn in the GNILND_CONN_DONE state.

Changes
1. Add if statement so kgnilnd_bump_timeouts does not schedule DONE
   connections.

----------------------------------------------------------------------
Subject
Subscribe GNILND to UXACT errors
Description
Aries has a new type of error that GNILND needs to be subscribed to
for stack reset initiation. This mod adds that error type to our
callback subscription routine.

Changes
1. Add GNI_ERRMASK_UNKNOWN_TRANSACTION to mask passed into
   kgnilnd_subscribe_errors function.

----------------------------------------------------------------------
Subject
kgnilnd reverse bte rdma transactions
Description
Currently GNILND executes all of its kgni bte rdma transactions
using GNI_POST_RDMA_PUT. On cascade systems this can cause IOMMU
thrashing on router nodes from the many computes initiating rdma
to the single service node. This can cause linear performance
degradation as more and more computes attempt to write into a single
service node's memory space. To alleviate this problem we change
how rdmas are done: we use GNI_POST_RDMA_GET, so the service node
initiates the transfer of data to itself instead of thousands of
clients all trying at once. A run time tunable that allows us to
switch to GNI_POST_RDMA_GET lets us govern the RDMA from the
receiving node.

Changes
1. Added new message types that exist side by side with current
   infrastructure so different nodes can have rdma setting tuned
   and all nodes will handle the messages.
2. Added tunables so that the REVERSE setting can be adjusted at
   run time.
3. Added support for non byte aligned data transfers so that gets
   will succeed when non byte aligned offsets and lengths are
   provided to kgnilnd.
4. Added the capability to send checksum information in the message
   being sent to the side that will be initiating the rdma.
   This works side by side with existing rdma checksum capabilities.
5. Corrected rdma nak problems when RDMA mapping fails for a specific
   type of tx.
6. Added counters to rdma for when a copy needs to be made due to
   unaligned data. This will allow us to see if performance is
   hindered because a large number of vmalloc calls have to be
   made.
7. Changed the entire call tree for rdma to support the handling of
   the new message types.
8. On Aries platforms service nodes will be defaulted to
   GNILND_REVERSE_GET, compute nodes defaulted to GNILND_REVERSE_PUT.

----------------------------------------------------------------------
Subject
Generate/check checksum over the number of bytes actually transferred
Description
It is possible for PUTs to have a different length than the
length stored in lntmsg->msg_len since LNET can adjust this
length based on its buffer size and offset.
lnet_try_match_md() sets the mlength that we use to do the
RDMA transfer.

Therefore we need to compute checksum using tx->tx_rdma_desc.length
and verify the checksum using length returned in the
msg->gnm_u.completion.gncm_retval which contains the actual number
of bytes transmitted.
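
To illustrate why the length matters, here is a small sketch. The
checksum below is a plain byte sum chosen only for the example. It is
not the algorithm kgnilnd uses, and both function names are
hypothetical. The receiver verifies over the byte count reported as
actually transferred, not over the nominal message length.

```c
#include <stddef.h>
#include <stdint.h>

/* Stand-in checksum: a plain byte sum truncated to 16 bits.
 * This is an assumption for illustration only. */
uint16_t sketch_cksum(const unsigned char *buf, size_t nob)
{
	uint32_t sum = 0;

	for (size_t i = 0; i < nob; i++)
		sum += buf[i];
	return (uint16_t)(sum & 0xffff);
}

/* Receiver side: check the sender's checksum against the number of
 * bytes the completion message says were moved. Returns 1 on match. */
int sketch_verify(const unsigned char *buf, size_t transferred,
		  uint16_t expected)
{
	return sketch_cksum(buf, transferred) == expected;
}
```

If the sender checksums 4 bytes of an 8-byte buffer and the receiver
verifies over all 8, the comparison fails even though the data arrived
intact, which is the mismatch this change avoids.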

----------------------------------------------------------------------
Subject
GniLND needs to filter accelerator events.
Description
Change the kgnilnd_rca thread to filter out accelerator events.
----------------------------------------------------------------------
Subject
kgnilnd BTE Delivery MODE tunable
Description
Currently kgnilnd only exposes a few options to tune for kgni's rdma
bte delivery mode. This works well for Gemini systems, but on Cascade
we would like finer grained control. This mod allows us to change the
delivery mode at run time through the exposed tunable interface,
giving us the capability to tune the delivery modes without having to
restart the system or make code changes.

Changes
1.  Added tunable bte_dlvr_mode which takes a mask/number for the
    delivery mode and uses that to set the bte delivery option for
    rdma.
2.  Removed extraneous tunables that were only single tunable
    specific.
3.  Added Gemini and Aries header options if in the future we need to
    change the defaults on Aries or Gemini.

----------------------------------------------------------------------
Subject
GniLND connection serialization, debug for compute bad message type.
Description
Introduce a semaphore for connection processing serialization within
the scheduler thread for bugs 789853 and 789855.
  - The main work of the scheduler thread is now protected by a read
    semaphore.
  - When kgnilnd_process_conns needs to do work on a connection, it
    takes a write semaphore.

----------------------------------------------------------------------
Subject
GniLND rca_thread exit fix.
Description
Prevent the kgnilnd_rca thread from exiting when it receives an error
from krca_wait_event.

----------------------------------------------------------------------
Subject
GniLND kgnilnd_recv message type unknown
Description
Add debug to print out more info in kgnilnd_recv() default case of
the gnm_type switch statement.

----------------------------------------------------------------------
Subject
fix fma_blk state when mdd is invalidated.
Description
Currently when a VIRT_MAPPED fma_blk is unmapped, kgnilnd doesn't
change its state to IDLE. As a result, the code that finds a free
mbox will use mboxes within the fma_blk even though its mdd has been
invalidated, causing dgram exchanges to contain bad mailboxes.

This change will mark the fma_blk as having its mdd invalidated.

----------------------------------------------------------------------
Subject
gnilnd/rca integration
Description
Subscribe for the rca events ec_node_unavailable, ec_node_available
and ec_node_failed to prevent reconnect attempts to downed nodes.
We do not use the event to kill a live connection.

----------------------------------------------------------------------
Subject
kgnilnd eager_recv double free fix
Description
Currently the function call kgnilnd_eager_recv does no verification
that the connection passed into it with an rx message is alive and
valid. Normally this is without issue except when connections are
being closed and opened on routers. A connection could be in the
process of being destroyed and have its refcount incremented.
The next call to kgnilnd_recv could cause a double free.

This mod alleviates this by doing a reverse lookup on the connection
based on the information we can validate within the rx message. By
using a read_lock on kgn_peer_conn_lock we can then look up the
connection based on its nid and verify its conn_stamp matches the one
the message is expecting. If we find a valid connection that matches
we then increment that connection's refcount while the lock is held,
preventing it from disappearing until after the receive. Without the
lock and reverse lookup we could end up looking at already freed
memory.

This race was showing itself through an fma_blk assertion on the
router nodes. When 2 destroy_conn calls occurred in parallel, sometimes
one would get past an if(fma_blk) check and then find that the
fma_blk had already been set to 0.

----------------------------------------------------------------------
Subject
Sequence kgnilnd tx use with close of connections.
Description
Currently kgnilnd makes an incorrect assumption
that when a conn is closed and the connection is removed from
the cqid lookup table that no tx's are in use by other threads.

What can happen is one of the other scheduler threads can be
in the process of using a tx and has called
kgnilnd_tx_del_state_locked. This can race against
kgnilnd_complete_closed_conn in a different scheduler thread as it
attempts to remove all existing tx's from the conn's tx_ref_table.
kgnilnd_complete_closed_conn calls kgnilnd_tx_del_state_locked
on the connection's tx's, and since a tx could still be in use in the
first scheduler thread, an exception can occur.

This mod marks the conn as having tx's in use when the first thread
has a read_lock on the kgnilnd_peer_conn_lock.

Changes
1. Added to kgn_conn_t an atomic gnc_tx_in_use that is incremented
   any time kgnilnd_validate_tx_ev_id is called.
2. Added a decref to the conn's gnc_tx_in_use after the function
   is finished using the tx.
3. Added a check in kgnilnd_process_conns that blocks entry for a
   given connection into kgnilnd_complete_closed_conn until
   gnc_tx_in_use is 0. Once the close call removes the conn from
   the cqid hash table, only tx's already in use before the close
   can delay its completion, so no livelocks should be possible.
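
The gating described in these changes can be sketched in user space
with C11 atomics. All names below are hypothetical stand-ins: the
counter plays the role of gnc_tx_in_use, the get/put pair mirrors the
increment around tx use, and the predicate mirrors the check made before
completing a closed conn.

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical connection carrying only the in-use counter. */
struct sketch_tx_conn {
	atomic_int tx_in_use;
};

/* Mark that a thread has started using one of the conn's tx's. */
void sketch_tx_get(struct sketch_tx_conn *c)
{
	atomic_fetch_add(&c->tx_in_use, 1);
}

/* Mark that the thread has finished with the tx. */
void sketch_tx_put(struct sketch_tx_conn *c)
{
	atomic_fetch_sub(&c->tx_in_use, 1);
}

/* The close may only be completed once nobody is using a tx. */
bool sketch_can_complete_close(struct sketch_tx_conn *c)
{
	return atomic_load(&c->tx_in_use) == 0;
}
```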

----------------------------------------------------------------------
Subject
Add kgnilnd scheduler thread runtime deadline
Description
This mod makes sure that the kgnilnd scheduler threads
are not sitting on the cpu longer than necessary by adding a deadline
that forces a yield after the deadline is hit. The amount of time
that the scheduler will allow itself to run without scheduling
is configurable via module parameter in 1 second intervals.

It was also found that the nice value of the scheduler threads
is preventing the heartbeat system from working correctly on
compute nodes with only a single scheduler thread. So we are
changing default nice value of thread to 0 to allow other
threads to run.

Changes
1. Added sched_timeout module parameter to allow changing of
   default scheduler thread deadline.
2. Added deadline check to kgnilnd_process_conns so it does
   not spin in its while loop forever.
3. Added error injection to verify deadline is checked and
   calls to yield occur.
4. Added sched_nice module parameter to allow adjustment of
   scheduler thread priority separate from other kgnilnd
   threads.

----------------------------------------------------------------------
Subject
Cleanup kgnilnd_schedule_conn races during conn close
Description
This patch reworks the previous debug patch and adds a
debug framework that addresses the shortcomings of the previous patch.

We are also removing an extraneous kgnilnd_schedule_conn
call from kgnilnd_finish_connect that was causing a large number of
the schedule after close occurrences.

There is still a chance that a conn can be scheduled after close but
the current refcount framework is designed to counteract issues that
arise when that happens, making the removal of the assertion valid.

----------------------------------------------------------------------
Subject
Repost WC dgram when OOM event occurs
Description
Currently when kgnilnd runs out of GART space while attempting to
repost a wildcard datagram, the system asserts and tips over. Instead
we can put into place a mechanism that allows WC datagrams to be
reposted when the OOM condition resolves.

This mod removes the assertion and puts into place a mechanism within
the dgram mover thread to post wildcards when necessary. This allows
the system to stay up instead of crashing. When posting a dgram
fails a D_NETERROR message will be written out to the console.

----------------------------------------------------------------------
Subject
Workaround and additional debug for scheduler assertion
Description
This mod adds debug to get a better analysis of the gnc_scheduled
problem. It also has a workaround; the call to
kgnilnd_complete_closed_conn will short circuit and let
kgnilnd_process_conns handle the schedule normally when it sees that
gnc_scheduled != GNILND_CONN_PROCESS instead of asserting. I have also
added debug to all the calls to kgnilnd_schedule_conn so we can find
the call that is causing the assertion.

----------------------------------------------------------------------
Subject
Remove assertion and attempt recovery on mailbox corruption
Description
Previous mods have addressed the sequencing that could cause mailbox
corruption by fixing the state machine and adding timeouts. This mod
builds on those and makes the detection of issues relating to the
mailbox a correctable error. Instead of asserting we will now close
the connection when we detect corruption occurring and utilize the
purgatory system to attempt to get things back in order.  The previous
changes allow us to do this as they prevent the close sequence
corruption from spiraling out of control.

Changes
        1. Removed assertion in kgnilnd_check_fma_rx on seqno
           corruption and replaced it with a statement that closes the
           connection and returns -EIO. This should allow the system
           to continue without causing the node to come down.
        2. Added debug so when we do detect corruption it will be
           tagged in the console. This will allow us to see how often
           the problem occurs and if it contributes to system
           problems.
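
A sketch of the "close and return -EIO instead of asserting" pattern
from change 1. The structure, field names, and the idea that the check
is a simple equality between an expected and a received sequence number
are assumptions made for illustration. The actual check in
kgnilnd_check_fma_rx is not shown here.

```c
#include <errno.h>
#include <stdint.h>

/* Hypothetical receive-side state for one connection. */
struct sketch_rx_conn {
	uint32_t rx_seq;   /* next sequence number we expect */
	int      closed;   /* set when the conn has been closed */
};

/* Validate an incoming sequence number. On a mismatch, close the
 * connection and report -EIO rather than asserting, so the node
 * stays up and the connection can be re-established. */
int sketch_check_rx_seq(struct sketch_rx_conn *c, uint32_t msg_seq)
{
	if (msg_seq != c->rx_seq) {
		c->closed = 1;   /* stand-in for closing the connection */
		return -EIO;
	}
	c->rx_seq++;
	return 0;
}
```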

----------------------------------------------------------------------
Subject
Fix race condition and sequence kgnilnd connection closing
Description
There is a race between the scheduler thread and
kgnilnd_close_conn_locked. While we take the kgn_peer_conn_lock to
close the connection, the scheduler threads don't look at it when they
check the gnc_state. We could end up all the way through the close
state machine by the time the kgnilnd_close_conn_locked function
finishes, tripping an assertion. To correct this race and improve
sequencing we need to make sure when changing the conn's gnc_state
we grab the write_lock on kgn_peer_conn_lock.

Changes
        1. In kgnilnd_send_conn_close when setting the conn's
           gnc_state to GNILND_CONN_CLOSED added a write_lock to make
           sure we are sequencing the close with other threads that
           might be changing the connection's state.
----------------------------------------------------------------------

Signed-off-by: James R. Shimek <jshimek@cray.com>
Change-Id: I5b8de3b72cdc17b32134cb2532c9ad7dc4fa621c
Reviewed-on: http://review.whamcloud.com/5815
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: James Simmons <uja.ornl@gmail.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
13 files changed:
lnet/klnds/gnilnd/gnilnd.c
lnet/klnds/gnilnd/gnilnd.h
lnet/klnds/gnilnd/gnilnd_api_wrap.h
lnet/klnds/gnilnd/gnilnd_aries.h [new file with mode: 0644]
lnet/klnds/gnilnd/gnilnd_cb.c
lnet/klnds/gnilnd/gnilnd_conn.c
lnet/klnds/gnilnd/gnilnd_gemini.h [new file with mode: 0644]
lnet/klnds/gnilnd/gnilnd_hss_ops.h
lnet/klnds/gnilnd/gnilnd_modparams.c
lnet/klnds/gnilnd/gnilnd_proc.c
lnet/klnds/gnilnd/gnilnd_stack.c
lnet/klnds/gnilnd/gnilnd_sysctl.c
lnet/klnds/gnilnd/gnilnd_version.h

index fcc05fa..6c00370 100644 (file)
@@ -1,7 +1,6 @@
 /*
  * Copyright (C) 2012 Cray, Inc.
  *
- *   Author: Igor Gorodetsky <iogordet@cray.com>
  *   Author: Nic Henke <nic@cray.com>
  *   Author: James Shimek <jshimek@cray.com>
  *
@@ -36,7 +35,6 @@ lnd_t the_kgnilnd = {
 };
 
 kgn_data_t      kgnilnd_data;
-kgn_hssops_t   kgnilnd_hssops;
 
 /* needs write_lock on kgn_peer_conn_lock */
 int
@@ -177,9 +175,9 @@ kgnilnd_conn_isdup_locked(kgn_peer_t *peer, kgn_conn_t *newconn)
 int
 kgnilnd_create_conn(kgn_conn_t **connp, kgn_device_t *dev)
 {
-       kgn_conn_t    *conn;
-       gni_return_t   rrc;
-       int            rc = 0;
+       kgn_conn_t      *conn;
+       gni_return_t    rrc;
+       int             rc = 0;
 
        LASSERT (!in_interrupt());
        atomic_inc(&kgnilnd_data.kgn_nconns);
@@ -208,6 +206,7 @@ kgnilnd_create_conn(kgn_conn_t **connp, kgn_device_t *dev)
        atomic_set(&conn->gnc_refcount, 1);
        atomic_set(&conn->gnc_reaper_noop, 0);
        atomic_set(&conn->gnc_sched_noop, 0);
+       atomic_set(&conn->gnc_tx_in_use, 0);
        INIT_LIST_HEAD(&conn->gnc_list);
        INIT_LIST_HEAD(&conn->gnc_hashlist);
        INIT_LIST_HEAD(&conn->gnc_schedlist);
@@ -215,6 +214,7 @@ kgnilnd_create_conn(kgn_conn_t **connp, kgn_device_t *dev)
        INIT_LIST_HEAD(&conn->gnc_mdd_list);
        spin_lock_init(&conn->gnc_list_lock);
        spin_lock_init(&conn->gnc_tx_lock);
+       conn->gnc_magic = GNILND_CONN_MAGIC;
 
        /* set tx id to nearly the end to make sure we find wrapping
         * issues soon */
@@ -278,7 +278,6 @@ kgn_conn_t *
 kgnilnd_find_conn_locked(kgn_peer_t *peer)
 {
        kgn_conn_t      *conn = NULL;
-       ENTRY;
 
        /* if we are in reset, this conn is going to die soon */
        if (unlikely(kgnilnd_data.kgn_in_reset)) {
@@ -399,13 +398,15 @@ kgnilnd_destroy_conn(kgn_conn_t *conn)
                list_empty(&conn->gnc_list) &&
                list_empty(&conn->gnc_hashlist) &&
                list_empty(&conn->gnc_schedlist) &&
-               list_empty(&conn->gnc_mdd_list),
-               "conn 0x%p->%s IRQ %d sched %d purg %d ep 0x%p lists %d/%d/%d/%d\n",
+               list_empty(&conn->gnc_mdd_list) &&
+               conn->gnc_magic == GNILND_CONN_MAGIC,
+               "conn 0x%p->%s IRQ %d sched %d purg %d ep 0x%p Mg %d lists %d/%d/%d/%d\n",
                conn, conn->gnc_peer ? libcfs_nid2str(conn->gnc_peer->gnp_nid)
                                     : "<?>",
                !!in_interrupt(), conn->gnc_scheduled,
                conn->gnc_in_purgatory,
                conn->gnc_ephandle,
+               conn->gnc_magic,
                list_empty(&conn->gnc_list),
                list_empty(&conn->gnc_hashlist),
                list_empty(&conn->gnc_schedlist),
@@ -424,8 +425,16 @@ kgnilnd_destroy_conn(kgn_conn_t *conn)
        CDEBUG(D_NET, "destroying conn %p ephandle %p error %d\n",
                conn, conn->gnc_ephandle, conn->gnc_error);
 
+       /* We are freeing this memory remove the magic value from the connection */
+       conn->gnc_magic = 0;
+
        /* if there is an FMA blk left here, we'll tear it down */
        if (conn->gnc_fma_blk) {
+               if (conn->gnc_peer) {
+                       kgn_mbox_info_t *mbox;
+                       mbox = &conn->gnc_fma_blk->gnm_mbox_info[conn->gnc_mbox_id];
+                       mbox->mbx_prev_nid = conn->gnc_peer->gnp_nid;
+               }
                kgnilnd_release_mbox(conn, 0);
        }
 
@@ -574,7 +583,8 @@ kgnilnd_close_conn_locked(kgn_conn_t *conn, int error)
        }
 
        /* if we NETERROR, make sure it is rate limited */
-       if (!kgnilnd_conn_clean_errno(error)) {
+       if (!kgnilnd_conn_clean_errno(error) &&
+           peer->gnp_down == GNILND_RCA_NODE_UP) {
                CNETERR("closing conn to %s: error %d\n",
                       libcfs_nid2str(peer->gnp_nid), error);
        } else {
@@ -600,6 +610,7 @@ kgnilnd_close_conn_locked(kgn_conn_t *conn, int error)
        /* Remove from conn hash table: no new callbacks */
        list_del_init(&conn->gnc_hashlist);
        kgnilnd_data.kgn_conn_version++;
+       kgnilnd_conn_decref(conn);
 
        /* if we are in reset, go right to CLOSED as there is no scheduler
         * thread to move from CLOSING to CLOSED */
@@ -628,11 +639,6 @@ kgnilnd_close_conn_locked(kgn_conn_t *conn, int error)
         * gnd_ready_conns and allows us to find it in quiesce processing */
        kgnilnd_schedule_conn(conn);
 
-       /* lose peer's ref */
-       kgnilnd_conn_decref(conn);
-       /* -1 for conn table */
-       kgnilnd_conn_decref(conn);
-
        EXIT;
 }
 
@@ -678,6 +684,17 @@ kgnilnd_complete_closed_conn(kgn_conn_t *conn)
        LASSERT(list_empty(&conn->gnc_hashlist));
 
        /* we've sent the close, start nuking */
+       if (CFS_FAIL_CHECK(CFS_FAIL_GNI_SCHEDULE_COMPLETE))
+               kgnilnd_schedule_conn(conn);
+
+       if (conn->gnc_scheduled != GNILND_CONN_PROCESS) {
+               CDEBUG(D_NETERROR, "Error someone scheduled us after we were "
+                               "done, Attempting to recover conn 0x%p "
+                               "scheduled %d function: %s line: %d\n", conn,
+                               conn->gnc_scheduled, conn->gnc_sched_caller,
+                               conn->gnc_sched_line);
+               RETURN_EXIT;
+       }
 
        /* we don't use lists to track things that we can get out of the
         * tx_ref table... */
@@ -713,9 +730,13 @@ kgnilnd_complete_closed_conn(kgn_conn_t *conn)
 
        /* nobody should have marked this as needing scheduling after
         * we called close - so only ref should be us handling it */
-       LASSERTF(conn->gnc_scheduled == GNILND_CONN_PROCESS,
-                "conn 0x%p scheduled %d\n", conn, conn->gnc_scheduled);
-
+       if (conn->gnc_scheduled != GNILND_CONN_PROCESS) {
+               CDEBUG(D_NETERROR, "Error someone scheduled us after we were "
+                               "done, Attempting to recover conn 0x%p "
+                               "scheduled %d function %s line: %d\n", conn,
+                               conn->gnc_scheduled, conn->gnc_sched_caller,
+                               conn->gnc_sched_line);
+       }
        /* now reset a few to actual counters... */
        nrdma = atomic_read(&conn->gnc_nlive_rdma);
        nq_rdma = atomic_read(&conn->gnc_nq_rdma);
@@ -732,17 +753,17 @@ kgnilnd_complete_closed_conn(kgn_conn_t *conn)
        logmsg = (nlive + nrdma + nq_rdma);
 
        if (logmsg) {
-               if (conn->gnc_peer_error != 0) {
+               if (conn->gnc_peer->gnp_down == GNILND_RCA_NODE_UP) {
                        CNETERR("Closed conn 0x%p->%s (errno %d, peer errno %d): "
                                "canceled %d TX, %d/%d RDMA\n",
                                conn, libcfs_nid2str(conn->gnc_peer->gnp_nid),
                                conn->gnc_error, conn->gnc_peer_error,
                                nlive, nq_rdma, nrdma);
                } else {
-                       CNETERR("Closed conn 0x%p->%s (errno %d): "
-                               "canceled %d TX, %d/%d RDMA\n",
+                       CDEBUG(D_NET, "Closed conn 0x%p->%s (errno %d,"
+                               " peer errno %d): canceled %d TX, %d/%d RDMA\n",
                                conn, libcfs_nid2str(conn->gnc_peer->gnp_nid),
-                               conn->gnc_error,
+                               conn->gnc_error, conn->gnc_peer_error,
                                nlive, nq_rdma, nrdma);
                }
        }
@@ -767,6 +788,8 @@ kgnilnd_complete_closed_conn(kgn_conn_t *conn)
        /* Remove from peer's list of valid connections if its not in purgatory */
        if (!conn->gnc_in_purgatory) {
                list_del_init(&conn->gnc_list);
+               /* Lose peer's reference on the conn */
+               kgnilnd_conn_decref(conn);
        }
 
        /* NB - only unlinking if we set pending in del_peer_locked from admin or
@@ -795,6 +818,7 @@ kgnilnd_set_conn_params(kgn_dgram_t *dgram)
        kgn_gniparams_t        *rem_param = &connreq->gncr_gnparams;
        gni_return_t            rrc;
        int                     rc = 0;
+       gni_smsg_attr_t        *remote = &connreq->gncr_gnparams.gnpr_smsg_attr;
 
        /* set timeout vals in conn early so we can use them for the NAK */
 
@@ -829,7 +853,6 @@ kgnilnd_set_conn_params(kgn_dgram_t *dgram)
                        &connreq->gncr_gnparams.gnpr_smsg_attr);
        if (unlikely(rrc == GNI_RC_INVALID_PARAM)) {
                gni_smsg_attr_t *local = &conn->gnpr_smsg_attr;
-               gni_smsg_attr_t *remote = &connreq->gncr_gnparams.gnpr_smsg_attr;
                /* help folks figure out if there is a tunable off, etc. */
                LCONSOLE_ERROR("SMSG attribute mismatch. Data from local/remote:"
                               " type %d/%d msg_maxsize %u/%u"
@@ -864,6 +887,7 @@ kgnilnd_set_conn_params(kgn_dgram_t *dgram)
 
        conn->gnc_peerstamp = connreq->gncr_peerstamp;
        conn->gnc_peer_connstamp = connreq->gncr_connstamp;
+       conn->remote_mbox_addr = (void *)((char *)remote->msg_buffer + remote->mbox_offset);
 
        /* We update the reaper timeout once we have a valid conn and timeout */
        kgnilnd_update_reaper_timeout(GNILND_TO2KA(conn->gnc_timeout));
@@ -892,8 +916,8 @@ return_out:
 int
 kgnilnd_create_peer_safe(kgn_peer_t **peerp, lnet_nid_t nid, kgn_net_t *net)
 {
-       kgn_peer_t    *peer;
-       int            rc;
+       kgn_peer_t      *peer;
+       int             rc;
 
        LASSERT(nid != LNET_NID_ANY);
 
@@ -922,6 +946,7 @@ kgnilnd_create_peer_safe(kgn_peer_t **peerp, lnet_nid_t nid, kgn_net_t *net)
                return -ENOMEM;
        }
        peer->gnp_nid = nid;
+       peer->gnp_down = GNILND_RCA_NODE_UP;
 
        /* translate from nid to nic addr & store */
        rc = kgnilnd_nid_to_nicaddrs(LNET_NIDADDR(nid), 1, &peer->gnp_host_id);
@@ -1028,13 +1053,10 @@ kgnilnd_add_purgatory_locked(kgn_conn_t *conn, kgn_peer_t *peer)
        CDEBUG(D_NET, "conn %p peer %p dev %p\n", conn, peer,
                conn->gnc_device);
 
-       /* add ref for mbox purgatory hold */
-       kgnilnd_peer_addref(peer);
-       kgnilnd_conn_addref(conn);
        conn->gnc_in_purgatory = 1;
 
        mbox = &conn->gnc_fma_blk->gnm_mbox_info[conn->gnc_mbox_id];
-       mbox->mbx_prev_nid = peer->gnp_nid;
+       mbox->mbx_prev_purg_nid = peer->gnp_nid;
        mbox->mbx_add_purgatory = jiffies;
        kgnilnd_release_mbox(conn, 1);
 
@@ -1085,7 +1107,6 @@ kgnilnd_detach_purgatory_locked(kgn_conn_t *conn, struct list_head *conn_list)
                 * on the peer's conn_list anymore.
                 */
 
-               kgnilnd_peer_decref(conn->gnc_peer);
                list_del_init(&conn->gnc_list);
 
                /* NB - only unlinking if we set pending in del_peer_locked from admin or
@@ -1253,9 +1274,6 @@ kgnilnd_get_peer_info(int index,
                list_for_each(ptmp, &kgnilnd_data.kgn_peers[i]) {
                        peer = list_entry(ptmp, kgn_peer_t, gnp_list);
 
-                       if (peer->gnp_nid != *id)
-                               continue;
-
                        if (index-- > 0)
                                continue;
 
@@ -1628,6 +1646,103 @@ kgnilnd_close_peer_conns_locked(kgn_peer_t *peer, int why)
 }
 
 int
+kgnilnd_report_node_state(lnet_nid_t nid, int down)
+{
+       int         rc;
+       kgn_peer_t  *peer, *new_peer;
+       CFS_LIST_HEAD(zombies);
+
+       write_lock(&kgnilnd_data.kgn_peer_conn_lock);
+       peer = kgnilnd_find_peer_locked(nid);
+
+       if (peer == NULL) {
+               int       i;
+               int       found_net = 0;
+               kgn_net_t *net;
+
+               write_unlock(&kgnilnd_data.kgn_peer_conn_lock);
+
+               /* Don't add a peer for node up events */
+               if (down == GNILND_RCA_NODE_UP) {
+                       return 0;
+               }
+
+               /* find any valid net - we don't care which one... */
+               down_read(&kgnilnd_data.kgn_net_rw_sem);
+               for (i = 0; i < *kgnilnd_tunables.kgn_net_hash_size; i++) {
+                       list_for_each_entry(net, &kgnilnd_data.kgn_nets[i],
+                                           gnn_list) {
+                               found_net = 1;
+                               break;
+                       }
+
+                       if (found_net) {
+                               break;
+                       }
+               }
+               up_read(&kgnilnd_data.kgn_net_rw_sem);
+
+               if (!found_net) {
+                       CNETERR("Could not find a net for nid %lld\n", nid);
+                       return 1;
+               }
+
+               /* The nid passed in does not yet contain the net portion.
+                * Let's build it up now
+                */
+               nid = LNET_MKNID(LNET_NIDNET(net->gnn_ni->ni_nid), nid);
+               rc = kgnilnd_add_peer(net, nid, &new_peer);
+
+               if (rc) {
+                       CNETERR("Could not add peer for nid %lld, rc %d\n",
+                               nid, rc);
+                       return 1;
+               }
+
+               write_lock(&kgnilnd_data.kgn_peer_conn_lock);
+               peer = kgnilnd_find_peer_locked(nid);
+
+               if (peer == NULL) {
+                       CNETERR("Could not find peer for nid %lld\n", nid);
+                       write_unlock(&kgnilnd_data.kgn_peer_conn_lock);
+                       return 1;
+               }
+       }
+
+       peer->gnp_down = down;
+
+       if (down == GNILND_RCA_NODE_DOWN) {
+               kgn_conn_t *conn;
+
+               peer->gnp_down_event_time = jiffies;
+               kgnilnd_cancel_peer_connect_locked(peer, &zombies);
+               conn = kgnilnd_find_conn_locked(peer);
+
+               if (conn != NULL) {
+                       kgnilnd_close_conn_locked(conn, -ENETRESET);
+               }
+       } else {
+               peer->gnp_up_event_time = jiffies;
+       }
+
+       write_unlock(&kgnilnd_data.kgn_peer_conn_lock);
+
+       if (down == GNILND_RCA_NODE_DOWN) {
+               /* using ENETRESET so we don't get messages from
+                * kgnilnd_tx_done
+                */
+               kgnilnd_txlist_done(&zombies, -ENETRESET);
+
+               if (*kgnilnd_tunables.kgn_peer_health) {
+                       kgnilnd_peer_notify(peer, -ECONNRESET);
+               }
+       }
+
+       CDEBUG(D_INFO, "marking nid %lld %s\n", nid, down ? "down" : "up");
+       return 0;
+}
+
+int
 kgnilnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
 {
        struct libcfs_ioctl_data *data = arg;
@@ -1847,6 +1962,8 @@ kgnilnd_dev_init(kgn_device_t *dev)
                GOTO(failed, rc);
        }
 
+       /* a bit gross, but not much we can do - Aries Sim doesn't have
+        * hardcoded NIC/NID that we can use */
        rc = kgnilnd_setup_nic_translation(dev->gnd_host_id);
        if (rc != 0) {
                rc = -ENODEV;
@@ -1857,7 +1974,9 @@ kgnilnd_dev_init(kgn_device_t *dev)
         * - this works because we have a single PTAG, if we had more
         * then we'd need to have multiple handlers */
        if (dev->gnd_id == 0) {
-               rrc = kgnilnd_subscribe_errors(dev->gnd_handle, GNI_ERRMASK_CRITICAL,
+               rrc = kgnilnd_subscribe_errors(dev->gnd_handle,
+                                               GNI_ERRMASK_CRITICAL |
+                                               GNI_ERRMASK_UNKNOWN_TRANSACTION,
                                              0, NULL, kgnilnd_critical_error,
                                              &dev->gnd_err_handle);
                if (rrc != GNI_RC_SUCCESS) {
@@ -2026,7 +2145,6 @@ int kgnilnd_base_startup(void)
 
        /* zero pointers, flags etc */
        memset(&kgnilnd_data, 0, sizeof(kgnilnd_data));
-       memset(&kgnilnd_hssops, 0, sizeof(kgnilnd_hssops));
 
        /* CAVEAT EMPTOR: Every 'Fma' message includes the sender's NID and
         * a unique (for all time) connstamp so we can uniquely identify
@@ -2066,6 +2184,7 @@ int kgnilnd_base_startup(void)
                spin_lock_init(&dev->gnd_dgram_lock);
                spin_lock_init(&dev->gnd_rdmaq_lock);
                INIT_LIST_HEAD(&dev->gnd_rdmaq);
+               init_rwsem(&dev->gnd_conn_sem);
 
                /* alloc & setup nid based dgram table */
                LIBCFS_ALLOC(dev->gnd_dgrams,
@@ -2080,10 +2199,15 @@ int kgnilnd_base_startup(void)
                        INIT_LIST_HEAD(&dev->gnd_dgrams[i]);
                }
                atomic_set(&dev->gnd_ndgrams, 0);
-
+               atomic_set(&dev->gnd_nwcdgrams, 0);
                /* setup timer for RDMAQ processing */
                setup_timer(&dev->gnd_rdmaq_timer, kgnilnd_schedule_device_timer,
                            (unsigned long)dev);
+
+               /* setup timer for mapping processing */
+               setup_timer(&dev->gnd_map_timer, kgnilnd_schedule_device_timer,
+                           (unsigned long)dev);
+
        }
 
        /* CQID 0 isn't allowed, set to MAX_MSG_ID - 1 to check for conflicts early */
@@ -2098,6 +2222,10 @@ int kgnilnd_base_startup(void)
        atomic_set(&kgnilnd_data.kgn_npending_conns, 0);
        atomic_set(&kgnilnd_data.kgn_npending_unlink, 0);
        atomic_set(&kgnilnd_data.kgn_npending_detach, 0);
+       atomic_set(&kgnilnd_data.kgn_rev_offset, 0);
+       atomic_set(&kgnilnd_data.kgn_rev_length, 0);
+       atomic_set(&kgnilnd_data.kgn_rev_copy_buff, 0);
+
        /* OK to call kgnilnd_api_shutdown() to cleanup now */
        kgnilnd_data.kgn_init = GNILND_INIT_DATA;
        PORTAL_MODULE_USE;
@@ -2247,6 +2375,12 @@ int kgnilnd_base_startup(void)
                GOTO(failed, rc);
        }
 
+       rc = kgnilnd_start_rca_thread();
+       if (rc != 0) {
+               CERROR("Can't spawn gnilnd rca: %d\n", rc);
+               GOTO(failed, rc);
+       }
+
        /*
         * Start ruhroh thread.  We can't use kgnilnd_thread_start() because
         * we don't want this thread included in kgnilnd_data.kgn_nthreads
@@ -2316,7 +2450,7 @@ failed:
 void
 kgnilnd_base_shutdown(void)
 {
-       int           i;
+       int                     i;
        ENTRY;
 
        while (CFS_FAIL_TIMEOUT(CFS_FAIL_GNI_PAUSE_SHUTDOWN, 1)) {};
@@ -2369,6 +2503,8 @@ kgnilnd_base_shutdown(void)
        wake_up_all(&kgnilnd_data.kgn_reaper_waitq);
        spin_unlock(&kgnilnd_data.kgn_reaper_lock);
 
+       kgnilnd_wakeup_rca_thread();
+
        /* Wait for threads to exit */
        i = 2;
        while (atomic_read(&kgnilnd_data.kgn_nthreads) != 0) {
@@ -2511,12 +2647,24 @@ kgnilnd_startup(lnet_ni_t *ni)
 
        if (*kgnilnd_tunables.kgn_peer_health) {
                int     fudge;
-
+               int     timeout;
                /* give this a bit of leeway - we don't have a hard timeout
                 * as we only check timeouts periodically - see comment in kgnilnd_reaper */
                fudge = (GNILND_TO2KA(*kgnilnd_tunables.kgn_timeout) / GNILND_REAPER_NCHECKS);
-
-               ni->ni_peertimeout = *kgnilnd_tunables.kgn_timeout + fudge;
+               timeout = *kgnilnd_tunables.kgn_timeout + fudge;
+
+               if (*kgnilnd_tunables.kgn_peer_timeout >= timeout)
+                       ni->ni_peertimeout = *kgnilnd_tunables.kgn_peer_timeout;
+               else if (*kgnilnd_tunables.kgn_peer_timeout > -1) {
+                       LCONSOLE_ERROR("Peer_timeout is set to %d but needs to be >= %d\n",
+                                       *kgnilnd_tunables.kgn_peer_timeout,
+                                       timeout);
+                       ni->ni_data = NULL;
+                       LIBCFS_FREE(net, sizeof(*net));
+                       rc = -EINVAL;
+                       GOTO(failed, rc);
+               } else
+                       ni->ni_peertimeout = timeout;
 
                LCONSOLE_INFO("Enabling LNet peer health for gnilnd, timeout %ds\n",
                              ni->ni_peertimeout);
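
The peer_timeout selection added to kgnilnd_startup() in the hunk above can be sketched as a standalone C function. This is only an illustration under stated assumptions, not code from the patch: pick_peer_timeout() is a hypothetical name, GNILND_REAPER_NCHECKS is assumed to be 4 (its definition does not appear in this diff), and the -1 return value stands in for the path where the kernel code logs to the console and fails with -EINVAL.

```c
/* timeout -> keepalive interval, as defined in gnilnd.h in this patch */
#define GNILND_TO2KA(t)        (((t)-1)/2)
/* ASSUMPTION: value not shown in this diff, chosen for illustration only */
#define GNILND_REAPER_NCHECKS  4

/*
 * Hypothetical helper mirroring the branch structure in kgnilnd_startup().
 * kgn_timeout  - the timeout tunable, in seconds
 * peer_timeout - the peer_timeout module parameter, -1 meaning "not set"
 * Returns the value to hand to LNet, or -1 when the override is rejected.
 */
static int pick_peer_timeout(int kgn_timeout, int peer_timeout)
{
        /* leeway, because timeouts are only checked periodically */
        int fudge = GNILND_TO2KA(kgn_timeout) / GNILND_REAPER_NCHECKS;
        int timeout = kgn_timeout + fudge;

        if (peer_timeout >= timeout)
                return peer_timeout;    /* user override is large enough */
        if (peer_timeout > -1)
                return -1;              /* override set but too small */
        return timeout;                 /* no override: use the default */
}
```

Under the assumed NCHECKS value, a timeout of 60 gives a fudge of 7 and a default of 67, so an override of 100 would be accepted while an override of 30 would be rejected.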
index de43728..6bfbd9a 100644 (file)
 #ifndef _GNILND_GNILND_H_
 #define _GNILND_GNILND_H_
 
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#include <linux/module.h>
+#include <linux/kernel.h>
+#include <linux/mm.h>
+#include <linux/string.h>
+#include <linux/stat.h>
+#include <linux/errno.h>
+#ifdef HAVE_LINUX_KERNEL_LOCK
+#include <linux/smp_lock.h>
+#endif
+#include <linux/unistd.h>
+#include <linux/uio.h>
+#include <linux/time.h>
+#include <asm/timex.h>
+
+#include <asm/system.h>
+#include <asm/uaccess.h>
+#include <asm/io.h>
+
+#include <linux/init.h>
+#include <linux/fs.h>
+#include <linux/file.h>
+#include <linux/stat.h>
+#include <linux/list.h>
+#include <linux/kmod.h>
+#include <linux/sysctl.h>
+#include <linux/kthread.h>
+#include <linux/nmi.h>
+
+#include <net/sock.h>
+#include <linux/in.h>
+#include <linux/nmi.h>
+
 #define DEBUG_SUBSYSTEM S_LND
 
+#include <libcfs/linux/kp30.h>
 #include <libcfs/libcfs.h>
 #include <lnet/lnet.h>
 #include <lnet/lib-lnet.h>
 
 #include <gni_pub.h>
 #include "gnilnd_version.h"
-#include "gnilnd_hss_ops.h"
+
 
 /* tunables determined at compile time */
 #define GNILND_MIN_TIMEOUT     5               /* minimum timeout interval (seconds) */
-#define GNILND_BASE_TIMEOUT    60              /* default sane timeout */
 #define GNILND_TO2KA(t)                (((t)-1)/2)     /* timeout -> keepalive interval */
 #define GNILND_MIN_RECONNECT_TO        (GNILND_BASE_TIMEOUT/4)
 #define GNILND_MAX_RECONNECT_TO        GNILND_BASE_TIMEOUT
 #define GNILND_HARDWARE_TIMEOUT        15              /* maximum time for data to travel between nodes */
 #define GNILND_MDD_TIMEOUT     15              /* MDD hold timeout in minutes */
+#define GNILND_SCHED_TIMEOUT       1
+#define GNILND_DGRAM_TIMEOUT       2
+#define GNILND_FAST_MAPPING_TRY   \
+       *kgnilnd_tunables.kgn_max_retransmits   /* maximum number to attempt mapping of a tx */
+#define GNILND_MAP_RETRY_RATE      1            /* interval between mapping attempts in jiffies */
+
+/* map failure timeout */
+#define GNILND_MAP_TIMEOUT         \
+       (cfs_time_seconds(*kgnilnd_tunables.kgn_timeout * \
+        *kgnilnd_tunables.kgn_timeout))
 
 /* reaper thread wakup interval */
 #define GNILND_REAPER_THREAD_WAKE  1
 #define GNILND_MAXDEVS         1               /* max # of GNI devices currently supported */
 #define GNILND_MBOX_CREDITS    256             /* number of credits per mailbox */
 #define GNILND_COOKIE          0xa3579         /* cookie used by along with ptag by GNI */
-
+#define GNILND_CONN_MAGIC         0xa100f       /* magic value for verifying connection validity */
 /* checksum values */
 #define GNILND_CHECKSUM_OFF            0       /* checksum turned off */
 #define GNILND_CHECKSUM_SMSG_HEADER    1       /* Only checksum SMSG header */
 #if defined(CONFIG_CRAY_COMPUTE)
 #define GNILND_SCHED_THREADS      1             /* default # of kgnilnd_scheduler threads */
 #define GNILND_FMABLK             64            /* default number of mboxes per fmablk */
+#define GNILND_SCHED_NICE         0            /* default nice value for scheduler threads */
+#define GNILND_COMPUTE            1             /* compute image */
 #else
 #define GNILND_SCHED_THREADS      3             /* default # of kgnilnd_scheduler threads */
 #define GNILND_FMABLK             1024          /* default number of mboxes per fmablk */
+#define GNILND_SCHED_NICE         -20          /* default nice value for scheduler threads */
+#define GNILND_COMPUTE            0             /* service image */
 #endif
 
 /* EXTRA_BITS are there to allow us to hide NOOP/CLOSE and anything else out of band */
 #define GNILND_MSG_GET_NAK           0x08        /* gnm_u.completion (no GET match: src->sink) */
 #define GNILND_MSG_GET_DONE          0x09        /* gnm_u.completion (src->sink) */
 #define GNILND_MSG_CLOSE             0x0a        /* empty gnm_u */
+#define GNILND_MSG_PUT_REQ_REV       0x0b       /* gnm_u.get (src->sink) */
+#define GNILND_MSG_PUT_DONE_REV      0x0c       /* gnm_u.completion (sink->src) */
+#define GNILND_MSG_PUT_NAK_REV       0x0d        /* gnm_u.completion (no PUT match: sink->src) */
+#define GNILND_MSG_GET_REQ_REV       0x0e        /* gnm_u.get (sink->src ) */
+#define GNILND_MSG_GET_ACK_REV       0x0f        /* gnm_u.getack (GET matched: src->sink) */
+#define GNILND_MSG_GET_DONE_REV      0x10       /* gnm_u.completion (sink -> src) */
+#define GNILND_MSG_GET_NAK_REV       0x11        /* gnm_u.completion (no GET match: sink -> src) */
 
 /* defines for gnc_*scheduled states */
 #define GNILND_CONN_IDLE             0
 #define GNILND_DEL_PEER              1
 #define GNILND_CLEAR_PURGATORY       2
 
+#define GNILND_RCA_NODE_UP           0
+#define GNILND_RCA_NODE_DOWN         1
+#define GNILND_RCA_NODE_UNKNOWN      2
+
+/* defines for reverse RDMA states */
+#define GNILND_REVERSE_NONE            0
+#define GNILND_REVERSE_GET             1
+#define GNILND_REVERSE_PUT             2
+#define GNILND_REVERSE_BOTH            (GNILND_REVERSE_GET | GNILND_REVERSE_PUT)
+
 typedef enum kgn_fmablk_state {
        GNILND_FMABLK_IDLE = 0, /* is allocated or ready to be freed */
        GNILND_FMABLK_PHYS,     /* allocated out of slab of physical memory */
@@ -332,12 +398,14 @@ typedef struct {
 typedef struct {
        __u64             gnpam_src_cookie;     /* reflected completion cookie */
        __u64             gnpam_dst_cookie;     /* opaque completion cookie */
+       __u16             gnpam_payload_cksum;  /* checksum for get msg */
        kgn_rdma_desc_t   gnpam_desc;           /* sender's sink buffer */
 } WIRE_ATTR kgn_putack_msg_t;
 
 typedef struct {
        lnet_hdr_t        gngm_hdr;             /* LNet header */
        __u64             gngm_cookie;          /* opaque completion cookie */
+       __u16             gngm_payload_cksum;   /* checksum for put msg */
        kgn_rdma_desc_t   gngm_desc;            /* sender's sink buffer */
 } WIRE_ATTR kgn_get_msg_t;
 
@@ -380,8 +448,7 @@ typedef struct kgn_tunables {
        int              *kgn_max_immediate;    /* immediate payload breakpoint */
        int              *kgn_checksum;         /* checksum data */
        int              *kgn_checksum_dump;    /* dump raw data to D_INFO log when checksumming */
-       int              *kgn_bte_hash;         /* hashing on BTE transfers */
-       int              *kgn_bte_adapt;        /* adaptive routing on BTE transfers */
+       int              *kgn_bte_dlvr_mode;    /* BTE delivery mode mask */
        int              *kgn_bte_relaxed_ordering; /* relaxed ordering (PASSPW) on BTE transfers */
        int              *kgn_ptag;             /* PTAG for cdm_create */
        int              *kgn_max_retransmits;  /* max number of FMA retransmits */
@@ -391,6 +458,7 @@ typedef struct kgn_tunables {
        int              *kgn_loops;            /* # of loops sched does before flush/heartbeat tickle */
        int              *kgn_peer_hash_size;   /* size of kgn_peers */
        int              *kgn_peer_health;      /* enable/disable peer health */
+       int              *kgn_peer_timeout;     /* Override of the default peer_timeout used by peer_health */
        int              *kgn_vmap_cksum;       /* enable/disable vmap of kiov checksums */
        int              *kgn_mbox_per_block;   /* mailboxes per fmablk */
        int              *kgn_nphys_mbox;       /* # mailboxes to preallocate with physical memory */
@@ -399,6 +467,10 @@ typedef struct kgn_tunables {
        int              *kgn_net_hash_size;    /* size of kgn_net_ht */
        int              *kgn_hardware_timeout; /* max time for a message to get across the network */
        int              *kgn_mdd_timeout;      /* max time for ghal to hold an mdd in minutes */
+       int              *kgn_sched_timeout;    /* max time for scheduler to run before yielding */
+       int              *kgn_dgram_timeout;    /* max time for dgram mover to run before scheduling */
+       int              *kgn_sched_nice;       /* nice value for kgnilnd scheduler threads */
+       int              *kgn_reverse_rdma;     /* Reverse RDMA setting */
 #if CONFIG_SYSCTL && !CFS_SYSFS_MODULE_PARM
        cfs_sysctl_table_header_t *kgn_sysctl;  /* sysctl interface */
 #endif
@@ -406,11 +478,14 @@ typedef struct kgn_tunables {
 
 typedef struct kgn_mbox_info {
        lnet_nid_t mbx_prev_nid;
+       lnet_nid_t mbx_prev_purg_nid;
        unsigned long mbx_create_conn_memset;
        unsigned long mbx_add_purgatory;
        unsigned long mbx_detach_of_purgatory;
        unsigned long mbx_release_from_purgatory;
        unsigned long mbx_release_purg_active_dgram;
+       int           mbx_nallocs;
+       int           mbx_nallocs_total;
 } kgn_mbox_info_t;
 
 typedef struct kgn_fma_memblock {
@@ -460,9 +535,11 @@ typedef struct kgn_device {
        int                     gnd_dgram_ready;  /* dgrams need movin' */
        struct list_head       *gnd_dgrams;       /* nid hash to dgrams */
        atomic_t                gnd_ndgrams;      /* # dgrams extant */
+       atomic_t                gnd_nwcdgrams;    /* # wildcard dgrams to post on device */
        spinlock_t              gnd_dgram_lock;   /* serialize gnd_dgrams */
        struct list_head        gnd_map_list;     /* list of all mapped regions */
        int                     gnd_map_version;  /* version flag for map list */
+       struct timer_list       gnd_map_timer;    /* wakey-wakey */
        atomic_t                gnd_n_mdd;        /* number of total MDD - fma, tx, etc */
        atomic_t                gnd_n_mdd_held;   /* number of total MDD held - fma, tx, etc */
        atomic_t                gnd_nq_map;       /* # queued waiting for mapping (MDD/GART) */
@@ -472,6 +549,9 @@ typedef struct kgn_device {
        __u32                   gnd_map_nvirt;    /* # TX virt mappings */
        __u64                   gnd_map_virtnob;  /* # TX virt bytes mapped */
        spinlock_t              gnd_map_lock;     /* serialize gnd_map_XXX */
+       unsigned long           gnd_next_map;     /* next mapping attempt in jiffies */
+       int                     gnd_map_attempt;  /* last map attempt # */
+       unsigned long           gnd_last_map;     /* map timeout base */
        struct list_head        gnd_rdmaq;        /* RDMA to be sent */
        spinlock_t              gnd_rdmaq_lock;   /* play nice with others */
        atomic64_t              gnd_rdmaq_bytes_out; /* # bytes authorized */
@@ -494,6 +574,7 @@ typedef struct kgn_device {
        atomic_t                gnd_n_yield;
        atomic_t                gnd_n_schedule;
        atomic_t                gnd_canceled_dgrams; /* # of outstanding cancels */
+       struct rw_semaphore     gnd_conn_sem;       /* serialize connection changes/data movement */
 } kgn_device_t;
 
 typedef struct kgn_net {
@@ -575,6 +656,7 @@ typedef struct kgn_tx {                         /* message descriptor */
        int                       tx_buftype;   /* payload buffer type */
        int                       tx_phys_npages; /* # physical pages */
        gni_mem_handle_t          tx_map_key;   /* mapping key */
+       gni_mem_handle_t          tx_buffer_copy_map_key;  /* mapping key for page aligned copy */
        gni_mem_segment_t        *tx_phys;      /* page descriptors */
        kgn_msg_t                 tx_msg;       /* FMA message buffer */
        kgn_tx_ev_id_t            tx_id;        /* who are you, who ? who ? */
@@ -582,6 +664,9 @@ typedef struct kgn_tx {                         /* message descriptor */
        int                       tx_retrans;   /* retrans count of RDMA */
        int                       tx_rc;        /* if we need to stash the ret code until we see completion */
        void                     *tx_buffer;    /* source/sink buffer */
+       void                     *tx_buffer_copy;   /* pointer to page aligned buffer */
+       unsigned int              tx_nob_rdma;  /* nob actually rdma */
+       unsigned int              tx_offset;    /* offset of data into copied buffer */
        union {
                gni_post_descriptor_t     tx_rdma_desc; /* rdma descriptor */
                struct page              *tx_imm_pages[GNILND_MAX_IMMEDIATE/PAGE_SIZE];  /* page array to map kiov for immediate send */
@@ -597,6 +682,7 @@ typedef struct kgn_tx {                         /* message descriptor */
 typedef struct kgn_conn {
        kgn_device_t       *gnc_device;         /* which device */
        struct kgn_peer    *gnc_peer;           /* owning peer */
+       int                 gnc_magic;          /* magic value cleared before free */
        struct list_head    gnc_list;           /* stash on peer's conn list - or pending purgatory lists as we clear them */
        struct list_head    gnc_hashlist;       /* stash in connection hash table */
        struct list_head    gnc_schedlist;      /* schedule (on gnd_?_conns) for attention */
@@ -632,6 +718,8 @@ typedef struct kgn_conn {
        int                 gnc_peer_error;     /* errno peer sent us on CLOSE */
        kgn_conn_state_t    gnc_state;          /* connection state */
        int                 gnc_scheduled;      /* being attented to */
+       char                gnc_sched_caller[30]; /* what function last called schedule */
+       int                 gnc_sched_line;     /* what line # last called schedule */
        atomic_t            gnc_refcount;       /* # users */
        spinlock_t          gnc_list_lock;      /* serialise tx lists, max_rx_age */
        gni_ep_handle_t     gnc_ephandle;       /* GNI endpoint */
@@ -644,6 +732,9 @@ typedef struct kgn_conn {
        int                 gnc_mbox_id;        /* id of mbox in fma_blk                 */
        short               gnc_needs_detach;   /* flag set in detach_purgatory_all_locked so reaper will clear out purgatory */
        short               gnc_needs_closing;  /* flag set in del_conns when called from kgnilnd_del_peer_or_conn */
+       atomic_t            gnc_tx_in_use;      /* # of tx's currently in use by another thread use kgnilnd_peer_conn_lock */
+       kgn_dgram_type_t    gnc_dgram_type;     /* save dgram type used to establish this conn */
+       void               *remote_mbox_addr;   /* save remote mbox address */
 } kgn_conn_t;
 
 typedef struct kgn_mdd_purgatory {
@@ -669,6 +760,9 @@ typedef struct kgn_peer {
        unsigned long       gnp_reconnect_time;         /* CURRENT_SECONDS when reconnect OK */
        unsigned long       gnp_reconnect_interval;     /* exponential backoff */
        atomic_t            gnp_dirty_eps;              /* # of old but yet to be destroyed EPs from conns */
+       int                 gnp_down;                   /* rca says peer down */
+       unsigned long       gnp_down_event_time;        /* time peer down */
+       unsigned long       gnp_up_event_time;          /* time peer back up */
 } kgn_peer_t;
 
 /* the kgn_rx_t is a struct for handing to LNET as the private pointer for things
@@ -690,6 +784,8 @@ typedef struct kgn_data {
        int                     kgn_nresets;          /* number of stack resets */
        int                     kgn_in_reset;         /* are we in stack reset ? */
 
+       __u64                   kgn_nid_trans_private;/* private data for each of the HW nid2nic arenas */
+
        kgn_device_t            kgn_devices[GNILND_MAXDEVS]; /* device/ptag/cq etc */
        int                     kgn_ndevs;            /* # devices */
 
@@ -747,7 +843,11 @@ typedef struct kgn_data {
        atomic_t                kgn_npending_unlink;  /* # of peers pending unlink */
        atomic_t                kgn_npending_conns;   /* # of conns with pending closes */
        atomic_t                kgn_npending_detach;  /* # of conns with a pending detach */
-
+       unsigned long           kgn_last_scheduled;   /* last time schedule was called in a sched thread */
+       unsigned long           kgn_last_condresched; /* last time cond_resched was called in a sched thread */
+       atomic_t                kgn_rev_offset;       /* number of time REV rdma have been misaligned offsets */
+       atomic_t                kgn_rev_length;       /* Number of times REV rdma have been misaligned lengths */
+       atomic_t                kgn_rev_copy_buff;    /* Number of times REV rdma have had to make a copy buffer */
 } kgn_data_t;
 
 extern kgn_data_t         kgnilnd_data;
@@ -755,7 +855,18 @@ extern kgn_tunables_t     kgnilnd_tunables;
 
 extern void kgnilnd_destroy_peer(kgn_peer_t *peer);
 extern void kgnilnd_destroy_conn(kgn_conn_t *conn);
-extern void kgnilnd_schedule_conn(kgn_conn_t *conn);
+extern int _kgnilnd_schedule_conn(kgn_conn_t *conn, const char *caller, int line, int refheld);
+
+/* Macro wrapper for _kgnilnd_schedule_conn. This will store the function
+ * and the line of the calling function to allow us to debug problematic
+ * schedule calls in the future without the programmer having to mark
+ * the location manually.
+ */
+#define kgnilnd_schedule_conn(conn)                                    \
+       _kgnilnd_schedule_conn(conn, __func__, __LINE__, 0)
+
+#define kgnilnd_schedule_conn_refheld(conn, refheld)                           \
+       _kgnilnd_schedule_conn(conn, __func__, __LINE__, refheld)
 
 static inline int
 kgnilnd_thread_start(int(*fn)(void *arg), void *arg, char *name, int id)
@@ -985,7 +1096,7 @@ do {
        LASSERTF(val >= 0, "peer %p refcount %d\n", peer, val);                 \
        CDEBUG(D_NETTRACE, "peer %p->%s--(%d)\n", peer,                         \
               libcfs_nid2str(peer->gnp_nid), val);                             \
-       if (atomic_read(&peer->gnp_refcount) == 0)                              \
+       if (val == 0)                                                           \
                kgnilnd_destroy_peer(peer);                                     \
 } while(0)
 
@@ -995,7 +1106,8 @@ do {                                                                    \
                                                                        \
        smp_wmb();                                                      \
        val = atomic_inc_return(&conn->gnc_refcount);                   \
-       LASSERTF(val >= 0, "conn %p refc %d to %s\n",                   \
+       LASSERTF(val > 1 && conn->gnc_magic == GNILND_CONN_MAGIC,       \
+               "conn %p refc %d to %s\n",                              \
                conn, val,                                              \
                conn->gnc_peer                                          \
                        ? libcfs_nid2str(conn->gnc_peer->gnp_nid)       \
@@ -1074,12 +1186,12 @@ do {                                                                    \
                        : "<?>",                                        \
                val);                                                   \
        smp_rmb();                                                      \
-       if ((atomic_read(&conn->gnc_refcount) == 1) &&                  \
+       if ((val == 1) &&                                               \
            (conn->gnc_ephandle != NULL) &&                             \
            (conn->gnc_state != GNILND_CONN_DESTROY_EP)) {              \
                set_mb(conn->gnc_state, GNILND_CONN_DESTROY_EP);        \
                kgnilnd_schedule_conn(conn);                            \
-       } else if (atomic_read(&conn->gnc_refcount) == 0) {             \
+       } else if (val == 0) {                                          \
                kgnilnd_destroy_conn(conn);                             \
        }                                                               \
 } while (0)
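
Editor's note on the refcount hunks above: they replace a second atomic_read() with the value already captured in val. The point, assuming val holds the result of the atomic decrement (that line is outside the context shown here), is that each caller then acts on the count it produced itself, so exactly one caller can observe zero. A minimal user-space sketch with C11 atomics rather than the kernel's atomic_t follows; demo_obj, demo_decref and demo_destroy are hypothetical names, not gnilnd symbols.

```c
#include <stdatomic.h>

/* Hypothetical refcounted object; stands in for kgn_peer_t / kgn_conn_t. */
struct demo_obj {
        atomic_int refcount;
        int        destroyed;   /* counts calls to the destructor */
};

static void demo_destroy(struct demo_obj *obj)
{
        obj->destroyed++;
}

/* Drop one reference and act on the value this caller produced.
 * atomic_fetch_sub returns the old value, so old - 1 is the count
 * immediately after our own decrement. Re-reading the counter later
 * could instead observe a value changed by another thread. */
static int demo_decref(struct demo_obj *obj)
{
        int val = atomic_fetch_sub(&obj->refcount, 1) - 1;

        if (val == 0)
                demo_destroy(obj);
        return val;
}
```

With two references held, the first demo_decref() returns 1 and leaves the object alone; the second returns 0 and runs the destructor once.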
@@ -1128,7 +1240,7 @@ kgnilnd_conn_clean_errno(int errno)
 {
        /*  - ESHUTDOWN - LND is unloading
         *  - EUCLEAN - admin requested via "lctl del_peer"
-        *  - ENETRESET - admin requested via "lctl disconnect"
+        *  - ENETRESET - admin requested via "lctl disconnect" or rca event
         *  - ENOTRECOVERABLE - stack reset
         *  - EISCONN - cleared via "lctl push"
         *  not doing ESTALE - that isn't clean */
@@ -1434,6 +1546,7 @@ kgnilnd_validate_tx_ev_id(kgn_tx_ev_id_t *ev_id, kgn_tx_t **txp, kgn_conn_t **co
        }
        /* just insurance */
        kgnilnd_conn_addref(conn);
+       kgnilnd_admin_addref(conn->gnc_tx_in_use);
        read_unlock(&kgnilnd_data.kgn_peer_conn_lock);
 
        /* we know this is safe - as the TX won't be reused until AFTER
@@ -1448,6 +1561,7 @@ kgnilnd_validate_tx_ev_id(kgn_tx_ev_id_t *ev_id, kgn_tx_t **txp, kgn_conn_t **co
         * lctl disconnect or del_peer. */
        if (tx == NULL) {
                CNETERR("txe_idx %d is gone, ignoring event\n", ev_id->txe_idx);
+               kgnilnd_admin_decref(conn->gnc_tx_in_use);
                kgnilnd_conn_decref(conn);
                return;
        }
@@ -1584,8 +1698,8 @@ kgn_tx_t *kgnilnd_new_tx_msg(int type, lnet_nid_t source);
 void kgnilnd_tx_done(kgn_tx_t *tx, int completion);
 void kgnilnd_txlist_done(struct list_head *txlist, int error);
 void kgnilnd_unlink_peer_locked(kgn_peer_t *peer);
-void kgnilnd_schedule_conn(kgn_conn_t *conn);
-void kgnilnd_schedule_process_conn(kgn_conn_t *conn, int sched_intent);
+int _kgnilnd_schedule_conn(kgn_conn_t *conn, const char *caller, int line, int refheld);
+int kgnilnd_schedule_process_conn(kgn_conn_t *conn, int sched_intent);
 
 void kgnilnd_schedule_dgram(kgn_device_t *dev);
 int kgnilnd_create_peer_safe(kgn_peer_t **peerp, lnet_nid_t nid, kgn_net_t *net);
@@ -1608,6 +1722,7 @@ void kgnilnd_schedule_device_timer(unsigned long arg);
 int kgnilnd_reaper(void *arg);
 int kgnilnd_scheduler(void *arg);
 int kgnilnd_dgram_mover(void *arg);
+int kgnilnd_rca(void *arg);
 
 int kgnilnd_create_conn(kgn_conn_t **connp, kgn_device_t *dev);
 int kgnilnd_conn_isdup_locked(kgn_peer_t *peer, kgn_conn_t *newconn);
@@ -1625,6 +1740,9 @@ void kgnilnd_complete_closed_conn(kgn_conn_t *conn);
 void kgnilnd_destroy_conn_ep(kgn_conn_t *conn);
 
 int kgnilnd_close_peer_conns_locked(kgn_peer_t *peer, int why);
+int kgnilnd_report_node_state(lnet_nid_t nid, int down);
+void kgnilnd_wakeup_rca_thread(void);
+int kgnilnd_start_rca_thread(void);
 
 int kgnilnd_tunables_init(void);
 void kgnilnd_tunables_fini(void);
@@ -1699,6 +1817,13 @@ kgnilnd_msgtype2str(int type)
                DO_TYPE(GNILND_MSG_GET_NAK);
                DO_TYPE(GNILND_MSG_GET_DONE);
                DO_TYPE(GNILND_MSG_CLOSE);
+               DO_TYPE(GNILND_MSG_PUT_REQ_REV);
+               DO_TYPE(GNILND_MSG_PUT_DONE_REV);
+               DO_TYPE(GNILND_MSG_PUT_NAK_REV);
+               DO_TYPE(GNILND_MSG_GET_REQ_REV);
+               DO_TYPE(GNILND_MSG_GET_ACK_REV);
+               DO_TYPE(GNILND_MSG_GET_DONE_REV);
+               DO_TYPE(GNILND_MSG_GET_NAK_REV);
        }
        return "<unknown msg type>";
 }
@@ -1781,10 +1906,33 @@ kgnilnd_dgram_type2str(kgn_dgram_t *dgram)
        return "<?type?>";
 }
 
+static inline const char *
+kgnilnd_conn_dgram_type2str(kgn_dgram_type_t type)
+{
+       switch (type) {
+               DO_TYPE(GNILND_DGRAM_REQ);
+               DO_TYPE(GNILND_DGRAM_WC_REQ);
+               DO_TYPE(GNILND_DGRAM_NAK);
+               DO_TYPE(GNILND_DGRAM_CLOSE);
+       }
+       return "<?type?>";
+}
 
 #undef DO_TYPE
 
 /* API wrapper functions - include late to pick up all of the other defines */
 #include "gnilnd_api_wrap.h"
 
+/* pulls in tunables per platform and adds in nid/nic conversion
+ * if RCA wasn't available at build time */
+#include "gnilnd_hss_ops.h"
+
+#if defined(CONFIG_CRAY_GEMINI)
+ #include "gnilnd_gemini.h"
+#elif defined(CONFIG_CRAY_ARIES)
+ #include "gnilnd_aries.h"
+#else
+ #error "Undefined Network Hardware Type"
+#endif
+
 #endif /* _GNILND_GNILND_H_ */
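
Editor's note on the kgnilnd_schedule_conn() wrapper in this header: the macro passes __func__ and __LINE__ to the worker so the last caller is recorded on the connection. The pattern can be sketched in user space as below. This is an illustration under stated assumptions, not the gnilnd code: demo_conn, demo_schedule_conn_ and demo_caller are hypothetical names, and the sketch copies the caller name with snprintf() so the copy is bounded and NUL-terminated.

```c
#include <stdio.h>
#include <string.h>

#define SCHED_CALLER_LEN 30     /* mirrors gnc_sched_caller[30] in the patch */

/* Hypothetical stand-in for kgn_conn_t; only the debug fields are kept. */
struct demo_conn {
        char sched_caller[SCHED_CALLER_LEN];
        int  sched_line;
};

/* Worker: records who asked for scheduling. snprintf copies at most
 * SCHED_CALLER_LEN - 1 bytes and always NUL-terminates, so a short
 * function name is never over-read and a long one is truncated. */
static int demo_schedule_conn_(struct demo_conn *conn, const char *caller,
                               int line)
{
        snprintf(conn->sched_caller, sizeof(conn->sched_caller), "%s", caller);
        conn->sched_line = line;
        return 1;
}

/* No trailing semicolon in the macro body: the macro then behaves like
 * a function call in "if (...) demo_schedule_conn(c); else ..." and
 * can be used as an expression, for example after "return". */
#define demo_schedule_conn(conn) \
        demo_schedule_conn_((conn), __func__, __LINE__)

/* Example call site: __func__ expands to "demo_caller" here. */
static int demo_caller(struct demo_conn *conn)
{
        return demo_schedule_conn(conn);
}
```

After demo_caller() runs, sched_caller holds the string "demo_caller" and sched_line holds the line of the macro invocation, which is the information the patch logs through CDEBUG.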
index e7ba9ab..2a38432 100644
 #define CFS_FAIL_GNI_GNP_CONNECTING1   0xf046
 #define CFS_FAIL_GNI_GNP_CONNECTING2   0xf047
 #define CFS_FAIL_GNI_GNP_CONNECTING3   0xf048
+#define CFS_FAIL_GNI_SCHEDULE_COMPLETE 0xf049
 #define CFS_FAIL_GNI_PUT_ACK_AGAIN     0xf050
 #define CFS_FAIL_GNI_GET_REQ_AGAIN     0xf051
+#define CFS_FAIL_GNI_SCHED_DEADLINE    0xf052
+#define CFS_FAIL_GNI_DGRAM_DEADLINE    0xf053
+
 
 /* helper macros */
 extern void
diff --git a/lnet/klnds/gnilnd/gnilnd_aries.h b/lnet/klnds/gnilnd/gnilnd_aries.h
new file mode 100644
index 0000000..890c709
--- /dev/null
@@ -0,0 +1,115 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2009-2012 Cray, Inc.
+ *   Author: Nic Henke <nic@cray.com>, James Shimek <jshimek@cray.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+#ifndef _GNILND_ARIES_H
+#define _GNILND_ARIES_H
+
+/* for libcfs_ipif_query */
+#include <libcfs/libcfs.h>
+
+#ifndef _GNILND_HSS_OPS_H
+# error "must include gnilnd_hss_ops.h first"
+#endif
+
+/* Set HW related values */
+#include <aries/aries_timeouts_gpl.h>
+
+#define GNILND_BASE_TIMEOUT        TIMEOUT_SECS(TO_GNILND_timeout)
+#define GNILND_CHECKSUM_DEFAULT    0            /* all off for Aries */
+
+#if defined(CONFIG_CRAY_COMPUTE)
+#define GNILND_REVERSE_RDMA        GNILND_REVERSE_PUT
+#define GNILND_RDMA_DLVR_OPTION    GNI_DLVMODE_PERFORMANCE
+#else
+#define GNILND_REVERSE_RDMA        GNILND_REVERSE_GET
+#define GNILND_RDMA_DLVR_OPTION    GNI_DLVMODE_PERFORMANCE
+#endif
+
+/* plug in our functions for use on the simulator */
+#if !defined(GNILND_USE_RCA)
+
+extern kgn_data_t kgnilnd_data;
+
+#define kgnilnd_hw_hb()              do {} while(0)
+
+/* Aries Sim doesn't have hardcoded tables, so we'll hijack the nic_pe
+ * and decode our address and nic addr from that - the rest are just offsets */
+
+static inline int
+kgnilnd_nid_to_nicaddrs(__u32 nid, int numnic, __u32 *nicaddr)
+{
+       if (numnic > 1) {
+               CERROR("manual nid2nic translation doesn't support "
+                      "multiple nic addrs (you asked for %d)\n",
+                       numnic);
+               return -EINVAL;
+       }
+       if (nid < kgnilnd_data.kgn_nid_trans_private) {
+               CERROR("Request for invalid nid translation %u, minimum %Lu\n",
+                      nid, kgnilnd_data.kgn_nid_trans_private);
+               return -ESRCH;
+       }
+
+       *nicaddr = nid - kgnilnd_data.kgn_nid_trans_private;
+
+       CDEBUG(D_NETTRACE, "Sim nid %d -> nic 0x%x\n", nid, *nicaddr);
+
+       return 1;
+}
+
+static inline int
+kgnilnd_nicaddr_to_nid(__u32 nicaddr, __u32 *nid)
+{
+       *nid = kgnilnd_data.kgn_nid_trans_private + nicaddr;
+       return 1;
+}
+
+/* XXX Nic: This does not support multiple device!!!! */
+static inline int
+kgnilnd_setup_nic_translation(__u32 device_id)
+{
+       char              *if_name = "ipogif0";
+       __u32              ipaddr, netmask, my_nid;
+       int                up, rc;
+
+       LCONSOLE_INFO("using Aries SIM IP info for RCA translation\n");
+
+       rc = libcfs_ipif_query(if_name, &up, &ipaddr, &netmask);
+       if (rc != 0) {
+               CERROR ("can't get IP interface for %s: %d\n", if_name, rc);
+               return rc;
+       }
+       if (!up) {
+               CERROR ("IP interface %s is down\n", if_name);
+               return -ENODEV;
+       }
+
+       my_nid = ((ipaddr >> 8) & 0xFF) + (ipaddr & 0xFF);
+
+       kgnilnd_data.kgn_nid_trans_private = my_nid - device_id;
+
+       return 0;
+}
+
+#endif /* GNILND_USE_RCA */
+
+#endif /* _GNILND_ARIES_H */
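
Editor's note on the simulator translation in gnilnd_aries.h: without RCA, a nid and a NIC address differ only by a fixed base stored in kgn_nid_trans_private. The arithmetic can be sketched in user space as follows. The demo_ names are hypothetical, and the sketch assumes the IPv4 address is supplied in host byte order, which the patch does not state.

```c
#include <errno.h>
#include <stdint.h>

/* Hypothetical stand-in for kgnilnd_data.kgn_nid_trans_private. */
static uint64_t demo_nid_base;

/* Mirrors the octet arithmetic in kgnilnd_setup_nic_translation():
 * base = (third octet + fourth octet of the address) - device_id. */
static void demo_setup_translation(uint32_t ipaddr, uint32_t device_id)
{
        uint32_t my_nid = ((ipaddr >> 8) & 0xFF) + (ipaddr & 0xFF);

        demo_nid_base = my_nid - device_id;
}

/* nid -> NIC address: subtract the base, rejecting nids below it. */
static int demo_nid_to_nicaddr(uint32_t nid, uint32_t *nicaddr)
{
        if (nid < demo_nid_base)
                return -ESRCH;
        *nicaddr = (uint32_t)(nid - demo_nid_base);
        return 1;       /* number of NIC addresses written */
}

/* NIC address -> nid: add the base back. */
static int demo_nicaddr_to_nid(uint32_t nicaddr, uint32_t *nid)
{
        *nid = (uint32_t)(demo_nid_base + nicaddr);
        return 1;
}
```

For address 10.128.0.5 and device_id 2 the base is 3, so nid 5 maps to NIC address 2 and back again, while nid 1 is rejected with -ESRCH.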
index 56be88a..381aa64 100644
@@ -117,8 +117,11 @@ kgnilnd_device_callback(__u32 devid, __u64 arg)
  * < 0 : do not reschedule under any circumstances
  * == 0: reschedule if someone marked him WANTS_SCHED
  * > 0 : force a reschedule */
+/* Return code 0 means it did not schedule the conn, 1
+ *  means it successfully scheduled the conn.
+ */
 
-void
+int
 kgnilnd_schedule_process_conn(kgn_conn_t *conn, int sched_intent)
 {
        int     conn_sched;
@@ -136,19 +139,28 @@ kgnilnd_schedule_process_conn(kgn_conn_t *conn, int sched_intent)
 
        if (sched_intent >= 0) {
                if ((sched_intent > 0 || (conn_sched == GNILND_CONN_WANTS_SCHED))) {
-                       kgnilnd_schedule_conn(conn);
+                       return kgnilnd_schedule_conn_refheld(conn, 1);
                }
        }
+       return 0;
 }
 
-void
-kgnilnd_schedule_conn(kgn_conn_t *conn)
+/* Return of 0 for conn not scheduled, 1 returned if conn was scheduled or marked
+ * as scheduled */
+
+int
+_kgnilnd_schedule_conn(kgn_conn_t *conn, const char *caller, int line, int refheld)
 {
        kgn_device_t        *dev = conn->gnc_device;
        int                  sched;
+       int                  rc;
 
        sched = xchg(&conn->gnc_scheduled, GNILND_CONN_WANTS_SCHED);
-
+       /* we only care about the last person who marked want_sched since they
+        * are most likely the culprit
+        */
+       strlcpy(conn->gnc_sched_caller, caller, sizeof(conn->gnc_sched_caller));
+       conn->gnc_sched_line = line;
        /* if we are IDLE, add to list - only one guy sees IDLE and "wins"
         * the chance to put it onto gnd_ready_conns.
         * otherwise, leave marked as WANTS_SCHED and the thread that "owns"
@@ -158,25 +170,33 @@ kgnilnd_schedule_conn(kgn_conn_t *conn)
        if (sched == GNILND_CONN_IDLE) {
                /* if the conn is already scheduled, we've already requested
                 * the scheduler thread wakeup */
-               kgnilnd_conn_addref(conn);       /* +1 ref for scheduler */
-
+               if (!refheld) {
+                       /* Add a reference to the conn if we are not holding a reference
+                        * already from the existing scheduler. We now use the same
+                        * reference if we need to reschedule a conn while in a scheduler
+                        * thread.
+                        */
+                       kgnilnd_conn_addref(conn);
+               }
                LASSERTF(list_empty(&conn->gnc_schedlist), "conn %p already sched state %d\n",
                         conn, sched);
 
-               CDEBUG(D_INFO, "scheduling conn 0x%p\n", conn);
+               CDEBUG(D_INFO, "scheduling conn 0x%p caller %s:%d\n", conn, caller, line);
 
                spin_lock(&dev->gnd_lock);
                list_add_tail(&conn->gnc_schedlist, &dev->gnd_ready_conns);
                spin_unlock(&dev->gnd_lock);
                set_mb(conn->gnc_last_sched_ask, jiffies);
-
+               rc = 1;
        } else {
-               CDEBUG(D_INFO, "not scheduling conn 0x%p: %d\n", conn, sched);
+               CDEBUG(D_INFO, "not scheduling conn 0x%p: %d caller %s:%d\n", conn, sched, caller, line);
+               rc = 0;
        }
 
        /* make sure thread(s) going to process conns - but let it make
         * separate decision from conn schedule */
        kgnilnd_schedule_device(dev);
+       return rc;
 }
 
 void
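The hunk above makes kgnilnd_schedule_conn() report whether it queued the conn, and take a new reference only when the caller does not already hold one. A minimal user-space sketch of that pattern follows, assuming C11 atomic_exchange() in place of the kernel's xchg(). The names demo_conn, demo_schedule_conn and the DEMO_* states are hypothetical stand-ins for illustration, not the gnilnd definitions.

```c
#include <assert.h>
#include <stdatomic.h>

/* Hypothetical stand-ins for the gnilnd scheduling states. */
enum demo_state { DEMO_IDLE = 0, DEMO_WANTS_SCHED = 1, DEMO_PROCESS = 2 };

struct demo_conn {
	atomic_int scheduled; /* models conn->gnc_scheduled */
	int        refcount;  /* models the conn refcount (not atomic here) */
	int        on_ready;  /* 1 once placed on the ready list */
};

/* Returns 1 if this call queued the conn, 0 if someone else owns it. */
static int demo_schedule_conn(struct demo_conn *conn, int refheld)
{
	int prev;

	assert(conn);

	/* Always mark WANTS_SCHED; only the caller that saw IDLE "wins". */
	prev = atomic_exchange(&conn->scheduled, DEMO_WANTS_SCHED);
	if (prev != DEMO_IDLE)
		return 0;

	/* Reuse the caller's reference when it already holds one. */
	if (!refheld)
		conn->refcount++;

	conn->on_ready = 1;
	return 1;
}
```

In this sketch a second call made before the scheduler resets the state returns 0 and leaves the reference count untouched, which is the behaviour the new return value makes visible to callers.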
@@ -212,6 +232,13 @@ kgnilnd_free_tx(kgn_tx_t *tx)
                CDEBUG(D_MALLOC, "slab-freed 'tx_phys': %lu at %p.\n",
                       LNET_MAX_IOV * sizeof(gni_mem_segment_t), tx->tx_phys);
        }
+
+       /* Only free the buffer if we used it */
+       if (tx->tx_buffer_copy != NULL) {
+               vfree(tx->tx_buffer_copy);
+               tx->tx_buffer_copy = NULL;
+               CDEBUG(D_MALLOC, "vfreed buffer2\n");
+       }
 #if 0
        KGNILND_POISON(tx, 0x5a, sizeof(kgn_tx_t));
 #endif
@@ -221,9 +248,9 @@ kgnilnd_free_tx(kgn_tx_t *tx)
 }
 
 kgn_tx_t *
-kgnilnd_alloc_tx(void)
+kgnilnd_alloc_tx (void)
 {
-       kgn_tx_t      *tx = NULL;
+       kgn_tx_t        *tx = NULL;
 
        if (CFS_FAIL_CHECK(CFS_FAIL_GNI_ALLOC_TX))
                return tx;
@@ -416,14 +443,40 @@ kgnilnd_new_tx_msg(int type, lnet_nid_t source)
 }
 
 static void
-kgnilnd_nak_rdma(kgn_conn_t *conn, int type, int error, __u64 cookie, lnet_nid_t source) {
+kgnilnd_nak_rdma(kgn_conn_t *conn, int rx_type, int error, __u64 cookie, lnet_nid_t source) {
        kgn_tx_t        *tx;
 
+       int             nak_type;
+
+       switch (rx_type) {
+       case GNILND_MSG_GET_REQ:
+       case GNILND_MSG_GET_DONE:
+               nak_type = GNILND_MSG_GET_NAK;
+               break;
+       case GNILND_MSG_PUT_REQ:
+       case GNILND_MSG_PUT_ACK:
+       case GNILND_MSG_PUT_DONE:
+               nak_type = GNILND_MSG_PUT_NAK;
+               break;
+       case GNILND_MSG_PUT_REQ_REV:
+       case GNILND_MSG_PUT_DONE_REV:
+               nak_type = GNILND_MSG_PUT_NAK_REV;
+               break;
+       case GNILND_MSG_GET_REQ_REV:
+       case GNILND_MSG_GET_ACK_REV:
+       case GNILND_MSG_GET_DONE_REV:
+               nak_type = GNILND_MSG_GET_NAK_REV;
+               break;
+       default:
+               CERROR("invalid msg type %s (%d)\n",
+                       kgnilnd_msgtype2str(rx_type), rx_type);
+               LBUG();
+       }
        /* only allow NAK on error and truncate to zero */
        LASSERTF(error <= 0, "error %d conn 0x%p, cookie "LPU64"\n",
                 error, conn, cookie);
 
-       tx = kgnilnd_new_tx_msg(type, source);
+       tx = kgnilnd_new_tx_msg(nak_type, source);
        if (tx == NULL) {
                CNETERR("can't get TX to NAK RDMA to %s\n",
                        libcfs_nid2str(conn->gnc_peer->gnp_nid));
@@ -577,8 +630,8 @@ kgnilnd_setup_phys_buffer(kgn_tx_t *tx, int nkiov, lnet_kiov_t *kiov,
                          unsigned int offset, unsigned int nob)
 {
        gni_mem_segment_t *phys;
-       int                rc = 0;
-       unsigned int       fraglen;
+       int             rc = 0;
+       unsigned int    fraglen;
 
        GNIDBG_TX(D_NET, tx, "niov %d kiov 0x%p offset %u nob %u", nkiov, kiov, offset, nob);
 
@@ -588,7 +641,7 @@ kgnilnd_setup_phys_buffer(kgn_tx_t *tx, int nkiov, lnet_kiov_t *kiov,
 
        /* only allocate this if we are going to use it */
        tx->tx_phys = cfs_mem_cache_alloc(kgnilnd_data.kgn_tx_phys_cache,
-                                         CFS_ALLOC_ATOMIC);
+                                             CFS_ALLOC_ATOMIC);
        if (tx->tx_phys == NULL) {
                CERROR("failed to allocate tx_phys\n");
                rc = -ENOMEM;
@@ -691,7 +744,9 @@ kgnilnd_setup_rdma_buffer(kgn_tx_t *tx, unsigned int niov,
 {
        int     rc;
 
-       LASSERT((iov == NULL) != (kiov == NULL));
+       LASSERTF((iov == NULL) != (kiov == NULL), "iov 0x%p, kiov 0x%p, tx 0x%p,"
+                                               " offset %d, nob %d, niov %d\n"
+                                               , iov, kiov, tx, offset, nob, niov);
 
        if (kiov != NULL) {
                rc = kgnilnd_setup_phys_buffer(tx, niov, kiov, offset, nob);
@@ -701,9 +756,20 @@ kgnilnd_setup_rdma_buffer(kgn_tx_t *tx, unsigned int niov,
        return rc;
 }
 
+/* kgnilnd_parse_lnet_rdma()
+ * lntmsg - message passed in from lnet.
+ * niov, kiov, offset - see lnd_t in lib-types.h for descriptions.
+ * nob - actual number of bytes in this message.
+ * put_len - It is possible for PUTs to have a different length than the
+ *           length stored in lntmsg->msg_len since LNET can adjust this
+ *           length based on its buffer size and offset.
+ *           lnet_try_match_md() sets the mlength that we use to do the RDMA
+ *           transfer.
+ */
 static void
-kgnilnd_parse_lnet_rdma(lnet_msg_t *lntmsg, unsigned int *niov, unsigned int *offset,
-                       unsigned int *nob, lnet_kiov_t **kiov)
+kgnilnd_parse_lnet_rdma(lnet_msg_t *lntmsg, unsigned int *niov,
+                       unsigned int *offset, unsigned int *nob,
+                       lnet_kiov_t **kiov, int put_len)
 {
        /* GETs are weird, see kgnilnd_send */
        if (lntmsg->msg_type == LNET_MSG_GET) {
@@ -718,13 +784,13 @@ kgnilnd_parse_lnet_rdma(lnet_msg_t *lntmsg, unsigned int *niov, unsigned int *of
        } else {
                *kiov = lntmsg->msg_kiov;
                *niov = lntmsg->msg_niov;
-               *nob = lntmsg->msg_len;
+               *nob = put_len;
                *offset = lntmsg->msg_offset;
        }
 }
 
 static inline void
-kgnilnd_compute_rdma_cksum(kgn_tx_t *tx)
+kgnilnd_compute_rdma_cksum(kgn_tx_t *tx, int put_len)
 {
        unsigned int     niov, offset, nob;
        lnet_kiov_t     *kiov;
@@ -732,10 +798,18 @@ kgnilnd_compute_rdma_cksum(kgn_tx_t *tx)
        int              dump_cksum = (*kgnilnd_tunables.kgn_checksum_dump > 1);
 
        GNITX_ASSERTF(tx, ((tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE) ||
-                          (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE)),
+                          (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE) ||
+                          (tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE_REV) ||
+                          (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV) ||
+                          (tx->tx_msg.gnm_type == GNILND_MSG_GET_ACK_REV) ||
+                          (tx->tx_msg.gnm_type == GNILND_MSG_PUT_REQ_REV)),
                      "bad type %s", kgnilnd_msgtype2str(tx->tx_msg.gnm_type));
 
-
+       if ((tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE_REV) ||
+           (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV)) {
+               tx->tx_msg.gnm_payload_cksum = 0;
+               return;
+       }
        if (*kgnilnd_tunables.kgn_checksum < 3) {
                tx->tx_msg.gnm_payload_cksum = 0;
                return;
@@ -743,7 +817,8 @@ kgnilnd_compute_rdma_cksum(kgn_tx_t *tx)
 
        GNITX_ASSERTF(tx, lntmsg, "no LNet message!", NULL);
 
-       kgnilnd_parse_lnet_rdma(lntmsg, &niov, &offset, &nob, &kiov);
+       kgnilnd_parse_lnet_rdma(lntmsg, &niov, &offset, &nob, &kiov,
+                               put_len);
 
        if (kiov != NULL) {
                tx->tx_msg.gnm_payload_cksum = kgnilnd_cksum_kiov(niov, kiov, offset, nob, dump_cksum);
@@ -759,8 +834,13 @@ kgnilnd_compute_rdma_cksum(kgn_tx_t *tx)
        }
 }
 
+/* kgnilnd_verify_rdma_cksum()
+ * tx - PUT_DONE/GET_DONE matched tx.
+ * rx_cksum - received checksum to compare against.
+ * put_len - see kgnilnd_parse_lnet_rdma comments.
+ */
 static inline int
-kgnilnd_verify_rdma_cksum(kgn_tx_t *tx, __u16 rx_cksum)
+kgnilnd_verify_rdma_cksum(kgn_tx_t *tx, __u16 rx_cksum, int put_len)
 {
        int              rc = 0;
        __u16            cksum;
@@ -771,9 +851,18 @@ kgnilnd_verify_rdma_cksum(kgn_tx_t *tx, __u16 rx_cksum)
 
        /* we can only match certain requests */
        GNITX_ASSERTF(tx, ((tx->tx_msg.gnm_type == GNILND_MSG_GET_REQ) ||
-                          (tx->tx_msg.gnm_type == GNILND_MSG_PUT_ACK)),
+                          (tx->tx_msg.gnm_type == GNILND_MSG_PUT_ACK) ||
+                          (tx->tx_msg.gnm_type == GNILND_MSG_PUT_REQ_REV) ||
+                          (tx->tx_msg.gnm_type == GNILND_MSG_GET_ACK_REV) ||
+                          (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV) ||
+                          (tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE_REV)),
                      "bad type %s", kgnilnd_msgtype2str(tx->tx_msg.gnm_type));
 
+       if ((tx->tx_msg.gnm_type == GNILND_MSG_PUT_REQ_REV) ||
+           (tx->tx_msg.gnm_type == GNILND_MSG_GET_ACK_REV)) {
+               return 0;
+       }
+
        if (rx_cksum == 0)  {
                if (*kgnilnd_tunables.kgn_checksum >= 3) {
                        GNIDBG_MSG(D_WARNING, &tx->tx_msg,
@@ -784,7 +873,7 @@ kgnilnd_verify_rdma_cksum(kgn_tx_t *tx, __u16 rx_cksum)
 
        GNITX_ASSERTF(tx, lntmsg, "no LNet message!", NULL);
 
-       kgnilnd_parse_lnet_rdma(lntmsg, &niov, &offset, &nob, &kiov);
+       kgnilnd_parse_lnet_rdma(lntmsg, &niov, &offset, &nob, &kiov, put_len);
 
        if (kiov != NULL) {
                cksum = kgnilnd_cksum_kiov(niov, kiov, offset, nob, 0);
@@ -1001,14 +1090,17 @@ kgnilnd_map_buffer(kgn_tx_t *tx)
 void
 kgnilnd_add_purgatory_tx(kgn_tx_t *tx)
 {
-       kgn_conn_t                  *conn = tx->tx_conn;
-       kgn_mdd_purgatory_t         *gmp;
+       kgn_conn_t              *conn = tx->tx_conn;
+       kgn_mdd_purgatory_t     *gmp;
 
        LIBCFS_ALLOC(gmp, sizeof(*gmp));
        LASSERTF(gmp != NULL, "couldn't allocate MDD purgatory member;"
                " asserting to avoid data corruption\n");
-
+       if (tx->tx_buffer_copy)
+               gmp->gmp_map_key = tx->tx_buffer_copy_map_key;
+       else
        gmp->gmp_map_key = tx->tx_map_key;
+
        atomic_inc(&conn->gnc_device->gnd_n_mdd_held);
 
        /* ensure that we don't have a blank purgatory - indicating the
@@ -1079,10 +1171,15 @@ kgnilnd_unmap_buffer(kgn_tx_t *tx, int error)
                                 tx->tx_conn->gnc_device, hold_timeout,
                                 tx->tx_map_key.qword1, tx->tx_map_key.qword2);
                }
-
+               if (tx->tx_buffer_copy != NULL) {
+                       rrc = kgnilnd_mem_deregister(dev->gnd_handle, &tx->tx_buffer_copy_map_key, hold_timeout);
+                       LASSERTF(rrc == GNI_RC_SUCCESS, "rrc %d\n", rrc);
+                       rrc = kgnilnd_mem_deregister(dev->gnd_handle, &tx->tx_map_key, 0);
+                       LASSERTF(rrc == GNI_RC_SUCCESS, "rrc %d\n", rrc);
+               } else {
                rrc = kgnilnd_mem_deregister(dev->gnd_handle, &tx->tx_map_key, hold_timeout);
-
                LASSERTF(rrc == GNI_RC_SUCCESS, "rrc %d\n", rrc);
+               }
 
                tx->tx_buftype--;
                kgnilnd_mem_del_map_list(dev, tx);
@@ -1159,6 +1256,7 @@ kgnilnd_tx_done(kgn_tx_t *tx, int completion)
        /* warning - we should hold no locks here - calling lnet_finalize
         * could free up lnet credits, resulting in a call chain back into
         * the LND via kgnilnd_send and friends */
+
        lnet_finalize(ni, lntmsg0, status0);
 
        if (lntmsg1 != NULL) {
@@ -1627,6 +1725,8 @@ kgnilnd_queue_tx(kgn_conn_t *conn, kgn_tx_t *tx)
        switch (tx->tx_msg.gnm_type) {
        case GNILND_MSG_PUT_ACK:
        case GNILND_MSG_GET_REQ:
+       case GNILND_MSG_PUT_REQ_REV:
+       case GNILND_MSG_GET_ACK_REV:
                /* hijacking time! If this messages will authorize our peer to
                 * send his dirty little bytes in an RDMA, we need to get permission */
                kgnilnd_queue_rdma(conn, tx);
@@ -1638,17 +1738,8 @@ kgnilnd_queue_tx(kgn_conn_t *conn, kgn_tx_t *tx)
                if (rc >= 0) {
                        /* it was sent, break out of switch to avoid default case of queueing */
                        break;
-               } else if (rc == -EAGAIN) {
-                       /* needs to queue to try again, so  fall through to default case */
-               } else {
-                       /* bail: it wasnt sent and we didn't get EAGAIN indicating
-                        * we should retrans - We do not close the conn due to locking
-                        * we let the reaper thread take care of it. There are no hard
-                        * errors from send_msg that would require close to be called
-                        */
-                       kgnilnd_tx_done(tx, rc);
-                       break;
                }
+               /* needs to queue to try again, so fall through to default case */
        case GNILND_MSG_NOOP:
                /* Just make sure this goes out first for this conn */
                add_tail = 0;
@@ -1695,6 +1786,13 @@ kgnilnd_launch_tx(kgn_tx_t *tx, kgn_net_t *net, lnet_process_id_t *target)
                        read_unlock(&kgnilnd_data.kgn_peer_conn_lock);
                        RETURN_EXIT;
                }
+
+               /* don't create a connection if the peer is marked down */
+               if (peer->gnp_down == GNILND_RCA_NODE_DOWN) {
+                       read_unlock(&kgnilnd_data.kgn_peer_conn_lock);
+                       rc = -ENETRESET;
+                       GOTO(no_peer, rc);
+               }
        }
 
        /* creating peer or conn; I'll need a write lock... */
@@ -1748,8 +1846,12 @@ kgnilnd_rdma(kgn_tx_t *tx, int type,
 {
        kgn_conn_t   *conn = tx->tx_conn;
        unsigned long timestamp;
+       gni_post_type_t post_type;
        gni_return_t  rrc;
-
+       int rc = 0;
+       unsigned int desc_nob = nob;
+       void *desc_buffer = tx->tx_buffer;
+       gni_mem_handle_t desc_map_key = tx->tx_map_key;
        LASSERTF(kgnilnd_tx_mapped(tx),
                "unmapped tx %p\n", tx);
        LASSERTF(conn != NULL,
@@ -1761,27 +1863,90 @@ kgnilnd_rdma(kgn_tx_t *tx, int type,
                "nob %d > tx(%p)->tx_nob %d\n",
                nob, tx, tx->tx_nob);
 
+       switch (type) {
+       case GNILND_MSG_GET_DONE:
+       case GNILND_MSG_PUT_DONE:
+               post_type = GNI_POST_RDMA_PUT;
+               break;
+       case GNILND_MSG_GET_DONE_REV:
+       case GNILND_MSG_PUT_DONE_REV:
+               post_type = GNI_POST_RDMA_GET;
+               break;
+       default:
+               CERROR("invalid msg type %s (%d)\n",
+                       kgnilnd_msgtype2str(type), type);
+               LBUG();
+       }
+       if (post_type == GNI_POST_RDMA_GET) {
+               /* Check for remote buffer / local buffer / length alignment. All must be 4 byte
+                * aligned. If the local buffer is not aligned correctly using the copy buffer
+                * will fix that issue. If length is misaligned copy buffer will also fix the issue, we end
+                * up transferring extra bytes into the buffer but only copy the correct nob into the original
+                * buffer.  Remote offset correction is done through a combination of adjusting the offset,
+                * making sure the length and addr are aligned and copying the data into the correct location
+                * once the transfer has completed.
+                */
+               if ((((__u64)((unsigned long)tx->tx_buffer)) & 3) ||
+                     (sink->gnrd_addr & 3) ||
+                     (nob & 3)) {
+
+                       tx->tx_offset = ((__u64)((unsigned long)sink->gnrd_addr)) & 3;
+                       if (tx->tx_offset)
+                               kgnilnd_admin_addref(kgnilnd_data.kgn_rev_offset);
+
+                       if ((nob + tx->tx_offset) & 3) {
+                               desc_nob = ((nob + tx->tx_offset) + (4 - ((nob + tx->tx_offset) & 3)));
+                               kgnilnd_admin_addref(kgnilnd_data.kgn_rev_length);
+                       } else {
+                               desc_nob = (nob + tx->tx_offset);
+                       }
+
+                       if (tx->tx_buffer_copy == NULL) {
+                               /* Allocate the largest copy buffer we will need, this will prevent us from overwriting data
+                                * and require at most we allocate a few extra bytes. */
+                               tx->tx_buffer_copy = vmalloc(desc_nob);
+
+                               if (!tx->tx_buffer_copy) {
+                                       /* allocation of buffer failed nak the rdma */
+                                       kgnilnd_nak_rdma(tx->tx_conn, tx->tx_msg.gnm_type, -EFAULT, cookie, tx->tx_msg.gnm_srcnid);
+                                       kgnilnd_tx_done(tx, -EFAULT);
+                                       return;
+                               }
+                               kgnilnd_admin_addref(kgnilnd_data.kgn_rev_copy_buff);
+                               rc = kgnilnd_mem_register(conn->gnc_device->gnd_handle, (__u64)tx->tx_buffer_copy, desc_nob, NULL, GNI_MEM_READWRITE, &tx->tx_buffer_copy_map_key);
+                               if (rc != GNI_RC_SUCCESS) {
+                                       /* Registration Failed nak rdma and kill the tx. */
+                                       vfree(tx->tx_buffer_copy);
+                                       tx->tx_buffer_copy = NULL;
+                                       kgnilnd_nak_rdma(tx->tx_conn, tx->tx_msg.gnm_type, -EFAULT, cookie, tx->tx_msg.gnm_srcnid);
+                                       kgnilnd_tx_done(tx, -EFAULT);
+                                       return;
+                               }
+                       }
+                       desc_map_key = tx->tx_buffer_copy_map_key;
+                       desc_buffer = tx->tx_buffer_copy;
+               }
+       }
+
        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.post_id = tx->tx_id.txe_cookie;
-       tx->tx_rdma_desc.type = GNI_POST_RDMA_PUT;
+       tx->tx_rdma_desc.type = post_type;
        tx->tx_rdma_desc.cq_mode = GNI_CQMODE_GLOBAL_EVENT;
-       tx->tx_rdma_desc.local_addr = (__u64)((unsigned long)tx->tx_buffer);
-       tx->tx_rdma_desc.local_mem_hndl = tx->tx_map_key;
-       tx->tx_rdma_desc.remote_addr = sink->gnrd_addr;
+       tx->tx_rdma_desc.local_addr = (__u64)((unsigned long)desc_buffer);
+       tx->tx_rdma_desc.local_mem_hndl = desc_map_key;
+       tx->tx_rdma_desc.remote_addr = sink->gnrd_addr - tx->tx_offset;
        tx->tx_rdma_desc.remote_mem_hndl = sink->gnrd_key;
-       tx->tx_rdma_desc.length = nob;
-       if (!*kgnilnd_tunables.kgn_bte_hash)
-               tx->tx_rdma_desc.dlvr_mode |= GNI_DLVMODE_NO_HASH;
-       if (!*kgnilnd_tunables.kgn_bte_adapt)
-               tx->tx_rdma_desc.dlvr_mode |= (GNI_DLVMODE_NO_ADAPT | GNI_DLVMODE_NO_RADAPT);
-
+       tx->tx_rdma_desc.length = desc_nob;
+       tx->tx_nob_rdma = nob;
+       if (*kgnilnd_tunables.kgn_bte_dlvr_mode)
+               tx->tx_rdma_desc.dlvr_mode = *kgnilnd_tunables.kgn_bte_dlvr_mode;
        /* prep final completion message */
        kgnilnd_init_msg(&tx->tx_msg, type, tx->tx_msg.gnm_srcnid);
        tx->tx_msg.gnm_u.completion.gncm_cookie = cookie;
        /* send actual size RDMA'd in retval */
        tx->tx_msg.gnm_u.completion.gncm_retval = nob;
 
-       kgnilnd_compute_rdma_cksum(tx);
+       kgnilnd_compute_rdma_cksum(tx, nob);
 
        if (nob == 0) {
                kgnilnd_queue_tx(conn, tx);
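The reverse-RDMA (GNI_POST_RDMA_GET) path in the hunk above requires the local buffer, remote address and length to be 4-byte aligned, and falls back to a copy buffer when any of them is not. The arithmetic can be sketched in isolation as below. This is an illustration under stated assumptions, not the driver code: rev_get_plan and plan_rev_get are hypothetical names, and the round-up is written as a mask, which yields the same value as the patch's `+ (4 - (x & 3))` form.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical result type; gnilnd keeps these values in the tx itself. */
struct rev_get_plan {
	int          need_copy; /* 1 if a copy buffer is required */
	unsigned int offset;    /* bytes the remote address is backed off */
	unsigned int desc_nob;  /* length posted in the RDMA descriptor */
	uint64_t     remote;    /* remote address posted in the descriptor */
};

static struct rev_get_plan
plan_rev_get(uint64_t local_addr, uint64_t remote_addr, unsigned int nob)
{
	struct rev_get_plan p = { 0, 0, nob, remote_addr };

	/* Any misalignment of local buffer, remote address or length
	 * forces the transfer through an aligned copy buffer. */
	if ((local_addr & 3) || (remote_addr & 3) || (nob & 3)) {
		p.need_copy = 1;
		/* Back the remote address off to a 4-byte boundary. */
		p.offset = (unsigned int)(remote_addr & 3);
		p.remote = remote_addr - p.offset;
		/* Round (nob + offset) up to a multiple of 4. */
		p.desc_nob = (nob + p.offset + 3) & ~3u;
		assert((p.remote & 3) == 0 && (p.desc_nob & 3) == 0);
	}
	return p;
}
```

For example, a 10-byte GET from a remote address ending in 0x2 is posted as a 12-byte transfer starting 2 bytes earlier. Per the patch comment, only the original nob bytes are then copied from the copy buffer into the real destination.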
@@ -1792,8 +1957,8 @@ kgnilnd_rdma(kgn_tx_t *tx, int type,
        LASSERTF(!conn->gnc_close_sent, "tx %p on conn %p after close sent %d\n",
                 tx, conn, conn->gnc_close_sent);
 
-       GNIDBG_TX(D_NET, tx, "Post RDMA type 0x%02x dlvr_mode 0x%x",
-              type, tx->tx_rdma_desc.dlvr_mode);
+       GNIDBG_TX(D_NET, tx, "Post RDMA type 0x%02x dlvr_mode 0x%x cookie:"LPX64,
+               type, tx->tx_rdma_desc.dlvr_mode, cookie);
 
        /* set CQ dedicated for RDMA */
        tx->tx_rdma_desc.src_cq_hndl = conn->gnc_device->gnd_snd_rdma_cqh;
@@ -1823,7 +1988,7 @@ kgnilnd_rdma(kgn_tx_t *tx, int type,
 kgn_rx_t *
 kgnilnd_alloc_rx(void)
 {
-       kgn_rx_t        *rx;
+       kgn_rx_t        *rx;
 
        rx = cfs_mem_cache_alloc(kgnilnd_data.kgn_rx_cache, CFS_ALLOC_ATOMIC);
        if (rx == NULL) {
@@ -1905,6 +2070,7 @@ kgnilnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
        kgn_tx_t         *tx;
        int               rc = 0;
        int               mpflag = 0;
+       int               reverse_rdma_flag = *kgnilnd_tunables.kgn_reverse_rdma;
 
        /* NB 'private' is different depending on what we're sending.... */
        LASSERT(!in_interrupt());
@@ -1951,12 +2117,15 @@ kgnilnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                if (lntmsg->msg_md->md_length <= *kgnilnd_tunables.kgn_max_immediate)
                       break;
 
-               tx = kgnilnd_new_tx_msg(GNILND_MSG_GET_REQ, ni->ni_nid);
+               if ((reverse_rdma_flag & GNILND_REVERSE_GET) == 0)
+                       tx = kgnilnd_new_tx_msg(GNILND_MSG_GET_REQ, ni->ni_nid);
+               else
+                       tx = kgnilnd_new_tx_msg(GNILND_MSG_GET_REQ_REV, ni->ni_nid);
+
                if (tx == NULL) {
                        rc = -ENOMEM;
                        goto out;
                }
-
                /* slightly different options as we might actually have a GET with a
                 * MD_KIOV set but a non-NULL md_iov.iov */
                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
@@ -1984,11 +2153,14 @@ kgnilnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                }
 
                tx->tx_lntmsg[0] = lntmsg;
-               tx->tx_msg.gnm_u.get.gngm_hdr = *hdr;
+               if ((reverse_rdma_flag & GNILND_REVERSE_GET) == 0)
+                       tx->tx_msg.gnm_u.get.gngm_hdr = *hdr;
+               else
+                       tx->tx_msg.gnm_u.putreq.gnprm_hdr = *hdr;
+
                /* rest of tx_msg is setup just before it is sent */
                kgnilnd_launch_tx(tx, net, &target);
                goto out;
-
        case LNET_MSG_REPLY:
        case LNET_MSG_PUT:
                /* to save on MDDs, we'll handle short kiov by vmap'ing
@@ -1996,7 +2168,11 @@ kgnilnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                if (nob <= *kgnilnd_tunables.kgn_max_immediate)
                       break;
 
-               tx = kgnilnd_new_tx_msg(GNILND_MSG_PUT_REQ, ni->ni_nid);
+               if ((reverse_rdma_flag & GNILND_REVERSE_PUT) == 0)
+                       tx = kgnilnd_new_tx_msg(GNILND_MSG_PUT_REQ, ni->ni_nid);
+               else
+                       tx = kgnilnd_new_tx_msg(GNILND_MSG_PUT_REQ_REV, ni->ni_nid);
+
                if (tx == NULL) {
                        rc = -ENOMEM;
                        goto out;
@@ -2010,7 +2186,11 @@ kgnilnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                }
 
                tx->tx_lntmsg[0] = lntmsg;
-               tx->tx_msg.gnm_u.putreq.gnprm_hdr = *hdr;
+               if ((reverse_rdma_flag & GNILND_REVERSE_PUT) == 0)
+                       tx->tx_msg.gnm_u.putreq.gnprm_hdr = *hdr;
+               else
+                       tx->tx_msg.gnm_u.get.gngm_hdr = *hdr;
+
                /* rest of tx_msg is setup just before it is sent */
                kgnilnd_launch_tx(tx, net, &target);
                goto out;
@@ -2045,7 +2225,7 @@ out:
 }
 
 void
-kgnilnd_reply(lnet_ni_t *ni, kgn_rx_t *rx, lnet_msg_t *lntmsg)
+kgnilnd_setup_rdma(lnet_ni_t *ni, kgn_rx_t *rx, lnet_msg_t *lntmsg, int mlen)
 {
        kgn_conn_t    *conn = rx->grx_conn;
        kgn_msg_t     *rxmsg = rx->grx_msg;
@@ -2054,10 +2234,26 @@ kgnilnd_reply(lnet_ni_t *ni, kgn_rx_t *rx, lnet_msg_t *lntmsg)
        lnet_kiov_t   *kiov = lntmsg->msg_kiov;
        unsigned int   offset = lntmsg->msg_offset;
        unsigned int   nob = lntmsg->msg_len;
+       int            done_type;
        kgn_tx_t      *tx;
        int            rc = 0;
 
-       tx = kgnilnd_new_tx_msg(GNILND_MSG_GET_DONE, ni->ni_nid);
+       switch (rxmsg->gnm_type) {
+       case GNILND_MSG_PUT_REQ_REV:
+               done_type = GNILND_MSG_PUT_DONE_REV;
+               nob = mlen;
+               break;
+       case GNILND_MSG_GET_REQ:
+               done_type = GNILND_MSG_GET_DONE;
+               break;
+       default:
+               CERROR("invalid msg type %s (%d)\n",
+                       kgnilnd_msgtype2str(rxmsg->gnm_type),
+                       rxmsg->gnm_type);
+               LBUG();
+       }
+
+       tx = kgnilnd_new_tx_msg(done_type, ni->ni_nid);
        if (tx == NULL)
                goto failed_0;
 
@@ -2084,7 +2280,7 @@ kgnilnd_reply(lnet_ni_t *ni, kgn_rx_t *rx, lnet_msg_t *lntmsg)
 
  failed_1:
        kgnilnd_tx_done(tx, rc);
-       kgnilnd_nak_rdma(conn, GNILND_MSG_GET_NAK, rc, rxmsg->gnm_u.get.gngm_cookie, ni->ni_nid);
+       kgnilnd_nak_rdma(conn, done_type, rc, rxmsg->gnm_u.get.gngm_cookie, ni->ni_nid);
  failed_0:
        lnet_finalize(ni, lntmsg, rc);
 }
@@ -2097,6 +2293,8 @@ kgnilnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
        kgn_conn_t      *conn = rx->grx_conn;
        kgn_msg_t       *rxmsg = rx->grx_msg;
        kgn_msg_t       *eagermsg = NULL;
+       kgn_peer_t      *peer = NULL;
+       kgn_conn_t      *found_conn = NULL;
 
        GNIDBG_MSG(D_NET, rxmsg, "eager recv for conn %p, rxmsg %p, lntmsg %p",
                conn, rxmsg, lntmsg);
@@ -2106,11 +2304,47 @@ kgnilnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
                        rxmsg->gnm_payload_len);
                return -EPROTO;
        }
+       /* Grab a read lock so the connection doesnt disappear on us
+        * while we look it up
+        */
+       read_lock(&kgnilnd_data.kgn_peer_conn_lock);
+
+       peer = kgnilnd_find_peer_locked(rxmsg->gnm_srcnid);
+       if (peer != NULL)
+               found_conn = kgnilnd_find_conn_locked(peer);
+
+
+       /* Verify the connection found is the same one that the message
+        * is supposed to be using, if it is not output an error message
+        * and return.
+        */
+       if (!peer || !found_conn
+           || found_conn->gnc_peer_connstamp != rxmsg->gnm_connstamp) {
+               read_unlock(&kgnilnd_data.kgn_peer_conn_lock);
+               CERROR("Couldnt find matching peer %p or conn %p / %p\n",
+                       peer, conn, found_conn);
+               if (found_conn) {
+                       CERROR("Unexpected connstamp "LPX64"("LPX64" expected)"
+                               " from %s", rxmsg->gnm_connstamp,
+                               found_conn->gnc_peer_connstamp,
+                               libcfs_nid2str(peer->gnp_nid));
+               }
+               return -ENOTCONN;
+       }
+
+       /* add conn ref to ensure it doesn't go away until all eager
+        * messages processed */
+       kgnilnd_conn_addref(conn);
+
+       /* Now that we have verified the connection is valid and added a
+        * reference we can remove the read_lock on the peer_conn_lock */
+       read_unlock(&kgnilnd_data.kgn_peer_conn_lock);
 
        /* we have no credits or buffers for this message, so copy it
         * somewhere for a later kgnilnd_recv */
        LIBCFS_ALLOC(eagermsg, sizeof(*eagermsg) + *kgnilnd_tunables.kgn_max_immediate);
        if (eagermsg == NULL) {
+               kgnilnd_conn_decref(conn);
                CERROR("couldn't allocate eager rx message for conn %p to %s\n",
                        conn, libcfs_nid2str(conn->gnc_peer->gnp_nid));
                return -ENOMEM;
@@ -2124,9 +2358,6 @@ kgnilnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
        /* stash this for lnet_finalize on cancel-on-conn-close */
        rx->grx_lntmsg = lntmsg;
 
-       /* add conn ref to ensure it doesn't go away until all eager messages processed */
-       kgnilnd_conn_addref(conn);
-
        /* keep the same rx_t, it just has a new grx_msg now */
        *new_private = private;
 
@@ -2175,6 +2406,9 @@ kgnilnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
 
        switch (rxmsg->gnm_type) {
        default:
+               GNIDBG_MSG(D_NETERROR, rxmsg, "conn %p, rx %p, rxmsg %p, lntmsg %p"
+               " niov=%d kiov=%p iov=%p offset=%d mlen=%d rlen=%d",
+               conn, rx, rxmsg, lntmsg, niov, kiov, iov, offset, mlen, rlen);
                LBUG();
 
        case GNILND_MSG_IMMEDIATE:
@@ -2250,7 +2484,7 @@ kgnilnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
 
                        /* only error if lntmsg == NULL, otherwise we are just
                         * short circuiting the rdma process of 0 bytes */
-                       kgnilnd_nak_rdma(conn, GNILND_MSG_PUT_NAK,
+                       kgnilnd_nak_rdma(conn, rxmsg->gnm_type,
                                        lntmsg == NULL ? -ENOENT : 0,
                                        rxmsg->gnm_u.get.gngm_cookie,
                                        ni->ni_nid);
@@ -2295,20 +2529,116 @@ kgnilnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
 
 nak_put_req:
                /* make sure we send an error back when the PUT fails */
-               kgnilnd_nak_rdma(conn, GNILND_MSG_PUT_NAK, rc, rxmsg->gnm_u.get.gngm_cookie, ni->ni_nid);
+               kgnilnd_nak_rdma(conn, rxmsg->gnm_type, rc, rxmsg->gnm_u.get.gngm_cookie, ni->ni_nid);
                kgnilnd_tx_done(tx, rc);
                kgnilnd_consume_rx(rx);
 
                /* return magic LNet network error */
                RETURN(-EIO);
+       case GNILND_MSG_GET_REQ_REV:
+               /* LNET wants to truncate or drop transaction, sending NAK */
+               if (mlen == 0) {
+                       kgnilnd_consume_rx(rx);
+                       lnet_finalize(ni, lntmsg, 0);
+
+                       /* only error if lntmsg == NULL, otherwise we are just
+                        * short circuiting the rdma process of 0 bytes */
+                       kgnilnd_nak_rdma(conn, rxmsg->gnm_type,
+                                       lntmsg == NULL ? -ENOENT : 0,
+                                       rxmsg->gnm_u.get.gngm_cookie,
+                                       ni->ni_nid);
+                       RETURN(0);
+               }
+               /* lntmsg can be null when parsing a LNET_GET */
+               if (lntmsg != NULL) {
+                       /* sending ACK with sink buff. info */
+                       tx = kgnilnd_new_tx_msg(GNILND_MSG_GET_ACK_REV, ni->ni_nid);
+                       if (tx == NULL) {
+                               kgnilnd_consume_rx(rx);
+                               RETURN(-ENOMEM);
+                       }
+
+                       rc = kgnilnd_set_tx_id(tx, conn);
+                       if (rc != 0)
+                               GOTO(nak_get_req_rev, rc);
+
+
+                       rc = kgnilnd_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
+                       if (rc != 0)
+                               GOTO(nak_get_req_rev, rc);
+
+
+                       tx->tx_msg.gnm_u.putack.gnpam_src_cookie =
+                               rxmsg->gnm_u.putreq.gnprm_cookie;
+                       tx->tx_msg.gnm_u.putack.gnpam_dst_cookie = tx->tx_id.txe_cookie;
+                       tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_addr =
+                               (__u64)((unsigned long)tx->tx_buffer);
+                       tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_nob = mlen;
+
+                       tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */
+
+                       /* we only queue from kgnilnd_recv - we might get called from other contexts
+                        * and we don't want to block the mutex in those cases */
+
+                       spin_lock(&tx->tx_conn->gnc_device->gnd_lock);
+                       kgnilnd_tx_add_state_locked(tx, NULL, tx->tx_conn, GNILND_TX_MAPQ, 1);
+                       spin_unlock(&tx->tx_conn->gnc_device->gnd_lock);
+                       kgnilnd_schedule_device(tx->tx_conn->gnc_device);
+               } else {
+                       /* No match */
+                       kgnilnd_nak_rdma(conn, rxmsg->gnm_type,
+                                       -ENOENT,
+                                       rxmsg->gnm_u.get.gngm_cookie,
+                                       ni->ni_nid);
+               }
+
+               kgnilnd_consume_rx(rx);
+               RETURN(0);
+
+nak_get_req_rev:
+               /* make sure we send an error back when the GET fails */
+               kgnilnd_nak_rdma(conn, rxmsg->gnm_type, rc, rxmsg->gnm_u.get.gngm_cookie, ni->ni_nid);
+               kgnilnd_tx_done(tx, rc);
+               kgnilnd_consume_rx(rx);
+
+               /* return magic LNet network error */
+               RETURN(-EIO);
+
+
+       case GNILND_MSG_PUT_REQ_REV:
+               /* LNET wants to truncate or drop transaction, sending NAK */
+               if (mlen == 0) {
+                       kgnilnd_consume_rx(rx);
+                       lnet_finalize(ni, lntmsg, 0);
+
+                       /* only error if lntmsg == NULL, otherwise we are just
+                        * short circuiting the rdma process of 0 bytes */
+                       kgnilnd_nak_rdma(conn, rxmsg->gnm_type,
+                                       lntmsg == NULL ? -ENOENT : 0,
+                                       rxmsg->gnm_u.get.gngm_cookie,
+                                       ni->ni_nid);
+                       RETURN(0);
+               }
 
 
+               if (lntmsg != NULL) {
+                       /* Matched! */
+                       kgnilnd_setup_rdma(ni, rx, lntmsg, mlen);
+               } else {
+                       /* No match */
+                       kgnilnd_nak_rdma(conn, rxmsg->gnm_type,
+                                       -ENOENT,
+                                       rxmsg->gnm_u.get.gngm_cookie,
+                                       ni->ni_nid);
+               }
+               kgnilnd_consume_rx(rx);
+               RETURN(0);
        case GNILND_MSG_GET_REQ:
                if (lntmsg != NULL) {
                        /* Matched! */
-                       kgnilnd_reply(ni, rx, lntmsg);
+                       kgnilnd_setup_rdma(ni, rx, lntmsg, mlen);
                } else {
                        /* No match */
-                       kgnilnd_nak_rdma(conn, GNILND_MSG_GET_NAK,
+                       kgnilnd_nak_rdma(conn, rxmsg->gnm_type,
                                        -ENOENT,
                                        rxmsg->gnm_u.get.gngm_cookie,
                                        ni->ni_nid);
@@ -2352,7 +2682,13 @@ kgnilnd_check_conn_timeouts_locked(kgn_conn_t *conn)
        newest_last_rx = GNILND_LASTRX(conn);
 
        if (time_after_eq(now, newest_last_rx + timeout)) {
-               GNIDBG_CONN(D_CONSOLE|D_NETERROR, conn, "No gnilnd traffic received from %s for %lu "
+               uint32_t level = D_CONSOLE|D_NETERROR;
+
+               if (conn->gnc_peer->gnp_down == GNILND_RCA_NODE_DOWN) {
+                       level = D_NET;
+               }
+                       GNIDBG_CONN(level, conn,
+                       "No gnilnd traffic received from %s for %lu "
                        "seconds, terminating connection. Is node down? ",
                        libcfs_nid2str(conn->gnc_peer->gnp_nid),
                        cfs_duration_sec(now - newest_last_rx));
@@ -2465,7 +2801,8 @@ kgnilnd_check_peer_timeouts_locked(kgn_peer_t *peer, struct list_head *todie,
        /* Don't reconnect if we are still trying to clear out old conns.
         * This prevents us sending traffic on the new mbox before ensuring we are done
         * with the old one */
-       reconnect = (atomic_read(&peer->gnp_dirty_eps) == 0);
+       reconnect = (peer->gnp_down == GNILND_RCA_NODE_UP) &&
+                   (atomic_read(&peer->gnp_dirty_eps) == 0);
 
        /* if we are not connected and there are tx on the gnp_tx_queue waiting
         * to be sent, we'll check the reconnect interval and fire up a new
@@ -2501,8 +2838,8 @@ kgnilnd_check_peer_timeouts_locked(kgn_peer_t *peer, struct list_head *todie,
         */
        if (first_rx &&
                time_after(jiffies, first_rx + cfs_time_seconds(*kgnilnd_tunables.kgn_hardware_timeout))) {
-               CDEBUG(D_NET,"We can release conn %p from purgatory %lu\n",
-                      conn, first_rx + cfs_time_seconds(*kgnilnd_tunables.kgn_hardware_timeout));
+               CDEBUG(D_INFO, "We can release peer %s conn's from purgatory %lu\n",
+                       libcfs_nid2str(peer->gnp_nid), first_rx + cfs_time_seconds(*kgnilnd_tunables.kgn_hardware_timeout));
                releaseconn = 1;
        }
 
@@ -2696,6 +3033,25 @@ kgnilnd_reaper(void *arg)
 }
 
 int
+kgnilnd_recv_bte_get(kgn_tx_t *tx) {
+       unsigned niov, offset, nob;
+       lnet_kiov_t     *kiov;
+       lnet_msg_t *lntmsg = tx->tx_lntmsg[0];
+       kgnilnd_parse_lnet_rdma(lntmsg, &niov, &offset, &nob, &kiov, tx->tx_nob_rdma);
+
+       if (kiov != NULL) {
+               lnet_copy_flat2kiov(
+                       niov, kiov, offset,
+                       nob,
+                       tx->tx_buffer_copy, tx->tx_offset, nob);
+       } else {
+               memcpy(tx->tx_buffer, tx->tx_buffer_copy + tx->tx_offset, nob);
+       }
+       return 0;
+}
+
+
+int
 kgnilnd_check_rdma_cq(kgn_device_t *dev)
 {
        gni_return_t           rrc;
@@ -2764,17 +3120,37 @@ kgnilnd_check_rdma_cq(kgn_device_t *dev)
                }
 
                GNITX_ASSERTF(tx, tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE ||
-                       tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE,
+                       tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE ||
+                       tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE_REV ||
+                       tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV,
                        "tx %p with type %d\n", tx, tx->tx_msg.gnm_type);
 
                GNIDBG_TX(D_NET, tx, "RDMA completion for %d bytes", tx->tx_nob);
 
+               if (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV) {
+                       lnet_set_reply_msg_len(NULL, tx->tx_lntmsg[1],
+                                              tx->tx_msg.gnm_u.completion.gncm_retval);
+               }
+
+               rc = 0;
+               if (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV && desc->status == GNI_RC_SUCCESS) {
+                       if (tx->tx_buffer_copy != NULL)
+                               kgnilnd_recv_bte_get(tx);
+                       rc = kgnilnd_verify_rdma_cksum(tx, tx->tx_putinfo.gnpam_payload_cksum, tx->tx_nob_rdma);
+               }
+
+               if (tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE_REV && desc->status == GNI_RC_SUCCESS) {
+                       if (tx->tx_buffer_copy != NULL)
+                               kgnilnd_recv_bte_get(tx);
+                       rc = kgnilnd_verify_rdma_cksum(tx, tx->tx_getinfo.gngm_payload_cksum, tx->tx_nob_rdma);
+               }
+
                /* remove from rdmaq */
                spin_lock(&conn->gnc_list_lock);
                kgnilnd_tx_del_state_locked(tx, NULL, conn, GNILND_TX_ALLOCD);
                spin_unlock(&conn->gnc_list_lock);
 
-               if (likely(desc->status == GNI_RC_SUCCESS)) {
+               if (likely(desc->status == GNI_RC_SUCCESS) && rc == 0) {
                        atomic_inc(&dev->gnd_rdma_ntx);
                        atomic64_add(tx->tx_nob, &dev->gnd_rdma_txbytes);
                        /* transaction succeeded, add into fmaq */
@@ -2782,6 +3158,7 @@ kgnilnd_check_rdma_cq(kgn_device_t *dev)
                        kgnilnd_peer_alive(conn->gnc_peer);
 
                        /* drop ref from kgnilnd_validate_tx_ev_id */
+                       kgnilnd_admin_decref(conn->gnc_tx_in_use);
                        kgnilnd_conn_decref(conn);
                        continue;
                }
@@ -2804,14 +3181,15 @@ kgnilnd_check_rdma_cq(kgn_device_t *dev)
                GNIDBG_TX(D_NETERROR, tx, "RDMA %s error (%s)",
                        should_retry ? "transient" : "unrecoverable", err_str);
 
-               if (tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE) {
+               if (tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE ||
+                   tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE_REV) {
                        if (should_retry) {
-                               kgnilnd_rdma(tx, GNILND_MSG_PUT_DONE,
+                               kgnilnd_rdma(tx, tx->tx_msg.gnm_type,
                                             &tx->tx_putinfo.gnpam_desc,
                                             tx->tx_putinfo.gnpam_desc.gnrd_nob,
                                             tx->tx_putinfo.gnpam_dst_cookie);
                        } else {
-                               kgnilnd_nak_rdma(conn, GNILND_MSG_PUT_NAK,
+                               kgnilnd_nak_rdma(conn, tx->tx_msg.gnm_type,
                                                -EFAULT,
                                                tx->tx_putinfo.gnpam_dst_cookie,
                                                tx->tx_msg.gnm_srcnid);
@@ -2819,12 +3197,12 @@ kgnilnd_check_rdma_cq(kgn_device_t *dev)
                        }
                } else {
                        if (should_retry) {
-                               kgnilnd_rdma(tx, GNILND_MSG_GET_DONE,
+                               kgnilnd_rdma(tx, tx->tx_msg.gnm_type,
                                             &tx->tx_getinfo.gngm_desc,
                                             tx->tx_lntmsg[0]->msg_len,
                                             tx->tx_getinfo.gngm_cookie);
                        } else {
-                               kgnilnd_nak_rdma(conn, GNILND_MSG_GET_NAK,
+                               kgnilnd_nak_rdma(conn, tx->tx_msg.gnm_type,
                                                -EFAULT,
                                                tx->tx_getinfo.gngm_cookie,
                                                tx->tx_msg.gnm_srcnid);
@@ -2833,6 +3211,7 @@ kgnilnd_check_rdma_cq(kgn_device_t *dev)
                }
 
                /* drop ref from kgnilnd_validate_tx_ev_id */
+               kgnilnd_admin_decref(conn->gnc_tx_in_use);
                kgnilnd_conn_decref(conn);
        }
 }
@@ -2985,6 +3364,7 @@ kgnilnd_check_fma_send_cq(kgn_device_t *dev)
                }
 
                /* drop ref from kgnilnd_validate_tx_ev_id */
+               kgnilnd_admin_decref(conn->gnc_tx_in_use);
                kgnilnd_conn_decref(conn);
 
                /* if we are waiting for a REPLY, we'll handle the tx then */
@@ -3187,6 +3567,43 @@ kgnilnd_send_mapped_tx(kgn_tx_t *tx, int try_map_if_full)
                             tx->tx_getinfo.gngm_cookie);
 
                break;
+       case GNILND_MSG_PUT_REQ_REV:
+               tx->tx_msg.gnm_u.get.gngm_desc.gnrd_key = tx->tx_map_key;
+               tx->tx_msg.gnm_u.get.gngm_cookie = tx->tx_id.txe_cookie;
+               tx->tx_msg.gnm_u.get.gngm_desc.gnrd_addr = (__u64)((unsigned long)tx->tx_buffer);
+               tx->tx_msg.gnm_u.get.gngm_desc.gnrd_nob = tx->tx_nob;
+               tx->tx_state = GNILND_TX_WAITING_COMPLETION | GNILND_TX_WAITING_REPLY;
+               kgnilnd_compute_rdma_cksum(tx, tx->tx_nob);
+               tx->tx_msg.gnm_u.get.gngm_payload_cksum = tx->tx_msg.gnm_payload_cksum;
+
+               rc = kgnilnd_sendmsg(tx, NULL, 0, &tx->tx_conn->gnc_list_lock, GNILND_TX_FMAQ);
+               break;
+       case GNILND_MSG_PUT_DONE_REV:
+               kgnilnd_rdma(tx, GNILND_MSG_PUT_DONE_REV,
+                            &tx->tx_getinfo.gngm_desc,
+                            tx->tx_lntmsg[0]->msg_len,
+                            tx->tx_getinfo.gngm_cookie);
+               break;
+       case GNILND_MSG_GET_ACK_REV:
+               tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_key = tx->tx_map_key;
+               tx->tx_state = GNILND_TX_WAITING_COMPLETION | GNILND_TX_WAITING_REPLY;
+               /* LNET_GETS are a special case for parse */
+               kgnilnd_compute_rdma_cksum(tx, tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_nob);
+               tx->tx_msg.gnm_u.putack.gnpam_payload_cksum = tx->tx_msg.gnm_payload_cksum;
+
+               if (CFS_FAIL_CHECK(CFS_FAIL_GNI_PUT_ACK_AGAIN))
+                       tx->tx_state |= GNILND_TX_FAIL_SMSG;
+
+               /* redirect to FMAQ on failure, no need to infinite loop here in MAPQ */
+               rc = kgnilnd_sendmsg(tx, NULL, 0, &tx->tx_conn->gnc_list_lock, GNILND_TX_FMAQ);
+               break;
+       case GNILND_MSG_GET_REQ_REV:
+               kgnilnd_rdma(tx, GNILND_MSG_GET_DONE_REV,
+                               &tx->tx_putinfo.gnpam_desc,
+                               tx->tx_putinfo.gnpam_desc.gnrd_nob,
+                               tx->tx_putinfo.gnpam_dst_cookie);
+
+               break;
        }
 
        RETURN(rc);
@@ -3289,15 +3706,22 @@ kgnilnd_process_fmaq(kgn_conn_t *conn)
 
        case GNILND_MSG_GET_DONE:
        case GNILND_MSG_PUT_DONE:
+       case GNILND_MSG_PUT_DONE_REV:
+       case GNILND_MSG_GET_DONE_REV:
        case GNILND_MSG_PUT_NAK:
        case GNILND_MSG_GET_NAK:
+       case GNILND_MSG_GET_NAK_REV:
+       case GNILND_MSG_PUT_NAK_REV:
                tx->tx_state = GNILND_TX_WAITING_COMPLETION;
                break;
 
        case GNILND_MSG_PUT_REQ:
+       case GNILND_MSG_GET_REQ_REV:
                tx->tx_msg.gnm_u.putreq.gnprm_cookie = tx->tx_id.txe_cookie;
 
        case GNILND_MSG_PUT_ACK:
+       case GNILND_MSG_PUT_REQ_REV:
+       case GNILND_MSG_GET_ACK_REV:
        case GNILND_MSG_GET_REQ:
                /* This is really only to handle the retransmit of SMSG once these
                 * two messages are setup in send_mapped_tx */
@@ -3518,7 +3942,9 @@ kgnilnd_finalize_rx_done(kgn_tx_t *tx, kgn_msg_t *msg)
        atomic_inc(&conn->gnc_device->gnd_rdma_nrx);
        atomic64_add(tx->tx_nob, &conn->gnc_device->gnd_rdma_rxbytes);
 
-       rc = kgnilnd_verify_rdma_cksum(tx, msg->gnm_payload_cksum);
+       /* the gncm_retval is passed in for PUTs */
+       rc = kgnilnd_verify_rdma_cksum(tx, msg->gnm_payload_cksum,
+                                      msg->gnm_u.completion.gncm_retval);
 
        kgnilnd_complete_tx(tx, rc);
 }
@@ -3539,7 +3965,6 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
        int           repost = 1, saw_complete;
        unsigned long timestamp, newest_last_rx, timeout;
        int           last_seq;
-       void         *memory = NULL;
        ENTRY;
 
        /* Short circuit if the ep_handle is null.
@@ -3592,18 +4017,27 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
                RETURN_EXIT;
        }
 
+       /* Instead of asserting when we get mailbox corruption lets attempt to
+        * close the conn and recover. We can put the conn/mailbox into
+        * purgatory and let purgatory deal with the problem. If we see
+        * this NETTERROR reported on production systems in large amounts
+        * we will need to revisit the state machine to see if we can tighten
+        * it up further to improve data protection.
+        */
+
        if (rrc == GNI_RC_INVALID_STATE) {
-               LIBCFS_ALLOC(memory, conn->gnpr_smsg_attr.buff_size);
-               if (memory == NULL) {
-                       memory = (void *)0xdeadbeef;
-               } else {
-                       memcpy(memory, conn->gnpr_smsg_attr.msg_buffer + conn->gnpr_smsg_attr.mbox_offset, conn->gnpr_smsg_attr.buff_size);
-               }
+               mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
+               GNIDBG_CONN(D_NETERROR | D_CONSOLE, conn, "Mailbox corruption "
+                       "detected closing conn %p from peer %s\n", conn,
+                       libcfs_nid2str(conn->gnc_peer->gnp_nid));
+               rc = -EIO;
+               kgnilnd_close_conn(conn, rc);
+               RETURN_EXIT;
        }
 
        LASSERTF(rrc == GNI_RC_SUCCESS,
-               "bad rc %d on conn %p from peer %s mailbox copy %p\n",
-                rrc, conn, libcfs_nid2str(peer->gnp_nid), memory);
+               "bad rc %d on conn %p from peer %s\n",
+               rrc, conn, libcfs_nid2str(peer->gnp_nid));
 
        msg = (kgn_msg_t *)prefix;
 
@@ -3688,10 +4122,12 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
 
                /* NB message type checked below; NOT here... */
                switch (msg->gnm_type) {
+               case GNILND_MSG_GET_ACK_REV:
                case GNILND_MSG_PUT_ACK:
                        kgnilnd_swab_rdma_desc(&msg->gnm_u.putack.gnpam_desc);
                        break;
 
+               case GNILND_MSG_PUT_REQ_REV:
                case GNILND_MSG_GET_REQ:
                        kgnilnd_swab_rdma_desc(&msg->gnm_u.get.gngm_desc);
                        break;
@@ -3795,13 +4231,20 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
                                msg->gnm_srcnid, rx, 0);
                repost = rc < 0;
                break;
-
+       case GNILND_MSG_GET_REQ_REV:
        case GNILND_MSG_PUT_REQ:
                rc = lnet_parse(net->gnn_ni, &msg->gnm_u.putreq.gnprm_hdr,
                                msg->gnm_srcnid, rx, 1);
                repost = rc < 0;
                break;
+       case GNILND_MSG_GET_NAK_REV:
+               tx = kgnilnd_match_reply_either(conn, GNILND_MSG_GET_REQ_REV, GNILND_MSG_GET_ACK_REV,
+                                       msg->gnm_u.completion.gncm_cookie);
+               if (tx == NULL)
+                       break;
 
 
+               kgnilnd_complete_tx(tx, msg->gnm_u.completion.gncm_retval);
+               break;
        case GNILND_MSG_PUT_NAK:
                tx = kgnilnd_match_reply_either(conn, GNILND_MSG_PUT_REQ, GNILND_MSG_PUT_ACK,
                                        msg->gnm_u.completion.gncm_cookie);
@@ -3810,7 +4253,6 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
 
                kgnilnd_complete_tx(tx, msg->gnm_u.completion.gncm_retval);
                break;
-
        case GNILND_MSG_PUT_ACK:
                tx = kgnilnd_match_reply(conn, GNILND_MSG_PUT_REQ,
                                        msg->gnm_u.putack.gnpam_src_cookie);
@@ -3848,7 +4290,42 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
                                kgnilnd_tx_done(tx, rc);
                }
                break;
+       case GNILND_MSG_GET_ACK_REV:
+               tx = kgnilnd_match_reply(conn, GNILND_MSG_GET_REQ_REV,
+                                       msg->gnm_u.putack.gnpam_src_cookie);
+               if (tx == NULL)
+                       break;
+
+               /* store putack data for later: deferred rdma or re-try */
+               tx->tx_putinfo = msg->gnm_u.putack;
+               saw_complete = 0;
+               spin_lock(&tx->tx_conn->gnc_list_lock);
+
+               GNITX_ASSERTF(tx, tx->tx_state & GNILND_TX_WAITING_REPLY,
+                       "not waiting for reply", NULL);
+
+               tx->tx_state &= ~GNILND_TX_WAITING_REPLY;
+
+               if (likely(!(tx->tx_state & GNILND_TX_WAITING_COMPLETION))) {
+                       kgnilnd_tx_del_state_locked(tx, NULL, conn, GNILND_TX_ALLOCD);
+                       /* sample under lock as follow on steps require gnc_list_lock
+                        * - or call kgnilnd_tx_done which requires no locks held over
+                        *   call to lnet_finalize */
+                       saw_complete = 1;
+               } else {
+                       /* cannot launch rdma if still waiting for fma-msg completion */
+                       CDEBUG(D_NET, "tx 0x%p type 0x%02x will need to "
+                                       "wait for SMSG completion\n", tx, tx->tx_msg.gnm_type);
+                       tx->tx_state |= GNILND_TX_PENDING_RDMA;
+               }
+               spin_unlock(&tx->tx_conn->gnc_list_lock);
 
 
+               if (saw_complete) {
+                       rc = kgnilnd_send_mapped_tx(tx, 0);
+                       if (rc < 0)
+                               kgnilnd_tx_done(tx, rc);
+               }
+               break;
        case GNILND_MSG_PUT_DONE:
                tx = kgnilnd_match_reply(conn, GNILND_MSG_PUT_ACK,
                                        msg->gnm_u.completion.gncm_cookie);
@@ -3861,7 +4338,7 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
 
                kgnilnd_finalize_rx_done(tx, msg);
                break;
-
+       case GNILND_MSG_PUT_REQ_REV:
        case GNILND_MSG_GET_REQ:
                rc = lnet_parse(net->gnn_ni, &msg->gnm_u.get.gngm_hdr,
                                msg->gnm_srcnid, rx, 1);
@@ -3896,6 +4373,45 @@ kgnilnd_check_fma_rx(kgn_conn_t *conn)
 
                kgnilnd_finalize_rx_done(tx, msg);
                break;
+       case GNILND_MSG_GET_DONE_REV:
+               tx = kgnilnd_match_reply(conn, GNILND_MSG_GET_ACK_REV,
+                                       msg->gnm_u.completion.gncm_cookie);
+               if (tx == NULL)
+                       break;
+
+               GNITX_ASSERTF(tx, tx->tx_buftype == GNILND_BUF_PHYS_MAPPED ||
+                               tx->tx_buftype == GNILND_BUF_VIRT_MAPPED,
+                               "bad tx buftype %d", tx->tx_buftype);
+
+               kgnilnd_finalize_rx_done(tx, msg);
+               break;
+
+       case GNILND_MSG_PUT_DONE_REV:
+               tx = kgnilnd_match_reply(conn, GNILND_MSG_PUT_REQ_REV,
+                                       msg->gnm_u.completion.gncm_cookie);
+
+               if (tx == NULL)
+                       break;
+
+               GNITX_ASSERTF(tx, tx->tx_buftype == GNILND_BUF_PHYS_MAPPED ||
+                              tx->tx_buftype == GNILND_BUF_VIRT_MAPPED,
+                              "bad tx buftype %d", tx->tx_buftype);
+
+               kgnilnd_finalize_rx_done(tx, msg);
+               break;
+       case GNILND_MSG_PUT_NAK_REV:
+               tx = kgnilnd_match_reply(conn, GNILND_MSG_PUT_REQ_REV,
+                                       msg->gnm_u.completion.gncm_cookie);
+
+               if (tx == NULL)
+                       break;
+
+               GNITX_ASSERTF(tx, tx->tx_buftype == GNILND_BUF_PHYS_MAPPED ||
+                              tx->tx_buftype == GNILND_BUF_VIRT_MAPPED,
+                               "bad tx buftype %d", tx->tx_buftype);
+
+               kgnilnd_complete_tx(tx, msg->gnm_u.completion.gncm_retval);
+               break;
        }
 
  out:
@@ -4031,7 +4547,10 @@ kgnilnd_send_conn_close(kgn_conn_t *conn)
                }
        }
 
+       /* When changing gnc_state we need to take the kgn_peer_conn_lock */
+       write_lock(&kgnilnd_data.kgn_peer_conn_lock);
        conn->gnc_state = GNILND_CONN_CLOSED;
+       write_unlock(&kgnilnd_data.kgn_peer_conn_lock);
        /* mark this conn as CLOSED now that we processed it
         * do after TX, so we can use CLOSING in asserts */
 
@@ -4053,13 +4572,15 @@ kgnilnd_process_mapped_tx(kgn_device_t *dev)
        int             found_work = 0;
        int             rc = 0;
        kgn_tx_t        *tx;
-       int             max_retrans = *kgnilnd_tunables.kgn_max_retransmits;
+       int              fast_remaps = GNILND_FAST_MAPPING_TRY;
        int             log_retrans, log_retrans_level;
        static int      last_map_version;
        ENTRY;
 
        spin_lock(&dev->gnd_lock);
        if (list_empty(&dev->gnd_map_tx)) {
+               /* if the list is empty make sure we dont have a timer running */
+               del_singleshot_timer_sync(&dev->gnd_map_timer);
                spin_unlock(&dev->gnd_lock);
                RETURN(0);
        }
@@ -4070,13 +4591,23 @@ kgnilnd_process_mapped_tx(kgn_device_t *dev)
         * backing off until our map version changes - indicating we unmapped
         * something */
        tx = list_first_entry(&dev->gnd_map_tx, kgn_tx_t, tx_list);
-       if ((tx->tx_retrans > (max_retrans / 4)) &&
-           (last_map_version == dev->gnd_map_version)) {
+       if (likely(dev->gnd_map_attempt == 0) ||
+               time_after_eq(jiffies, dev->gnd_next_map) ||
+               last_map_version != dev->gnd_map_version) {
+
+               /* if this is our first attempt at mapping set last mapped to current
+                * jiffies so we can timeout our attempt correctly.
+                */
+               if (dev->gnd_map_attempt == 0)
+                       dev->gnd_last_map = jiffies;
+       } else {
                GNIDBG_TX(D_NET, tx, "waiting for mapping event event to retry", NULL);
                spin_unlock(&dev->gnd_lock);
                RETURN(0);
        }
 
+       /* delete the previous timer if it exists */
+       del_singleshot_timer_sync(&dev->gnd_map_timer);
        /* stash the last map version to let us know when a good one was seen */
        last_map_version = dev->gnd_map_version;
 
@@ -4116,28 +4647,59 @@ kgnilnd_process_mapped_tx(kgn_device_t *dev)
                         * this function is called again - we operate on a copy of the original
                         * list and not the live list */
                        spin_lock(&dev->gnd_lock);
+                       /* reset map attempts back to zero we successfully
+                        * mapped so we can reset our timers */
+                       dev->gnd_map_attempt = 0;
                        continue;
                } else if (rc != -ENOMEM) {
                        /* carp, failure we can't handle */
                        kgnilnd_tx_done(tx, rc);
                        spin_lock(&dev->gnd_lock);
+                       /* reset map attempts back to zero we dont know what happened but it
+                        * wasnt a failed mapping
+                        */
+                       dev->gnd_map_attempt = 0;
                        continue;
                }
 
-               /* time to handle the retry cases.. */
-               tx->tx_retrans++;
-               if (tx->tx_retrans == 1)
-                       tx->tx_qtime = jiffies;
+               /* time to handle the retry cases..  lock so we dont have 2 threads
+                * mucking with gnd_map_attempt, or gnd_next_map at the same time.
+                */
+               spin_lock(&dev->gnd_lock);
+               dev->gnd_map_attempt++;
+               if (dev->gnd_map_attempt < fast_remaps) {
+                       /* do nothing we just want it to go as fast as possible.
+                        * just set gnd_next_map to current jiffies so it will process
+                        * as fast as possible.
+                        */
+                       dev->gnd_next_map = jiffies;
+               } else {
+                       /* Retry based on GNILND_MAP_RETRY_RATE */
+                       dev->gnd_next_map = jiffies + GNILND_MAP_RETRY_RATE;
+               }
 
 
-               /* only log occasionally once we've retried max / 2 */
-               log_retrans = (tx->tx_retrans >= (max_retrans / 2)) &&
-                             ((tx->tx_retrans % 32) == 0);
+               /* only log occasionally once we've retried fast_remaps */
+               log_retrans = (dev->gnd_map_attempt >= fast_remaps) &&
+                             ((dev->gnd_map_attempt % fast_remaps) == 0);
                log_retrans_level = log_retrans ? D_NETERROR : D_NET;
 
                /* make sure we are not off in the weeds with this tx */
-               if (tx->tx_retrans > *kgnilnd_tunables.kgn_max_retransmits) {
+               if (time_after(jiffies, dev->gnd_last_map + GNILND_MAP_TIMEOUT)) {
                       GNIDBG_TX(D_NETERROR, tx,
                               "giving up on TX, too many retries", NULL);
+                      spin_unlock(&dev->gnd_lock);
+                      if (tx->tx_msg.gnm_type == GNILND_MSG_PUT_REQ ||
+                          tx->tx_msg.gnm_type == GNILND_MSG_GET_REQ_REV) {
+                              kgnilnd_nak_rdma(tx->tx_conn, tx->tx_msg.gnm_type,
+                                               -ENOMEM,
+                                               tx->tx_putinfo.gnpam_dst_cookie,
+                                               tx->tx_msg.gnm_srcnid);
+                       } else {
+                               kgnilnd_nak_rdma(tx->tx_conn, tx->tx_msg.gnm_type,
+                                               -ENOMEM,
+                                               tx->tx_getinfo.gngm_cookie,
+                                               tx->tx_msg.gnm_srcnid);
+                       }
                       kgnilnd_tx_done(tx, -ENOMEM);
                       GOTO(get_out_mapped, rc);
                } else {
@@ -4145,7 +4707,7 @@ kgnilnd_process_mapped_tx(kgn_device_t *dev)
                                "transient map failure #%d %d pages/%d bytes phys %u@%u "
                                "virt %u@"LPU64" "
                                "nq_map %d mdd# %d/%d GART %ld",
-                               tx->tx_retrans, tx->tx_phys_npages, tx->tx_nob,
+                               dev->gnd_map_attempt, tx->tx_phys_npages, tx->tx_nob,
                                dev->gnd_map_nphys, dev->gnd_map_physnop * PAGE_SIZE,
                                dev->gnd_map_nvirt, dev->gnd_map_virtnob,
                                atomic_read(&dev->gnd_nq_map),
@@ -4154,7 +4716,8 @@ kgnilnd_process_mapped_tx(kgn_device_t *dev)
                }
 
                /* we need to stop processing the rest of the list, so add it back in */
-               spin_lock(&dev->gnd_lock);
+               /* set timer to wake device when we need to schedule this tx */
+               mod_timer(&dev->gnd_map_timer, dev->gnd_next_map);
                kgnilnd_tx_add_state_locked(tx, NULL, tx->tx_conn, GNILND_TX_MAPQ, 0);
                spin_unlock(&dev->gnd_lock);
                GOTO(get_out_mapped, rc);
@@ -4165,16 +4728,20 @@ get_out_mapped:
 }
 
 int
-kgnilnd_process_conns(kgn_device_t *dev)
+kgnilnd_process_conns(kgn_device_t *dev, unsigned long deadline)
 {
        int              found_work = 0;
        int              conn_sched;
        int              intent = 0;
+       int              error_inject = 0;
+       int              rc = 0;
        kgn_conn_t      *conn;
 
        spin_lock(&dev->gnd_lock);
-       while (!list_empty(&dev->gnd_ready_conns)) {
+       while (!list_empty(&dev->gnd_ready_conns) && time_before(jiffies, deadline)) {
                dev->gnd_sched_alive = jiffies;
+               error_inject = 0;
+               rc = 0;
 
                if (unlikely(kgnilnd_data.kgn_quiesce_trigger)) {
                        /* break with lock held */
@@ -4201,10 +4768,16 @@ kgnilnd_process_conns(kgn_device_t *dev)
                if (kgnilnd_check_conn_fail_loc(dev, conn, &intent)) {
 
                        /* based on intent see if we should run again. */
-                       kgnilnd_schedule_process_conn(conn, intent);
-
+                       rc = kgnilnd_schedule_process_conn(conn, intent);
+                       error_inject = 1;
                        /* drop ref from gnd_ready_conns */
+                       if (atomic_read(&conn->gnc_refcount) == 1 && rc != 1) {
+                               down_write(&dev->gnd_conn_sem);
+                               kgnilnd_conn_decref(conn);
+                               up_write(&dev->gnd_conn_sem);
+                       } else if (rc != 1) {
                        kgnilnd_conn_decref(conn);
+                       }
                        /* clear this so that scheduler thread doesn't spin */
                        found_work = 0;
                        /* break with lock held... */
@@ -4213,30 +4786,60 @@ kgnilnd_process_conns(kgn_device_t *dev)
                }
 
                if (unlikely(conn->gnc_state == GNILND_CONN_CLOSED)) {
+                       down_write(&dev->gnd_conn_sem);
+
                        /* CONN_CLOSED set in procces_fmaq when CLOSE is sent */
+                       if (unlikely(atomic_read(&conn->gnc_tx_in_use))) {
+                               /* If there are tx's currently in use in another
+                                * thread we dont want to complete the close
+                                * yet. Cycle this conn back through
+                                * the scheduler. */
+                               kgnilnd_schedule_conn(conn);
+                       } else
                        kgnilnd_complete_closed_conn(conn);
+
+                       up_write(&dev->gnd_conn_sem);
                } else if (unlikely(conn->gnc_state == GNILND_CONN_DESTROY_EP)) {
                        /* DESTROY_EP set in kgnilnd_conn_decref on gnc_refcount = 1 */
                        /* serialize SMSG CQs with ep_bind and smsg_release */
+                       down_write(&dev->gnd_conn_sem);
                        kgnilnd_destroy_conn_ep(conn);
+                       up_write(&dev->gnd_conn_sem);
                } else if (unlikely(conn->gnc_state == GNILND_CONN_CLOSING)) {
                       /* if we need to do some CLOSE sending, etc done here do it */
+                       down_write(&dev->gnd_conn_sem);
                        kgnilnd_send_conn_close(conn);
                        kgnilnd_check_fma_rx(conn);
+                       up_write(&dev->gnd_conn_sem);
                } else if (atomic_read(&conn->gnc_peer->gnp_dirty_eps) == 0) {
                        /* start moving traffic if the old conns are cleared out */
+                       down_read(&dev->gnd_conn_sem);
                        kgnilnd_check_fma_rx(conn);
                        kgnilnd_process_fmaq(conn);
+                       up_read(&dev->gnd_conn_sem);
                }
 
-               kgnilnd_schedule_process_conn(conn, 0);
+               rc = kgnilnd_schedule_process_conn(conn, 0);
 
                /* drop ref from gnd_ready_conns */
+               if (atomic_read(&conn->gnc_refcount) == 1 && rc != 1) {
+                       down_write(&dev->gnd_conn_sem);
+                       kgnilnd_conn_decref(conn);
+                       up_write(&dev->gnd_conn_sem);
+               } else if (rc != 1) {
                kgnilnd_conn_decref(conn);
+               }
 
                /* check list again with lock held */
                spin_lock(&dev->gnd_lock);
        }
+
+       /* If we are short circuiting due to timing we want to be scheduled
+        * as soon as possible.
+        */
+       if (!list_empty(&dev->gnd_ready_conns) && !error_inject)
+               found_work++;
+
        spin_unlock(&dev->gnd_lock);
 
        RETURN(found_work);
@@ -4246,9 +4849,10 @@ int
 kgnilnd_scheduler(void *arg)
 {
        int               threadno = (long)arg;
-       kgn_device_t     *dev;
-       char              name[16];
-       int               busy_loops = 0;
+       kgn_device_t            *dev;
+       char                    name[16];
+       int                     busy_loops = 0;
+       unsigned long     deadline = 0;
        DEFINE_WAIT(wait);
 
        dev = &kgnilnd_data.kgn_devices[(threadno + 1) % kgnilnd_data.kgn_ndevs];
@@ -4258,8 +4862,8 @@ kgnilnd_scheduler(void *arg)
        cfs_block_allsigs();
 
        /* all gnilnd threads need to run fairly urgently */
-       set_user_nice(current, *kgnilnd_tunables.kgn_nice);
-
+       set_user_nice(current, *kgnilnd_tunables.kgn_sched_nice);
+       deadline = jiffies + cfs_time_seconds(*kgnilnd_tunables.kgn_sched_timeout);
        while (!kgnilnd_data.kgn_shutdown) {
                int     found_work = 0;
                /* Safe: kgn_shutdown only set when quiescent */
@@ -4273,12 +4877,15 @@ kgnilnd_scheduler(void *arg)
                /* tracking for when thread goes AWOL */
                dev->gnd_sched_alive = jiffies;
 
+               CFS_FAIL_TIMEOUT(CFS_FAIL_GNI_SCHED_DEADLINE,
+                       (*kgnilnd_tunables.kgn_sched_timeout + 1));
                /* let folks know we are up and kicking
                 * - they can use this for latency savings, etc
                 * - only change if IRQ, if IDLE leave alone as that
                 *   schedule_device calls to put us back to IRQ */
                (void)cmpxchg(&dev->gnd_ready, GNILND_DEV_IRQ, GNILND_DEV_LOOP);
 
+               down_read(&dev->gnd_conn_sem);
                /* always check these - they are super low cost  */
                found_work += kgnilnd_check_fma_send_cq(dev);
                found_work += kgnilnd_check_fma_rcv_cq(dev);
@@ -4299,21 +4906,23 @@ kgnilnd_scheduler(void *arg)
                 * transistion
                 * ...should.... */
 
+               up_read(&dev->gnd_conn_sem);
+
                /* process all conns ready now */
-               found_work += kgnilnd_process_conns(dev);
+               found_work += kgnilnd_process_conns(dev, deadline);
 
                /* do an eager check to avoid the IRQ disabling in
                 * prepare_to_wait and friends */
 
-               if (found_work && busy_loops++ < *kgnilnd_tunables.kgn_loops) {
+               if (found_work &&
+                  (busy_loops++ < *kgnilnd_tunables.kgn_loops) &&
+                  time_before(jiffies, deadline)) {
                        found_work = 0;
                        if ((busy_loops % 10) == 0) {
                                /* tickle heartbeat and watchdog to ensure our
                                 * piggishness doesn't turn into heartbeat failure */
                                touch_nmi_watchdog();
-                               if (kgnilnd_hssops.hb_to_l0 != NULL) {
-                                       kgnilnd_hssops.hb_to_l0();
-                               }
+                               kgnilnd_hw_hb();
                        }
                        continue;
                }
@@ -4332,7 +4941,8 @@ kgnilnd_scheduler(void *arg)
 
                found_work += xchg(&dev->gnd_ready, GNILND_DEV_IDLE);
 
-               if (busy_loops >= *kgnilnd_tunables.kgn_loops) {
+               if ((busy_loops >= *kgnilnd_tunables.kgn_loops) ||
+                  time_after_eq(jiffies, deadline)) {
                        CDEBUG(D_INFO,
                               "yeilding: found_work %d busy_loops %d\n",
                               found_work, busy_loops);
@@ -4346,8 +4956,10 @@ kgnilnd_scheduler(void *arg)
                         * again. yield() ensures we wake up without another
                         * waitq poke in that case */
                        atomic_inc(&dev->gnd_n_yield);
+                       kgnilnd_data.kgn_last_condresched = jiffies;
                        yield();
                        CDEBUG(D_INFO, "awake after yeild\n");
+                       deadline = jiffies + cfs_time_seconds(*kgnilnd_tunables.kgn_sched_timeout);
                } else if (found_work == GNILND_DEV_IDLE) {
                        /* busy_loops is low and there is nothing to do,
                         * go to sleep and wait for a waitq poke */
@@ -4355,8 +4967,10 @@ kgnilnd_scheduler(void *arg)
                               "scheduling: found_work %d busy_loops %d\n",
                               found_work, busy_loops);
                        atomic_inc(&dev->gnd_n_schedule);
+                       kgnilnd_data.kgn_last_scheduled = jiffies;
                        schedule();
                        CDEBUG(D_INFO, "awake after schedule\n");
+                       deadline = jiffies + cfs_time_seconds(*kgnilnd_tunables.kgn_sched_timeout);
                }
                finish_wait(&dev->gnd_waitq, &wait);
        }
index 38aee5b..39716b8 100644 (file)
@@ -1,7 +1,6 @@
 /*
  * Copyright (C) 2012 Cray, Inc.
  *
- *   Author: Igor Gorodetsky <iogordet@cray.com>
  *   Author: Nic Henke <nic@cray.com>
  *   Author: James Shimek <jshimek@cray.com>
  *
@@ -263,6 +262,7 @@ kgnilnd_unmap_fmablk(kgn_device_t *dev, kgn_fma_memblock_t *fma_blk)
        /* PHYS blocks don't get mapped */
        if (fma_blk->gnm_state != GNILND_FMABLK_PHYS) {
                atomic64_sub(fma_blk->gnm_blk_size, &dev->gnd_nbytes_map);
+               fma_blk->gnm_state = GNILND_FMABLK_IDLE;
        } else if (kgnilnd_data.kgn_in_reset) {
                /* in stack reset, clear MDD handle for PHYS blocks, as we'll
                 * re-use the fma_blk after reset so we don't have to drop/allocate
@@ -388,6 +388,8 @@ kgnilnd_find_free_mbox(kgn_conn_t *conn)
 
                mbox = &fma_blk->gnm_mbox_info[id];
                mbox->mbx_create_conn_memset = jiffies;
+               mbox->mbx_nallocs++;
+               mbox->mbx_nallocs_total++;
 
                /* zero mbox to remove any old data from our last use.
                 * this better be safe, if not our purgatory timers
@@ -508,6 +510,7 @@ kgnilnd_release_mbox(kgn_conn_t *conn, int purgatory_hold)
                        "conn %p bit %d already cleared in fma_blk %p\n",
                         conn, id, fma_blk);
                conn->gnc_fma_blk = NULL;
+               mbox->mbx_nallocs--;
        }
 
        if (CFS_FAIL_CHECK(CFS_FAIL_GNI_FMABLK_AVAIL)) {
@@ -923,7 +926,7 @@ kgnilnd_alloc_dgram(kgn_dgram_t **dgramp, kgn_device_t *dev, kgn_dgram_type_t ty
        kgn_dgram_t         *dgram;
 
        dgram = cfs_mem_cache_alloc(kgnilnd_data.kgn_dgram_cache,
-                                   CFS_ALLOC_ATOMIC);
+                                       CFS_ALLOC_ATOMIC);
        if (dgram == NULL)
                return -ENOMEM;
 
@@ -1326,9 +1329,11 @@ kgnilnd_release_dgram(kgn_device_t *dev, kgn_dgram_t *dgram)
                        int     rerc;
 
                        rerc = kgnilnd_post_dgram(dev, LNET_NID_ANY, GNILND_CONNREQ_REQ, 0);
-                       LASSERTF(rerc == 0,
-                               "error %d: dev %d could not repost wildcard datagram id 0x%p\n",
-                               rerc, dev->gnd_id, dgram);
+                       if (rerc != 0) {
+                               /* We failed to repost the WC dgram for some reason
+                                * mark it so the repost system attempts to repost */
+                               kgnilnd_admin_addref(dev->gnd_nwcdgrams);
+                       }
                }
 
                /* always free the old dgram */
@@ -1740,6 +1745,12 @@ kgnilnd_finish_connect(kgn_dgram_t *dgram)
                }
        }
 
+       if (peer->gnp_down == GNILND_RCA_NODE_DOWN) {
+               CNETERR("Received connection request from %s that RCA thinks is"
+                       " down.\n", libcfs_nid2str(her_nid));
+               peer->gnp_down = GNILND_RCA_NODE_UP;
+       }
+
        nstale = kgnilnd_close_stale_conns_locked(peer, conn);
 
        /* either way with peer (new or existing), we are ok with ref counts here as the
@@ -1761,6 +1772,9 @@ kgnilnd_finish_connect(kgn_dgram_t *dgram)
        conn->gnc_last_tx = jiffies - (cfs_time_seconds(GNILND_TO2KA(conn->gnc_timeout)) * 2);
        conn->gnc_state = GNILND_CONN_ESTABLISHED;
 
+       /* save the dgram type used to establish this connection */
+       conn->gnc_dgram_type = dgram->gndg_type;
+
        /* refs are not transferred from dgram to tables, so increment to
         * take ownership */
        kgnilnd_conn_addref(conn);
@@ -1838,10 +1852,6 @@ kgnilnd_finish_connect(kgn_dgram_t *dgram)
        lnet_notify(peer->gnp_net->gnn_ni, peer->gnp_nid,
                     1, cfs_time_current());
 
-       /* schedule the conn to pick up any SMSG sent by peer before we could
-        * process this dgram */
-       kgnilnd_schedule_conn(conn);
-
        /* drop our 'hold' ref */
        kgnilnd_conn_decref(conn);
 
@@ -2203,7 +2213,7 @@ kgnilnd_dgram_waitq(void *arg)
 }
 
 int
-kgnilnd_start_outbound_dgrams(kgn_device_t *dev)
+kgnilnd_start_outbound_dgrams(kgn_device_t *dev, unsigned long deadline)
 {
        int                      did_something = 0, rc;
        kgn_peer_t              *peer = NULL;
@@ -2211,7 +2221,7 @@ kgnilnd_start_outbound_dgrams(kgn_device_t *dev)
        spin_lock(&dev->gnd_connd_lock);
 
        /* Active connect - we added this in kgnilnd_launch_tx */
-       while (!list_empty(&dev->gnd_connd_peers)) {
+       while (!list_empty(&dev->gnd_connd_peers) && time_before(jiffies, deadline)) {
                peer = list_first_entry(&dev->gnd_connd_peers,
                                        kgn_peer_t, gnp_connd_list);
 
@@ -2298,6 +2308,29 @@ kgnilnd_start_outbound_dgrams(kgn_device_t *dev)
        RETURN(did_something);
 }
 
+int
+kgnilnd_repost_wc_dgrams(kgn_device_t *dev)
+{
+       int did_something = 0, to_repost, i;
+       to_repost = atomic_read(&dev->gnd_nwcdgrams);
+       ENTRY;
+
+       for (i = 0; i < to_repost; ++i) {
+               int     rerc;
+               rerc = kgnilnd_post_dgram(dev, LNET_NID_ANY, GNILND_CONNREQ_REQ, 0);
+               if (rerc == 0) {
+                       kgnilnd_admin_decref(dev->gnd_nwcdgrams);
+                       did_something += 1;
+               } else {
+                       CDEBUG(D_NETERROR, "error %d: dev %d could not post wildcard datagram\n",
+                               rerc, dev->gnd_id);
+                       break;
+               }
+       }
+
+       RETURN(did_something);
+}
+
 static void
 kgnilnd_dgram_poke_with_stick(unsigned long arg)
 {
@@ -2317,6 +2350,7 @@ kgnilnd_dgram_mover(void *arg)
        unsigned long            next_purge_check = jiffies - 1;
        unsigned long            timeout;
        struct timer_list        timer;
+       unsigned long            deadline = 0;
        DEFINE_WAIT(wait);
 
        snprintf(name, sizeof(name), "kgnilnd_dg_%02d", dev->gnd_id);
@@ -2328,7 +2362,7 @@ kgnilnd_dgram_mover(void *arg)
        /* we are ok not locking for these variables as the dgram waitq threads
         * will block both due to tying up net (kgn_shutdown) and the completion
         * event for the dgram_waitq (kgn_quiesce_trigger) */
-
+       deadline = jiffies + cfs_time_seconds(*kgnilnd_tunables.kgn_dgram_timeout);
        while (!kgnilnd_data.kgn_shutdown) {
                /* Safe: kgn_shutdown only set when quiescent */
 
@@ -2356,8 +2390,10 @@ kgnilnd_dgram_mover(void *arg)
 
                up_read(&kgnilnd_data.kgn_net_rw_sem);
 
+               CFS_FAIL_TIMEOUT(CFS_FAIL_GNI_DGRAM_DEADLINE,
+                       (*kgnilnd_tunables.kgn_dgram_timeout + 1));
                /* start new outbound dgrams */
-               did_something += kgnilnd_start_outbound_dgrams(dev);
+               did_something += kgnilnd_start_outbound_dgrams(dev, deadline);
 
                /* find dead dgrams */
                if (time_after_eq(jiffies, next_purge_check)) {
@@ -2368,13 +2404,15 @@ kgnilnd_dgram_mover(void *arg)
                                      cfs_time_seconds(kgnilnd_data.kgn_new_min_timeout / 4);
                }
 
+               did_something += kgnilnd_repost_wc_dgrams(dev);
+
                /* careful with the jiffy wrap... */
                timeout = (long)(next_purge_check - jiffies);
 
                CDEBUG(D_INFO, "did %d timeout %lu next %lu jiffies %lu\n",
                       did_something, timeout, next_purge_check, jiffies);
 
-               if (did_something || timeout <= 0) {
+               if ((did_something || timeout <= 0) && time_before(jiffies, deadline)) {
                        did_something = 0;
                        continue;
                }
@@ -2387,8 +2425,9 @@ kgnilnd_dgram_mover(void *arg)
                /* last second chance for others to poke us */
                did_something += xchg(&dev->gnd_dgram_ready, GNILND_DGRAM_IDLE);
 
-               /* check flag variables before comitting */
-               if (!did_something &&
+               /* check flag variables before comittingi even if we did something;
+                * if we are after the deadline call schedule */
+               if ((!did_something || time_after(jiffies, deadline)) &&
                    !kgnilnd_data.kgn_shutdown &&
                    !kgnilnd_data.kgn_quiesce_trigger) {
                        CDEBUG(D_INFO, "schedule timeout %ld (%lu sec)\n",
@@ -2396,6 +2435,7 @@ kgnilnd_dgram_mover(void *arg)
                        wake_up_all(&dev->gnd_dgping_waitq);
                        schedule();
                        CDEBUG(D_INFO, "awake after schedule\n");
+                       deadline = jiffies + cfs_time_seconds(*kgnilnd_tunables.kgn_dgram_timeout);
                }
 
                del_singleshot_timer_sync(&timer);
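Editor's sketch, not part of the patch: the scheduler and dgram mover hunks above bound each work pass with a jiffies deadline (time_before, time_after, time_after_eq) so a busy thread still yields. The userspace C below illustrates that idea under stated assumptions. tick_after, tick_before and drain_until are hypothetical names, the tick counter is simulated, and each work item is assumed to cost one tick. The signed-difference comparison is the technique the kernel macros are modeled on; it is not copied from the kernel headers.

```c
#include <assert.h>
#include <limits.h>

/* Wrap-safe tick comparison (hypothetical helper, modeled on the kernel's
 * time_after()). The signed difference stays correct across counter wrap
 * as long as the two values are less than half the counter range apart. */
static int tick_after(unsigned long a, unsigned long b)
{
        return (long)(b - a) < 0;
}

static int tick_before(unsigned long a, unsigned long b)
{
        return tick_after(b, a);
}

/* Hypothetical work loop: handle queued items until the queue is empty or
 * the deadline passes. *now is a simulated tick counter and every item
 * handled advances it by one tick. Returns the number of items handled. */
static int drain_until(unsigned long *now, unsigned long deadline, int queued)
{
        int done = 0;

        while (queued > 0 && tick_before(*now, deadline)) {
                queued--;
                done++;
                (*now)++;
        }
        return done;
}
```

Comparing the raw counters with `<` would stop the loop immediately whenever the deadline has wrapped past zero; the signed difference avoids that, which is presumably why the patch uses the time_* helpers rather than plain comparisons.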
diff --git a/lnet/klnds/gnilnd/gnilnd_gemini.h b/lnet/klnds/gnilnd/gnilnd_gemini.h
new file mode 100644 (file)
index 0000000..61ebe81
--- /dev/null
@@ -0,0 +1,135 @@
+/*
+ * Copyright (C) 2009-2012 Cray, Inc.
+ *
+ *   Author: Nic Henke <nic@cray.com>
+ *   Author: James Shimek <jshimek@cray.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+#ifndef _GNILND_GEMINI_H
+#define _GNILND_GEMINI_H
+
+#ifndef _GNILND_HSS_OPS_H
+# error "must include gnilnd_hss_ops.h first"
+#endif
+
+/* Set HW related values */
+#define GNILND_BASE_TIMEOUT        60            /* default sane timeout */
+#define GNILND_CHECKSUM_DEFAULT     3            /* all on for Gemini */
+
+#define GNILND_REVERSE_RDMA        GNILND_REVERSE_NONE
+#define GNILND_RDMA_DLVR_OPTION     GNI_DLVMODE_PERFORMANCE
+
+/* plug in our functions for use on the simulator */
+#if !defined(GNILND_USE_RCA)
+
+#define kgnilnd_hw_hb()              do {} while(0)
+
+/* fake a table that'd work for lookups in SimNow */
+
+typedef struct kgn_nid_entry {
+       __u32   nid;
+       __u32   nicaddr;
+} kgn_nid_entry_t;
+
+/* Gemini SimNow has a hard coded table to use - no RCA there */
+#define GNILND_MAX_NID_TABLE    0xffffffff
+
+/* this is all of the nodes defined in the Baker SimNow "sim_platforms" page */
+static kgn_nid_entry_t kgn_nid_table[] = {
+       {0x1, 0x100}, {0x2, 0x101}, {0x3, 0x104}, {0x4, 0x105},
+       {0x5, 0x108}, {0x6, 0x109}, {0x7, 0x10c}, {0x8, 0x10d},
+       {0x9, 0x110}, {0xa, 0x111}, {0xb, 0x114}, {0xc, 0x115},
+       {0xd, 0x118}, {0xe, 0x119}, {0xf, 0x11c}, {0x10, 0x11d},
+       {0x11, 0x120}, {0x12, 0x121}, {0x13, 0x124}, {0x14, 0x125},
+       {0x15, 0x128}, {0x16, 0x129}, {0x17, 0x12c}, {0x18, 0x12d},
+       {0x19, 0x130}, {0x1a, 0x131}, {0x1b, 0x134}, {0x1c, 0x135},
+       {0x1d, 0x138}, {0x1e, 0x139}, {0x1f, 0x13c}, {0x20, 0x13d},
+       {0x21, 0x140}, {0x22, 0x141}, {0x23, 0x144}, {0x24, 0x145},
+       {0x25, 0x148}, {0x26, 0x149}, {0x27, 0x14c}, {0x28, 0x14d},
+       {0x29, 0x150}, {0x2a, 0x151}, {0x2b, 0x154}, {0x2c, 0x155},
+       {0x2d, 0x158}, {0x2e, 0x159}, {0x2f, 0x15c}, {0x30, 0x15d},
+       {0x31, 0x160}, {0x32, 0x161}, {0x33, 0x164}, {0x3d, 0x178},
+       {0x34, 0x165}, {0x3e, 0x179}, {0x35, 0x168}, {0x3f, 0x17c},
+       {0x36, 0x169}, {0x40, 0x17d}, {0x37, 0x16c}, {0x41, 0x180},
+       {0x38, 0x16d}, {0x42, 0x181}, {0x39, 0x170}, {0x3a, 0x171},
+       {0x3b, 0x174}, {0x3c, 0x175}, {0x43, 0x184}, {0x44, 0x185},
+       {0x45, 0x188}, {0x46, 0x189}, {0x47, 0x18c}, {0x48, 0x18d},
+       /* entries after this are for 'dead' peer tests */
+       {0x63, 0x1ff}, {0x111, 0x209},
+       {GNILND_MAX_NID_TABLE, GNILND_MAX_NID_TABLE}
+};
+
+static inline int
+kgnilnd_nid_to_nicaddrs(__u32 nid, int numnic, __u32 *nicaddr)
+{
+       int i;
+       /* GNILND_NO_RCA, so use hardcoded table for Gemini SimNow */
+       if (numnic > 1) {
+               CERROR("manual nid2nic translation doesn't support"
+                      "multiple nic addrs (you asked for %d)\n",
+                       numnic);
+               return -EINVAL;
+       }
+       for (i = 0; ; i++) {
+               if (kgn_nid_table[i].nid == GNILND_MAX_NID_TABLE) {
+                       CERROR("could not translate %u to a NIC "
+                              "address\n", nid);
+                       return -ESRCH;
+               }
+               if (kgn_nid_table[i].nid == nid) {
+                       *nicaddr = kgn_nid_table[i].nicaddr;
+                       CDEBUG(D_NETTRACE, "Sim nid %d -> nic 0x%x\n", nid, *nicaddr);
+
+                       return 1;
+               }
+       }
+}
+
+static inline int
+kgnilnd_nicaddr_to_nid(__u32 nicaddr, __u32 *nid)
+{
+       int i;
+       /* GNILND_RCA_NOT_HOME, so use hardcoded table for SimNow */
+       for (i = 0; ; i++) {
+               if (kgn_nid_table[i].nicaddr == GNILND_MAX_NID_TABLE) {
+                       CERROR("could not translate NIC address "
+                               "%u\n",
+                               nicaddr);
+                       return -ESRCH;
+               }
+               if (kgn_nid_table[i].nicaddr == nicaddr) {
+                       *nid = kgn_nid_table[i].nid;
+                       return 1;
+               }
+       }
+}
+
+static inline int
+kgnilnd_setup_nic_translation(__u32 device_id)
+{
+       LCONSOLE_INFO("using Gemini SimNow nid table for RCA translation\n");
+
+       /* no real setup for Gemini Sim, just log the console message */
+
+       return 0;
+}
+
+#endif /* GNILND_USE_RCA */
+
+
+#endif /* _GNILND_GEMINI_H */
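Editor's sketch, not part of the patch: the new header above translates NIDs through a hard-coded table that ends in a sentinel entry instead of carrying a length. A minimal userspace version of that lookup might look like the following. The names nid_entry, nid_table, NID_TABLE_END and nid_to_nicaddr are illustrative stand-ins, the table holds only the first three pairs from the patch, and the return convention (1 on a hit, -ESRCH on a miss) mirrors what the patched functions do.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>

/* Sentinel marking the end of the table (stand-in for GNILND_MAX_NID_TABLE). */
#define NID_TABLE_END 0xffffffffu

struct nid_entry {
        uint32_t nid;
        uint32_t nicaddr;
};

/* First three pairs from the patch's table, then the sentinel entry. */
static const struct nid_entry nid_table[] = {
        {0x1, 0x100}, {0x2, 0x101}, {0x3, 0x104},
        {NID_TABLE_END, NID_TABLE_END}
};

/* Walk the table until the sentinel. Returns 1 and stores the NIC address
 * on a match, or -ESRCH (leaving *nicaddr untouched) if nid is not listed. */
static int nid_to_nicaddr(uint32_t nid, uint32_t *nicaddr)
{
        int i;

        for (i = 0; nid_table[i].nid != NID_TABLE_END; i++) {
                if (nid_table[i].nid == nid) {
                        *nicaddr = nid_table[i].nicaddr;
                        return 1;
                }
        }
        return -ESRCH;
}
```

A sentinel keeps the table and its bounds in one place, at the cost of reserving one value (here 0xffffffff) that can never be a valid NID or NIC address.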
index ec75177..8e5a902 100644 (file)
@@ -1,6 +1,8 @@
 /*
- * Copyright (C) 2010-2012 Cray, Inc.
+ * Copyright (C) 2009-2012 Cray, Inc.
+ *
  *   Author: Nic Henke <nic@cray.com>
+ *   Author: James Shimek <jshimek@cray.com>
  *
  *   This file is part of Lustre, http://www.lustre.org.
  *
 #ifndef _GNILND_HSS_OPS_H
 #define _GNILND_HSS_OPS_H
 
-/* for krca nid & nic translation */
-#include <krca_lib.h>
 #include <linux/typecheck.h>
 
-/* the SimNow nodes can't load rca.ko, so we need to detect this
- * and fake a table that'd work for lookups there */
-
-typedef struct kgn_nid_entry {
-       __u32   nid;
-       __u32   nicaddr;
-} kgn_nid_entry_t;
-
-typedef struct kgn_hssops
-{
-       /* function pointers for nid and nic conversion */
-       /* from krca_lib.h */
-       int     (*nid_to_nicaddr)(__u32 nid, int numnic, __u32 *nicaddr);
-       int     (*nicaddr_to_nid)(__u32 nicaddr, __u32 *nid);
-       void    (*hb_to_l0)(void);
-} kgn_hssops_t;
-
-/* pull in static store in gnilnd.c */
-extern kgn_hssops_t             kgnilnd_hssops;
-
-#define GNILND_NO_RCA           0xdeadbeef
-#define GNILND_NO_QUIESCE       0xdeadbeef
-
-static inline int
-kgnilnd_lookup_rca_funcs(void)
-{
-        void    *funcp;
-
-       funcp = __symbol_get("send_hb_2_l0");
-       if (funcp == 0) {
-               CERROR("couldn't find send_hb_2_l0\n");
-               /* not fatal for now */
-       } else {
-               kgnilnd_hssops.hb_to_l0 = funcp;
-       }
-
-       /* if we find one, we should get the other */
-
-       funcp = __symbol_get("krca_nid_to_nicaddrs");
-       if (funcp == 0) {
-               kgnilnd_hssops.nid_to_nicaddr = (void *)GNILND_NO_RCA;
-               kgnilnd_hssops.nicaddr_to_nid = (void *)GNILND_NO_RCA;
-               LCONSOLE_INFO("using SimNow nid table for RCA translation\n");
-               return 0;
-       }
-       kgnilnd_hssops.nid_to_nicaddr = funcp;
-
-       funcp = __symbol_get("krca_nicaddr_to_nid");
-       if (funcp == 0) {
-               CERROR("found krca_nid_to_nicaddrs but not "
-                      "krca_nicaddr_to_nid\n");
-               return -ESRCH;
-       }
-       kgnilnd_hssops.nicaddr_to_nid = funcp;
-       return 0;
-}
-
-#if defined(CONFIG_CRAY_GEMINI)
-/* Gemini SimNow has a hard coded table to use - no RCA there */
-#define GNILND_MAX_NID_TABLE    0xffffffff
-/* this is all of the nodes defined in the Baker SimNow "sim_platforms" page */
-static kgn_nid_entry_t kgn_nid_table[] = {
-       {0x1, 0x100}, {0x2, 0x101}, {0x3, 0x104}, {0x4, 0x105},
-       {0x5, 0x108}, {0x6, 0x109}, {0x7, 0x10c}, {0x8, 0x10d},
-       {0x9, 0x110}, {0xa, 0x111}, {0xb, 0x114}, {0xc, 0x115},
-       {0xd, 0x118}, {0xe, 0x119}, {0xf, 0x11c}, {0x10, 0x11d},
-       {0x11, 0x120}, {0x12, 0x121}, {0x13, 0x124}, {0x14, 0x125},
-       {0x15, 0x128}, {0x16, 0x129}, {0x17, 0x12c}, {0x18, 0x12d},
-       {0x19, 0x130}, {0x1a, 0x131}, {0x1b, 0x134}, {0x1c, 0x135},
-       {0x1d, 0x138}, {0x1e, 0x139}, {0x1f, 0x13c}, {0x20, 0x13d},
-       {0x21, 0x140}, {0x22, 0x141}, {0x23, 0x144}, {0x24, 0x145},
-       {0x25, 0x148}, {0x26, 0x149}, {0x27, 0x14c}, {0x28, 0x14d},
-       {0x29, 0x150}, {0x2a, 0x151}, {0x2b, 0x154}, {0x2c, 0x155},
-       {0x2d, 0x158}, {0x2e, 0x159}, {0x2f, 0x15c}, {0x30, 0x15d},
-       {0x31, 0x160}, {0x32, 0x161}, {0x33, 0x164}, {0x3d, 0x178},
-       {0x34, 0x165}, {0x3e, 0x179}, {0x35, 0x168}, {0x3f, 0x17c},
-       {0x36, 0x169}, {0x40, 0x17d}, {0x37, 0x16c}, {0x41, 0x180},
-       {0x38, 0x16d}, {0x42, 0x181}, {0x39, 0x170}, {0x3a, 0x171},
-       {0x3b, 0x174}, {0x3c, 0x175}, {0x43, 0x184}, {0x44, 0x185},
-       {0x45, 0x188}, {0x46, 0x189}, {0x47, 0x18c}, {0x48, 0x18d},
-       /* entries after this are for 'dead' peer tests */
-       {0x63, 0x1ff}, {0x111, 0x209},
-       {GNILND_MAX_NID_TABLE, GNILND_MAX_NID_TABLE}
-};
-static int
-gemini_nid_to_nicaddr(__u32 nid, int numnic, __u32 *nicaddr)
-{
-       int i;
-
-       /* GNILND_NO_RCA, so use hardcoded table for Gemini SimNow */
-       if (numnic > 1) {
-               CERROR("manual nid2nic translation doesn't support"
-                      "multiple nic addrs (you asked for %d)\n",
-                       numnic);
-               return -EINVAL;
-       }
-
-       for (i = 0;;i++) {
-               if (kgn_nid_table[i].nid == GNILND_MAX_NID_TABLE) {
-                       CERROR("could not translate %u to a NIC "
-                              "address\n", nid);
-                       return -ESRCH;
-               }
-               if (kgn_nid_table[i].nid == nid) {
-                       *nicaddr = kgn_nid_table[i].nicaddr;
-                       return 1;
-               }
-       }
-}
-
-static int
-gemini_nicaddr_to_nid(__u32 nicaddr, __u32 *nid)
-{
-       int i;
-
-       /* GNILND_RCA_NOT_HOME, so use hardcoded table for SimNow */
-       for (i = 0;;i++) {
-               if (kgn_nid_table[i].nicaddr == GNILND_MAX_NID_TABLE) {
-                       CERROR("could not translate NIC address "
-                               "%u\n",
-                               nicaddr);
-                       return -ESRCH;
-               }
-               if (kgn_nid_table[i].nicaddr == nicaddr) {
-                       *nid = kgn_nid_table[i].nid;
-                       return 1;
-               }
-       }
-}
-
-static inline int
-kgnilnd_setup_nic_translation(__u32 device_id)
-{
-        int rc;
-
-       /* do lookup on first use */
-       if (unlikely(kgnilnd_hssops.nid_to_nicaddr == NULL)) {
-               rc = kgnilnd_lookup_rca_funcs();
-               if (rc)
-                       return rc;
-       }
-
-       /* if we have a real function, return - we'll use those going forward */
-       if (likely(kgnilnd_hssops.nid_to_nicaddr != (void *)GNILND_NO_RCA))
-               return 0;
-
-       kgnilnd_hssops.nid_to_nicaddr = gemini_nid_to_nicaddr;
-       kgnilnd_hssops.nicaddr_to_nid = gemini_nicaddr_to_nid;
-       return 0;
-}
-
-#elif defined(CONFIG_CRAY_ARIES)
-/* for libcfs_ipif_query */
-#include <libcfs/libcfs.h>
-
-/* Aries Sim doesn't have hardcoded tables, so we'll hijack the nic_pe
- * and decode our address and nic addr from that - the rest are just offsets */
-static __u32 aries_sim_base_nid;
-static __u32 aries_sim_nic;
-
-static int
-aries_nid_to_nicaddr(__u32 nid, int numnic, __u32 *nicaddr)
-{
-       if (numnic > 1) {
-               CERROR("manual nid2nic translation doesn't support"
-                      "multiple nic addrs (you asked for %d)\n",
-                       numnic);
-               return -EINVAL;
-       }
-       if (nid < aries_sim_base_nid) {
-               CERROR("Request for invalid nid translation %u, minimum %u\n",
-                      nid, aries_sim_base_nid);
-               return -ESRCH;
-       }
+#if defined(GNILND_USE_RCA)
+/* for krca nid & nic translation */
+#include <krca_lib.h>
 
 
-       *nicaddr = nid - aries_sim_base_nid;
-       return 1;
-}
+/* it isn't exported, so just point directly to it */
+extern void send_hb_2_l0(void);
 
-static int
-aries_nicaddr_to_nid(__u32 nicaddr, __u32 *nid)
+static inline void
+kgnilnd_hw_hb(void)
 {
-       *nid = aries_sim_base_nid + nicaddr;
-       return 1;
+       send_hb_2_l0();
 }
 
-/* XXX Nic: This does not support multiple device!!!! */
-static inline int
-kgnilnd_setup_nic_translation(__u32 device_id)
-{
-       char              *if_name = "ipogif0";
-       __u32              ipaddr, netmask, my_nid;
-       int                up, rc;
-
-       /* do lookup on first use */
-       if (unlikely(kgnilnd_hssops.nid_to_nicaddr == NULL)) {
-               rc = kgnilnd_lookup_rca_funcs();
-               if (rc)
-                       return rc;
-       }
-
-       /* if we have a real function, return - we'll use those going forward */
-       if (likely(kgnilnd_hssops.nid_to_nicaddr != (void *)GNILND_NO_RCA))
-               return 0;
-
-       rc = libcfs_ipif_query(if_name, &up, &ipaddr, &netmask);
-       if (rc != 0) {
-               CERROR("can't get IP interface for %s: %d\n", if_name, rc);
-               return rc;
-       }
-       if (!up) {
-               CERROR("IP interface %s is down\n", if_name);
-               return -ENODEV;
-       }
-
-       my_nid = ((ipaddr >> 8) & 0xFF) + (ipaddr & 0xFF);
-       aries_sim_nic = device_id;
-       aries_sim_base_nid = my_nid - aries_sim_nic;
-
-       kgnilnd_hssops.nid_to_nicaddr = aries_nid_to_nicaddr;
-       kgnilnd_hssops.nicaddr_to_nid = aries_nicaddr_to_nid;
-
-       return 0;
-}
-#else
-#error "Undefined Network Type"
-#endif
-
 /* we use RCA types here to get the compiler to whine when we have
  * mismatched types */
 static inline int
 kgnilnd_nid_to_nicaddrs(rca_nid_t nid, int numnic, nic_addr_t *nicaddrs)
 {
+       int     rc;
+
        /* compile time checks to ensure that the RCA types match
         * the LNet idea of NID and NIC */
        typecheck(__u32, nid);
        typecheck(__u32, *nicaddrs);
 
-       LASSERTF(kgnilnd_hssops.nid_to_nicaddr != NULL, "missing setup?\n");
+       rc = krca_nid_to_nicaddrs(nid, numnic, nicaddrs);
 
-       return kgnilnd_hssops.nid_to_nicaddr(nid, numnic, nicaddrs);
+       CDEBUG(D_NETTRACE, "RCA nid %d -> nic 0x%x, rc: %d\n",
+              nid, nicaddrs[0], rc);
+
+       RETURN(rc);
 }
 
 static inline int
@@ -276,9 +66,15 @@ kgnilnd_nicaddr_to_nid(nic_addr_t nicaddr, rca_nid_t *nid)
        typecheck(__u32, nicaddr);
        typecheck(__u32, nid[0]);
 
-       LASSERTF(kgnilnd_hssops.nicaddr_to_nid != NULL, "missing setup ?\n");
+       return krca_nicaddr_to_nid(nicaddr, nid);
+}
 
-       return kgnilnd_hssops.nicaddr_to_nid(nicaddr, nid);
+static inline int
+kgnilnd_setup_nic_translation(__u32 device_id)
+{
+       return 0;
 }
 
+#endif /* GNILND_USE_RCA */
+
 #endif /* _GNILND_HSS_OPS_H */
index 17cbfd6..6707e58 100644 (file)
@@ -66,11 +66,7 @@ static int max_immediate = (2<<10);
 CFS_MODULE_PARM(max_immediate, "i", int, 0644,
                "immediate/RDMA breakpoint");
 
-#ifdef CONFIG_CRAY_GEMINI
-static int checksum = GNILND_CHECKSUM_SMSG_BTE;
-#else
-static int checksum = 0;
-#endif
+static int checksum = GNILND_CHECKSUM_DEFAULT;
 CFS_MODULE_PARM(checksum, "i", int, 0644,
                "0: None, 1: headers, 2: short msg, 3: all traffic");
 
@@ -78,14 +74,10 @@ static int checksum_dump = 0;
 CFS_MODULE_PARM(checksum_dump, "i", int, 0644,
                "0: None, 1: dump log on failure, 2: payload data to D_INFO log");
 
-static int bte_hash = 1;
-CFS_MODULE_PARM(bte_hash, "i", int, 0644,
+static int bte_dlvr_mode = GNILND_RDMA_DLVR_OPTION;
+CFS_MODULE_PARM(bte_dlvr_mode, "i", int, 0644,
                "enable hashing for BTE (RDMA) transfers");
 
-static int bte_adapt = 1;
-CFS_MODULE_PARM(bte_adapt, "i", int, 0644,
-               "enable adaptive request and response for BTE (RDMA) transfers");
-
 static int bte_relaxed_ordering = 1;
 CFS_MODULE_PARM(bte_relaxed_ordering, "i", int, 0644,
                "enable relaxed ordering (PASSPW) for BTE (RDMA) transfers");
@@ -95,7 +87,7 @@ CFS_MODULE_PARM(ptag, "i", int, 0444,
                "ptag for Gemini CDM");
 
 static int max_retransmits = 1024;
-CFS_MODULE_PARM(max_retransmits, "i", int, 0644,
+CFS_MODULE_PARM(max_retransmits, "i", int, 0444,
                "max retransmits for FMA");
 
 static int nwildcard = 4;
@@ -122,6 +114,10 @@ static int peer_health = 0;
 CFS_MODULE_PARM(peer_health, "i", int, 0444,
                "Disable peer timeout for LNet peer health, default off, > 0 to enable");
 
+static int peer_timeout = -1;
+CFS_MODULE_PARM(peer_timeout, "i", int, 0444,
+               "Peer timeout used for peer_health, default based on gnilnd timeout, > -1 to manually set");
+
 static int vmap_cksum = 0;
 CFS_MODULE_PARM(vmap_cksum, "i", int, 0644,
                "use vmap for all kiov checksumming, default off");
@@ -154,6 +150,22 @@ static int mdd_timeout = GNILND_MDD_TIMEOUT;
 CFS_MODULE_PARM(mdd_timeout, "i", int, 0644,
                "maximum time (in minutes) for mdd to be held");
 
+static int sched_timeout = GNILND_SCHED_TIMEOUT;
+CFS_MODULE_PARM(sched_timeout, "i", int, 0644,
+               "scheduler aliveness in seconds max time");
+
+static int sched_nice = GNILND_SCHED_NICE;
+CFS_MODULE_PARM(sched_nice, "i", int, 0444,
+               "scheduler's nice setting, default compute 0 service -20");
+
+static int reverse_rdma = GNILND_REVERSE_RDMA;
+CFS_MODULE_PARM(reverse_rdma, "i", int, 0644,
+               "Normal 0: Reverse GET: 1 Reverse Put: 2 Reverse Both: 3");
+
+static int dgram_timeout = GNILND_DGRAM_TIMEOUT;
+CFS_MODULE_PARM(dgram_timeout, "i", int, 0644,
+               "dgram thread aliveness seconds max time");
+
 kgn_tunables_t kgnilnd_tunables = {
        .kgn_min_reconnect_interval = &min_reconnect_interval,
        .kgn_max_reconnect_interval = &max_reconnect_interval,
@@ -165,8 +177,7 @@ kgn_tunables_t kgnilnd_tunables = {
        .kgn_max_immediate          = &max_immediate,
        .kgn_checksum               = &checksum,
        .kgn_checksum_dump          = &checksum_dump,
-       .kgn_bte_hash               = &bte_hash,
-       .kgn_bte_adapt              = &bte_adapt,
+       .kgn_bte_dlvr_mode          = &bte_dlvr_mode,
        .kgn_bte_relaxed_ordering   = &bte_relaxed_ordering,
        .kgn_ptag                   = &ptag,
        .kgn_max_retransmits        = &max_retransmits,
@@ -176,6 +187,7 @@ kgn_tunables_t kgnilnd_tunables = {
        .kgn_loops                  = &loops,
        .kgn_peer_hash_size         = &hash_size,
        .kgn_peer_health            = &peer_health,
+       .kgn_peer_timeout           = &peer_timeout,
        .kgn_vmap_cksum             = &vmap_cksum,
        .kgn_mbox_per_block         = &mbox_per_block,
        .kgn_nphys_mbox             = &nphys_mbox,
@@ -183,7 +195,11 @@ kgn_tunables_t kgnilnd_tunables = {
        .kgn_sched_threads          = &sched_threads,
        .kgn_net_hash_size          = &net_hash_size,
        .kgn_hardware_timeout       = &hardware_timeout,
-       .kgn_mdd_timeout            = &mdd_timeout
+       .kgn_mdd_timeout            = &mdd_timeout,
+       .kgn_sched_timeout          = &sched_timeout,
+       .kgn_sched_nice             = &sched_nice,
+       .kgn_reverse_rdma           = &reverse_rdma,
+       .kgn_dgram_timeout          = &dgram_timeout
 };
 
 #if CONFIG_SYSCTL && !CFS_SYSFS_MODULE_PARM
@@ -254,16 +270,8 @@ static cfs_sysctl_table_t kgnilnd_ctl_table[] = {
        },
        {
                INIT_CTL_NAME(11)
-               .procname = "bte_hash",
-               .data     = &bte_hash,
-               .maxlen   = sizeof(int),
-               .mode     = 0644,
-               .proc_handler = &proc_dointvec
-       },
-       {
-               INIT_CTL_NAME(12)
-               .procname = "bte_adapt",
-               .data     = &bte_adapt,
+               .procname = "bte_dlvr_mode",
+               .data     = &bte_dlvr_mode,
                .maxlen   = sizeof(int),
                .mode     = 0644,
                .proc_handler = &proc_dointvec
@@ -420,6 +428,45 @@ static cfs_sysctl_table_t kgnilnd_ctl_table[] = {
                .mode     = 0444,
                .proc_handler = &proc_dointvec
        },
+       {
+               INIT_CTL_NAME(32)
+               .procname = "sched_timeout",
+               .data     = &sched_timeout,
+               .maxlen   = sizeof(int),
+               .mode     = 0644,
+               .proc_handler = &proc_dointvec
+       },
+       {
+               INIT_CTL_NAME(33)
+               .procname = "sched_nice",
+               .data     = &sched_nice,
+               .maxlen   = sizeof(int),
+               .mode     = 0444,
+               .proc_handler = &proc_dointvec
+       },
+       {
+               INIT_CTL_NAME(34)
+               .procname = "reverse_rdma",
+               .data     = &reverse_rdma,
+               .maxlen   = sizeof(int),
+               .mode     = 0644,
+               .proc_handler = &proc_dointvec
+       },
+       {
+               INIT_CTL_NAME(35)
+               .procname = "dgram_timeout",
+               .data     = &dgram_timeout,
+               .maxlen   = sizeof(int),
+               .mode     = 0644,
+               .proc_handler = &proc_dointvec
+       },
+       {
+               INIT_CTL_NAME(36)
+               .procname = "peer_timeout",
+               .data     = &peer_timeout,
+               .maxlen   = sizeof(int),
+               .mode     = 0444,
+               .proc_handler = &proc_dointvec
+       },
        {0}
 };
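
The commit message describes how the new peer_timeout parameter interacts with the default of kgn_timeout + kgn_timeout/8 when peer_health is enabled. A minimal userspace sketch of that rule follows. The function name pick_peer_timeout and its signature are hypothetical and are not part of gnilnd, and accepting a value exactly equal to the default is an assumption, since the description only covers "greater than" and "less than".

```c
#include <errno.h>

/* Hypothetical helper, not part of gnilnd.
 * Returns 0 and stores the timeout to hand to LNet in *result, or
 * -EINVAL when the requested value is below the derived default
 * (the case in which the module is described as failing to load). */
static int pick_peer_timeout(int kgn_timeout, int peer_timeout, int *result)
{
	/* default described in the commit message */
	int fudge = kgn_timeout + kgn_timeout / 8;

	if (peer_timeout < 0) {
		/* -1 means "not set": fall back to the derived default */
		*result = fudge;
		return 0;
	}

	if (peer_timeout < fudge)
		return -EINVAL;

	/* assumption: a value equal to the default is accepted */
	*result = peer_timeout;
	return 0;
}
```

With kgn_timeout set to 60 the derived default is 67 (integer division), so a peer_timeout of 120 would be used as given while 30 would be rejected.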
 
index f161224..6170583 100644 (file)
@@ -29,6 +29,7 @@
 #define GNILND_PROC_MDD         "mdd"
 #define GNILND_PROC_SMSG        "smsg"
 #define GNILND_PROC_CONN        "conn"
+#define GNILND_PROC_PEER_CONNS  "peer_conns"
 #define GNILND_PROC_PEER        "peer"
 #define GNILND_PROC_CKSUM_TEST  "cksum_test"
 
@@ -236,7 +237,10 @@ kgnilnd_proc_stats_read(char *page, char **start, off_t off,
                           "RDMA rx_bytes: %ld\n"
                           "VMAP short: %d\n"
                           "VMAP cksum: %d\n"
-                          "KMAP short: %d\n",
+                          "KMAP short: %d\n"
+                          "RDMA REV length: %d\n"
+                          "RDMA REV offset: %d\n"
+                          "RDMA REV copy: %d\n",
                now.tv_sec, now.tv_usec,
                atomic_read(&kgnilnd_data.kgn_ntx),
                atomic_read(&kgnilnd_data.kgn_npeers),
@@ -262,7 +266,10 @@ kgnilnd_proc_stats_read(char *page, char **start, off_t off,
                atomic_read(&dev->gnd_rdma_nrx), atomic64_read(&dev->gnd_rdma_rxbytes),
                atomic_read(&kgnilnd_data.kgn_nvmap_short),
                atomic_read(&kgnilnd_data.kgn_nvmap_cksum),
-               atomic_read(&kgnilnd_data.kgn_nkmap_short));
+               atomic_read(&kgnilnd_data.kgn_nkmap_short),
+               atomic_read(&kgnilnd_data.kgn_rev_length),
+               atomic_read(&kgnilnd_data.kgn_rev_offset),
+               atomic_read(&kgnilnd_data.kgn_rev_copy_buff));
 
        return rc;
 }
@@ -899,6 +906,123 @@ static struct seq_operations kgn_conn_sops = {
 
 };
 
+#define KGN_DEBUG_PEER_NID_DEFAULT -1
+static int kgnilnd_debug_peer_nid = KGN_DEBUG_PEER_NID_DEFAULT;
+
+static int
+kgnilnd_proc_peer_conns_write(struct file *file, const char *ubuffer,
+                             unsigned long count, void *data)
+{
+       char dummy[8];
+       int  rc;
+
+       if (count >= sizeof(dummy) || count == 0)
+               return -EINVAL;
+
+       if (copy_from_user(dummy, ubuffer, count))
+               return -EFAULT;
+
+       /* terminate the copied bytes so sscanf() stays inside the buffer */
+       dummy[count] = '\0';
+
+       rc = sscanf(dummy, "%d", &kgnilnd_debug_peer_nid);
+
+       if (rc != 1) {
+               return -EINVAL;
+       }
+
+       RETURN(count);
+}
+
+/* debug data to print from conns associated with peer nid
+  -  date/time
+  -  peer nid
+  -  mbox_addr (msg_buffer + mbox_offset)
+  -  gnc_dgram_type
+  -  gnc_in_purgatory
+  -  gnc_state
+  -  gnc_error
+  -  gnc_peer_error
+  -  gnc_tx_seq
+  -  gnc_last_tx
+  -  gnc_last_tx_cq
+  -  gnc_rx_seq
+  -  gnc_first_rx
+  -  gnc_last_rx
+  -  gnc_last_rx_cq
+  -  gnc_tx_retrans
+  -  gnc_close_sent
+  -  gnc_close_recvd
+*/
+
+static int
+kgnilnd_proc_peer_conns_read(char *page, char **start, off_t off,
+                            int count, int *eof, void *data)
+{
+       kgn_peer_t      *peer;
+       kgn_conn_t      *conn;
+       struct tm       ctm;
+       struct timespec now;
+       unsigned long   jifs;
+       int             len = 0;
+       int             rc;
+
+       if (kgnilnd_debug_peer_nid == KGN_DEBUG_PEER_NID_DEFAULT) {
+               rc = sprintf(page, "peer_conns not initialized\n");
+               return rc;
+       }
+
+       /* sample date/time stamp - print time in UTC
+        * 2012-12-11T16:06:16.966751 123@gni ...
+        */
+       getnstimeofday(&now);
+       time_to_tm(now.tv_sec, 0, &ctm);
+       jifs = jiffies;
+
+       write_lock(&kgnilnd_data.kgn_peer_conn_lock);
+       peer = kgnilnd_find_peer_locked(kgnilnd_debug_peer_nid);
+
+       if (peer == NULL) {
+               rc = sprintf(page, "peer not found for this nid %d\n",
+                            kgnilnd_debug_peer_nid);
+               write_unlock(&kgnilnd_data.kgn_peer_conn_lock);
+               return rc;
+       }
+
+       list_for_each_entry(conn, &peer->gnp_conns, gnc_list) {
+               len += scnprintf(page + len, count - len,
+                       "%04ld-%02d-%02dT%02d:%02d:%02d.%06ld %s "
+                       "mbox adr %p "
+                       "dg type %s "
+                       "%s "
+                       "purg %d "
+                       "close s/r %d/%d "
+                       "err %d peer err %d "
+                       "tx sq %u %dms/%dms "
+                       "rx sq %u %dms/%dms/%dms "
+                       "tx retran %lld\n",
+                       ctm.tm_year+1900, ctm.tm_mon+1, ctm.tm_mday,
+                       ctm.tm_hour, ctm.tm_min, ctm.tm_sec, now.tv_nsec,
+                       libcfs_nid2str(peer->gnp_nid),
+                       conn->remote_mbox_addr,
+                       kgnilnd_conn_dgram_type2str(conn->gnc_dgram_type),
+                       kgnilnd_conn_state2str(conn),
+                       conn->gnc_in_purgatory,
+                       conn->gnc_close_sent,
+                       conn->gnc_close_recvd,
+                       conn->gnc_error,
+                       conn->gnc_peer_error,
+                       conn->gnc_tx_seq,
+                       jiffies_to_msecs(jifs - conn->gnc_last_tx),
+                       jiffies_to_msecs(jifs - conn->gnc_last_tx_cq),
+                       conn->gnc_rx_seq,
+                       jiffies_to_msecs(jifs - conn->gnc_first_rx),
+                       jiffies_to_msecs(jifs - conn->gnc_last_rx),
+                       jiffies_to_msecs(jifs - conn->gnc_last_rx_cq),
+                       conn->gnc_tx_retrans);
+       }
+
+       write_unlock(&kgnilnd_data.kgn_peer_conn_lock);
+       return len;
+}
+
 static int
 kgnilnd_conn_seq_open(struct inode *inode, struct file *file)
 {
@@ -1092,11 +1216,12 @@ kgnilnd_peer_seq_show(struct seq_file *s, void *iter)
 
        read_unlock(&kgnilnd_data.kgn_peer_conn_lock);
 
-       seq_printf(s, "%p->%s [%d] NIC 0x%x q %d conn %c purg %d "
+       seq_printf(s, "%p->%s [%d] %s NIC 0x%x q %d conn %c purg %d "
                "last %d@%dms dgram %d@%dms "
                "reconn %dms to %lus \n",
                peer, libcfs_nid2str(peer->gnp_nid),
                atomic_read(&peer->gnp_refcount),
+               (peer->gnp_down == GNILND_RCA_NODE_DOWN) ? "down" : "up",
                peer->gnp_host_id,
                kgnilnd_count_list(&peer->gnp_tx_queue),
                conn_str,
@@ -1219,18 +1344,32 @@ kgnilnd_proc_init(void)
        pde->data = NULL;
        pde->proc_fops = &kgn_conn_fops;
 
+       /* Initialize peer conns debug */
+       pde = create_proc_entry(GNILND_PROC_PEER_CONNS, 0644, kgn_proc_root);
+       if (pde == NULL) {
+               CERROR("couldn't create proc entry %s\n", GNILND_PROC_PEER_CONNS);
+               rc = -ENOENT;
+               GOTO(remove_conn, rc);
+       }
+
+       pde->data = NULL;
+       pde->read_proc = kgnilnd_proc_peer_conns_read;
+       pde->write_proc = kgnilnd_proc_peer_conns_write;
+
        /* Initialize PEER */
        pde = create_proc_entry(GNILND_PROC_PEER, 0444, kgn_proc_root);
        if (pde == NULL) {
                CERROR("couldn't create proc entry %s\n", GNILND_PROC_PEER);
                rc = -ENOENT;
-               GOTO(remove_conn, rc);
+               GOTO(remove_pc, rc);
        }
 
        pde->data = NULL;
        pde->proc_fops = &kgn_peer_fops;
        RETURN_EXIT;
 
+remove_pc:
+       remove_proc_entry(GNILND_PROC_PEER_CONNS, kgn_proc_root);
 remove_conn:
        remove_proc_entry(GNILND_PROC_CONN, kgn_proc_root);
 remove_smsg:
@@ -1250,6 +1389,7 @@ remove_dir:
 void
 kgnilnd_proc_fini(void)
 {
+       remove_proc_entry(GNILND_PROC_PEER_CONNS, kgn_proc_root);
        remove_proc_entry(GNILND_PROC_PEER, kgn_proc_root);
        remove_proc_entry(GNILND_PROC_CONN, kgn_proc_root);
        remove_proc_entry(GNILND_PROC_MDD, kgn_proc_root);
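
The peer_conns read handler formats one line per connection into a single proc page while tracking the running length. A minimal userspace sketch of that bounded-append pattern follows. bounded_printf is a stand-in written here for the kernel's scnprintf(), and format_conns with its "conn %d" line format is hypothetical. Each write starts at page + len and is limited to the space that remains.

```c
#include <stdarg.h>
#include <stddef.h>
#include <stdio.h>

/* Userspace stand-in for the kernel's scnprintf(): returns the number of
 * characters actually stored (excluding the NUL), never more than size - 1. */
static int bounded_printf(char *buf, size_t size, const char *fmt, ...)
{
	va_list ap;
	int n;

	if (size == 0)
		return 0;

	va_start(ap, fmt);
	n = vsnprintf(buf, size, fmt, ap);
	va_end(ap);

	if (n < 0)
		return 0;
	return (size_t)n >= size ? (int)(size - 1) : n;
}

/* Hypothetical example: append one line per connection id to page. */
static int format_conns(char *page, int count, const int *ids, int nids)
{
	int len = 0;
	int i;

	if (count <= 0)
		return 0;

	for (i = 0; i < nids; i++) {
		/* write after what is already there, within what is left */
		len += bounded_printf(page + len, (size_t)(count - len),
				      "conn %d\n", ids[i]);
	}
	return len;
}
```

Because the helper reports only what was stored, len can never exceed count - 1, so the remaining-space argument stays positive even when the output is truncated.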
index 10ae493..0dec950 100644 (file)
@@ -20,6 +20,7 @@
  *
  */
 #include "gnilnd.h"
+#include <rsms/rs_sm_states.h>
 
 /* Advance all timeouts by nap_time seconds. */
 void
@@ -73,6 +74,7 @@ kgnilnd_bump_timeouts(__u32 nap_time, char *reason)
                                 * we'll back it up and schedule the conn to trigger
                                 * a NOOP */
                                conn->gnc_last_tx = jiffies - timeout;
+                               if (conn->gnc_state != GNILND_CONN_DONE)
                                kgnilnd_schedule_conn(conn);
                        }
                }
@@ -112,6 +114,8 @@ kgnilnd_quiesce_wait(char *reason)
                        wake_up_all(&dev->gnd_dgping_waitq);
                }
 
+               kgnilnd_wakeup_rca_thread();
+
                /* we'll wait for 10x the timeout for the threads to pause */
                quiesce_to = cfs_time_seconds(*kgnilnd_tunables.kgn_timeout * 10);
                quiesce_deadline = (long) jiffies + quiesce_to;
@@ -562,3 +566,211 @@ kgnilnd_critical_error(struct gni_err *err_handle)
                CDEBUG(D_NET, "stack reset bypassed because of shutdown\n");
        }
 }
+
+#if defined(GNILND_USE_RCA)
+#include <krca_lib.h>
+#define RCA_EVENTS 3
+/* RCA ticket is needed for krca_wakeup_wait_event() */
+static krca_ticket_t rca_krt = KRCA_NULL_TICKET;
+struct rcadata {
+       rca_ticket_t ticket;
+       int subscribed;
+       rs_event_code_t ec;
+};
+static struct rcadata rd[RCA_EVENTS] = {
+       {0, 0, ec_node_unavailable},
+       {0, 0, ec_node_available},
+       {0, 0, ec_node_failed}
+};
+
+/* thread for receiving rca events */
+int
+kgnilnd_rca(void *arg)
+{
+       int        i, rc;
+       int        retry_count;
+       rs_event_t event;
+       lnet_nid_t nid;
+
+       cfs_daemonize("kgnilnd_rca");
+       cfs_block_allsigs();
+
+       /* all gnilnd threads need to run fairly urgently */
+       set_user_nice(current, *kgnilnd_tunables.kgn_nice);
+
+       /*
+        * Register our service with RCA and subscribe to events
+        * of interest.
+        */
+       rca_krt = KRCA_NULL_TICKET;
+       rc = krca_register(&rca_krt, RCA_SVCTYPE_GNILND, current->pid, 0);
+       if (rc < 0) {
+               CNETERR("krca_register(%x) returned %d\n", current->pid, rc);
+               goto done;
+       }
+
+       for (i = 0; i < RCA_EVENTS; i++) {
+               retry_count = 0;
+subscribe_retry:
+               rc = krca_subscribe(&rca_krt, rd[i].ec, RCA_RX_SVC_ANY,
+                                   &rd[i].ticket);
+
+               if ((rc == -EINTR) && !retry_count) {
+                       retry_count++;
+                       CNETERR("krca_subscribe returned %d - retrying\n", rc);
+                       goto subscribe_retry;
+               }
+
+               if (rc < 0) {
+                       CNETERR("rca subscription failed (%d)\n", rc);
+                       goto done;
+               }
+
+               rd[i].subscribed = 1;
+       }
+
+       while (!kgnilnd_data.kgn_shutdown) {
+               if (unlikely(kgnilnd_data.kgn_quiesce_trigger)) {
+                       KGNILND_SPIN_QUIESCE;
+               }
+               /* wait here for a subscribed event */
+               rc = krca_wait_event(&rca_krt);
+
+               /* RCA return values:
+                * 0 indicates krca_wakeup_wait_event caused krca_wait_event
+                *   return.
+                * -ERESTARTSYS indicates krca_wait_event returned because of a
+                *   signal.
+                * -ENOSPC indicates no space available to create an rcad_reg_t
+                * 1 indicates a message is waiting.
+                */
+               if (rc <= 0) {
+                       continue;
+               }
+
+               if (krca_get_message(&rca_krt, &event) == 0) {
+                       int node_down = GNILND_RCA_NODE_UNKNOWN;
+                       rs_state_t state;
+                       CFS_LIST_HEAD(zombies);
+
+                       /* Compute nodes don't care about other compute nodes
+                        * so we don't need to create a peer.
+                        */
+                       if (GNILND_COMPUTE &&
+                           !RSN_GET_FLD(event.ev_gen.svid_node.rs_node_flat,
+                                       IS_SVC)) {
+                               continue;
+                       }
+
+                       /* Only care about compute and service nodes not GPUs */
+                       if (RSN_GET_FLD(event.ev_gen.svid_node.rs_node_flat,
+                                       TYPE) != rt_node) {
+                               continue;
+                       }
+
+                       switch (event.ev_id) {
+                       case ec_node_available:
+                               CDEBUG(D_INFO, "ec_node_available\n");
+                               node_down = GNILND_RCA_NODE_UP;
+                               break;
+                       case ec_node_failed:
+                               CDEBUG(D_INFO, "ec_node_failed\n");
+                               if (event.ev_len > 0) {
+                                       CDEBUG(D_ERROR,
+                                               "ec_node_failed ignored\n");
+                                       break;
+                               }
+                               node_down = GNILND_RCA_NODE_DOWN;
+                               break;
+                       case ec_node_unavailable:
+                               state = RSN_GET_FLD(event.ev_gen.svid_node.rsn_intval, STATE);
+
+                               CDEBUG(D_INFO, "ec_node_unavailable\n");
+
+                               /*
+                                * Ignore overloaded ec_node_unavailable events
+                                * generated by 'xtcli set_reserve'.
+                                */
+                               if (RS_GET_CS_STATE(state) == RS_CS_READY) {
+                                       CDEBUG(D_INFO, "ignoring "
+                                               "ec_node_unavailable event with"
+                                               " RS_CS_READY state\n");
+                                       break;
+                               }
+                               node_down = GNILND_RCA_NODE_DOWN;
+                               break;
+                       default:
+                               CDEBUG(D_INFO, "unknown event\n");
+                               break;
+                       }
+
+                       /* if we get an event we don't know about, just go ahead
+                        * and wait for another event */
+                       if (node_down == GNILND_RCA_NODE_UNKNOWN) {
+                               continue;
+                       }
+
+                       nid = RSN_GET_FLD(event.ev_gen.svid_node.rs_node_flat,
+                                         NID);
+                       CDEBUG(D_INFO,"kgnilnd_rca() reporting nid %d %s\n",
+                              (int)nid, node_down ? "down" : "up");
+                       kgnilnd_report_node_state(nid, node_down);
+
+               } else {
+                       CNETERR("krca_get_message failed\n");
+               }
+       }
+
+done:
+       CDEBUG(D_INFO, "done\n");
+
+       for (i = 0; i < RCA_EVENTS; i++) {
+               if (rd[i].subscribed) {
+                       rc = krca_unsubscribe(&rca_krt, rd[i].ticket);
+
+                       if (rc) {
+                               CNETERR("rca unsubscribe failed (%d)\n", rc);
+                       }
+
+                       rd[i].subscribed = 0;
+               }
+       }
+
+       krca_unregister(&rca_krt);
+       kgnilnd_thread_fini();
+       return 0;
+
+}
+
+int
+kgnilnd_start_rca_thread(void)
+{
+       return kgnilnd_thread_start(kgnilnd_rca, NULL, "kgnilnd_rca", 0);
+}
+
+void
+kgnilnd_wakeup_rca_thread(void)
+{
+       int ret;
+
+       ret = krca_wakeup_wait_event(&rca_krt);
+
+       if (ret) {
+               CDEBUG(D_ERROR, "krca_wakeup_wait_event failed\n");
+       }
+}
+
+#else /* GNILND_USE_RCA */
+
+int
+kgnilnd_start_rca_thread(void)
+{
+       return 0;
+}
+
+void
+kgnilnd_wakeup_rca_thread(void)
+{
+}
+
+#endif /* GNILND_USE_RCA */
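
The event handling in kgnilnd_rca() reduces each RCA event to an up, down, or "ignore" decision before calling kgnilnd_report_node_state(). A minimal sketch of that decision table follows. The enum names and values are hypothetical stand-ins, since the real event codes and the GNILND_RCA_NODE_* constants come from headers that are not shown in this diff, and the cs_ready flag stands in for the RS_GET_CS_STATE(state) == RS_CS_READY check.

```c
#include <stdbool.h>

/* Hypothetical stand-ins for the RCA event codes. */
enum sketch_event {
	EV_NODE_AVAILABLE,
	EV_NODE_FAILED,
	EV_NODE_UNAVAILABLE,
	EV_OTHER
};

/* Hypothetical stand-ins for GNILND_RCA_NODE_*; the values are assumed. */
enum sketch_state {
	NODE_UNKNOWN = -1,
	NODE_UP      = 0,
	NODE_DOWN    = 1
};

/* Mirrors the switch in kgnilnd_rca(): NODE_UNKNOWN means the event is
 * skipped and the thread goes back to waiting. */
static enum sketch_state
classify_event(enum sketch_event ev, int ev_len, bool cs_ready)
{
	switch (ev) {
	case EV_NODE_AVAILABLE:
		return NODE_UP;
	case EV_NODE_FAILED:
		/* ec_node_failed events with ev_len > 0 are ignored */
		return ev_len > 0 ? NODE_UNKNOWN : NODE_DOWN;
	case EV_NODE_UNAVAILABLE:
		/* overloaded events from 'xtcli set_reserve' are ignored */
		return cs_ready ? NODE_UNKNOWN : NODE_DOWN;
	default:
		return NODE_UNKNOWN;
	}
}
```

Keeping the mapping in a pure function like this makes the ignore cases easy to check without an RCA daemon, which is the same gap the rca_inject sysctl is meant to cover at run time.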
index cd33d3e..0ee1204 100644 (file)
 
 #include "gnilnd.h"
 
+#define GNILND_RCA_INJ_STRLEN 16
 typedef struct kgn_sysctl_data {
        int                     ksd_pause_trigger;
        int                     ksd_quiesce_secs;
        int                     ksd_rdmaq_override;
+       char                    ksd_rca_inject[GNILND_RCA_INJ_STRLEN];
 } kgn_sysctl_data_t;
 
 static kgn_sysctl_data_t        kgnilnd_sysctl;
@@ -45,6 +47,7 @@ enum {
        GNILND_HW_QUIESCE,
        GNILND_STACK_RESET,
        GNILND_RDMAQ_OVERRIDE,
+       GNILND_RCA_INJECT,
 };
 #else
 #define GNILND_VERSION             CTL_UNNUMBERED
@@ -52,6 +55,7 @@ enum {
 #define GNILND_HW_QUIESCE          CTL_UNNUMBERED
 #define GNILND_STACK_RESET         CTL_UNNUMBERED
 #define GNILND_RDMAQ_OVERRIDE      CTL_UNNUMBERED
+#define GNILND_RCA_INJECT          CTL_UNNUMBERED
 #endif
 
 static int LL_PROC_PROTO(proc_toggle_thread_pause)
@@ -172,6 +176,63 @@ static int LL_PROC_PROTO(proc_toggle_rdmaq_override)
        RETURN(rc);
 }
 
+/* /proc/sys entry point for injecting up/down nid event
+ * <up|down> <nid>
+ */
+static int LL_PROC_PROTO(proc_rca_inject)
+{
+       int             rc;
+       int             nid;
+       int             node_down;
+       char            command[10];
+       ENTRY;
+
+       rc = ll_proc_dostring(table, write, filp, buffer, lenp, ppos);
+
+       if (!write) {
+               /* read */
+               RETURN(rc);
+       }
+
+       if (kgnilnd_data.kgn_init != GNILND_INIT_ALL) {
+               rc = -EINVAL;
+               RETURN(rc);
+       }
+
+       /* convert to nid, up/down values */
+       rc = sscanf(kgnilnd_sysctl.ksd_rca_inject, "%s %d", command, &nid);
+       CDEBUG(D_INFO, "command %s, nid %d\n", command, nid);
+
+       if (rc != 2) {
+               CDEBUG(D_ERROR, "invalid parameter\n");
+               RETURN(-EINVAL);
+       } else {
+               switch (command[0]) {
+               case 'd': /* down */
+                       node_down = 1;
+                       CDEBUG(D_INFO, "take node %d down\n", nid);
+                       break;
+               case 'u': /* up */
+                       node_down = 0;
+                       CDEBUG(D_INFO, "bring node %d up\n", nid);
+                       break;
+               default:
+                       CDEBUG(D_ERROR, "invalid command %s\n", command);
+                       RETURN(-EINVAL);
+               }
+       }
+
+       CDEBUG(D_INFO, "proc_rca_inject: reporting node_down %d, nid %d\n",
+                     node_down, nid);
+       rc = kgnilnd_report_node_state(nid, node_down);
+
+       if (rc) {
+               rc = -EINVAL;
+       }
+
+       RETURN(rc);
+}
+
 static cfs_sysctl_table_t kgnilnd_table[] = {
        /*
         * NB No .strategy entries have been provided since sysctl(8) prefers
@@ -217,6 +278,14 @@ static cfs_sysctl_table_t kgnilnd_table[] = {
                .mode     = 0644,
                .proc_handler = &proc_toggle_rdmaq_override,
        },
+       {
+               INIT_CTL_NAME(GNILND_RCA_INJECT)
+               .procname = "rca_inject",
+               .data     = kgnilnd_sysctl.ksd_rca_inject,
+               .maxlen   = GNILND_RCA_INJ_STRLEN,
+               .mode     = 0644,
+               .proc_handler = &proc_rca_inject,
+       },
        {       INIT_CTL_NAME(0)   }
 };
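
The rca_inject entry accepts a string of the form "<up|down> <nid>" and turns it into a node state report. A minimal userspace sketch of the parsing step follows. parse_inject is a hypothetical name, and the %9s field width is an addition made here so that the command can never overrun the 10-byte buffer; the handler in the patch uses a plain %s.

```c
#include <errno.h>
#include <stdio.h>

/* Hypothetical parser for "<up|down> <nid>".
 * Returns 0 and fills *node_down and *nid, or -EINVAL on bad input. */
static int parse_inject(const char *input, int *node_down, int *nid)
{
	char command[10];

	/* %9s bounds the copy to the 10-byte buffer, NUL included */
	if (sscanf(input, "%9s %d", command, nid) != 2)
		return -EINVAL;

	/* as in the handler, only the first character is examined */
	switch (command[0]) {
	case 'd':
		*node_down = 1;
		return 0;
	case 'u':
		*node_down = 0;
		return 0;
	default:
		return -EINVAL;
	}
}
```

Given "down 42" this yields node_down = 1 and nid = 42, while a string with a missing nid or an unrecognized command is rejected with -EINVAL.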
 
index 10f6278..360a0cb 100644 (file)
@@ -1 +1,23 @@
+/*
+ * Copyright (C) 2012 Cray, Inc.
+ *
+ *   Author: Nic Henke <nic@cray.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
 #define KGNILND_BUILD_REV        SVN_CODE_REV