Whamcloud - gitweb
LU-946 lprocfs: List open files in filesystem
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_tx.c
index a3eac04..1fa8d05 100644 (file)
@@ -1,19 +1,41 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
+/*
+ * GPL HEADER START
  *
- * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
- *   Author: PJ Kirner <pjkirner@clusterfs.com>
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   This file is part of the Lustre file system, http://www.lustre.org
- *   Lustre is a trademark of Cluster File Systems, Inc.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   This file is confidential source code owned by Cluster File Systems.
- *   No viewing, modification, compilation, redistribution, or any other
- *   form of use is permitted except through a signed license agreement.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   If you have not signed such an agreement, then you have no rights to
- *   this file.  Please destroy it immediately and contact CFS.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2012, Intel Corporation.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lnet/klnds/ptllnd/ptllnd_tx.c
+ *
+ * Author: PJ Kirner <pjkirner@clusterfs.com>
  */
 
  #include "ptllnd.h"
@@ -22,19 +44,17 @@ void
 kptllnd_free_tx(kptl_tx_t *tx)
 {
         if (tx->tx_msg != NULL)
-                LIBCFS_FREE(tx->tx_msg, 
-                            *kptllnd_tunables.kptl_max_msg_size);
+                LIBCFS_FREE(tx->tx_msg, sizeof(*tx->tx_msg));
                         
-        if (tx->tx_rdma_frags != NULL)
-                LIBCFS_FREE(tx->tx_rdma_frags, 
-                            sizeof(*tx->tx_rdma_frags));
+        if (tx->tx_frags != NULL)
+                LIBCFS_FREE(tx->tx_frags, sizeof(*tx->tx_frags));
 
         LIBCFS_FREE(tx, sizeof(*tx));
 
-        atomic_dec(&kptllnd_data.kptl_ntx);
+        cfs_atomic_dec(&kptllnd_data.kptl_ntx);
 
         /* Keep the tunable in step for visibility */
-        *kptllnd_tunables.kptl_ntx = atomic_read(&kptllnd_data.kptl_ntx);
+        *kptllnd_tunables.kptl_ntx = cfs_atomic_read(&kptllnd_data.kptl_ntx);
 }
 
 kptl_tx_t *
@@ -48,10 +68,10 @@ kptllnd_alloc_tx(void)
                 return NULL;
         }
 
-        atomic_inc(&kptllnd_data.kptl_ntx);
+        cfs_atomic_inc(&kptllnd_data.kptl_ntx);
 
         /* Keep the tunable in step for visibility */
-        *kptllnd_tunables.kptl_ntx = atomic_read(&kptllnd_data.kptl_ntx);
+        *kptllnd_tunables.kptl_ntx = cfs_atomic_read(&kptllnd_data.kptl_ntx);
 
         tx->tx_idle = 1;
         tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
@@ -59,16 +79,17 @@ kptllnd_alloc_tx(void)
         tx->tx_rdma_eventarg.eva_type = PTLLND_EVENTARG_TYPE_RDMA;
         tx->tx_msg_eventarg.eva_type = PTLLND_EVENTARG_TYPE_MSG;
         tx->tx_msg = NULL;
-        tx->tx_rdma_frags = NULL;
+        tx->tx_peer = NULL;
+        tx->tx_frags = NULL;
                 
-        LIBCFS_ALLOC(tx->tx_msg, *kptllnd_tunables.kptl_max_msg_size);
+        LIBCFS_ALLOC(tx->tx_msg, sizeof(*tx->tx_msg));
         if (tx->tx_msg == NULL) {
                 CERROR("Failed to allocate TX payload\n");
                 goto failed;
         }
 
-        LIBCFS_ALLOC(tx->tx_rdma_frags, sizeof(*tx->tx_rdma_frags));
-        if (tx->tx_rdma_frags == NULL) {
+        LIBCFS_ALLOC(tx->tx_frags, sizeof(*tx->tx_frags));
+        if (tx->tx_frags == NULL) {
                 CERROR("Failed to allocate TX frags\n");
                 goto failed;
         }
@@ -85,20 +106,17 @@ kptllnd_setup_tx_descs()
 {
         int       n = *kptllnd_tunables.kptl_ntx;
         int       i;
-        
+
         for (i = 0; i < n; i++) {
                 kptl_tx_t *tx = kptllnd_alloc_tx();
-                
                 if (tx == NULL)
                         return -ENOMEM;
-                
-                spin_lock(&kptllnd_data.kptl_tx_lock);
-                
-                list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
-                
-                spin_unlock(&kptllnd_data.kptl_tx_lock);
+
+               spin_lock(&kptllnd_data.kptl_tx_lock);
+                cfs_list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
+               spin_unlock(&kptllnd_data.kptl_tx_lock);
         }
-        
+
         return 0;
 }
 
@@ -110,15 +128,15 @@ kptllnd_cleanup_tx_descs()
         /* No locking; single threaded now */
         LASSERT (kptllnd_data.kptl_shutdown == 2);
 
-        while (!list_empty(&kptllnd_data.kptl_idle_txs)) {
-                tx = list_entry(kptllnd_data.kptl_idle_txs.next,
-                                kptl_tx_t, tx_list);
-                
-                list_del(&tx->tx_list);
+        while (!cfs_list_empty(&kptllnd_data.kptl_idle_txs)) {
+                tx = cfs_list_entry(kptllnd_data.kptl_idle_txs.next,
+                                    kptl_tx_t, tx_list);
+
+                cfs_list_del(&tx->tx_list);
                 kptllnd_free_tx(tx);
         }
 
-        LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0);
+        LASSERT (cfs_atomic_read(&kptllnd_data.kptl_ntx) == 0);
 }
 
 kptl_tx_t *
@@ -126,13 +144,13 @@ kptllnd_get_idle_tx(enum kptl_tx_type type)
 {
         kptl_tx_t      *tx = NULL;
 
-        if (IS_SIMULATION_ENABLED(FAIL_TX_PUT_ALLOC) && 
+        if (IS_SIMULATION_ENABLED(FAIL_TX_PUT_ALLOC) &&
             type == TX_TYPE_PUT_REQUEST) {
                 CERROR("FAIL_TX_PUT_ALLOC SIMULATION triggered\n");
                 return NULL;
         }
 
-        if (IS_SIMULATION_ENABLED(FAIL_TX_GET_ALLOC) && 
+        if (IS_SIMULATION_ENABLED(FAIL_TX_GET_ALLOC) &&
             type == TX_TYPE_GET_REQUEST) {
                 CERROR ("FAIL_TX_GET_ALLOC SIMULATION triggered\n");
                 return NULL;
@@ -143,23 +161,23 @@ kptllnd_get_idle_tx(enum kptl_tx_type type)
                 return NULL;
         }
 
-        spin_lock(&kptllnd_data.kptl_tx_lock);
+       spin_lock(&kptllnd_data.kptl_tx_lock);
 
-        if (list_empty (&kptllnd_data.kptl_idle_txs)) {
-                spin_unlock(&kptllnd_data.kptl_tx_lock);
+        if (cfs_list_empty (&kptllnd_data.kptl_idle_txs)) {
+               spin_unlock(&kptllnd_data.kptl_tx_lock);
 
                 tx = kptllnd_alloc_tx();
                 if (tx == NULL)
                         return NULL;
         } else {
-                tx = list_entry(kptllnd_data.kptl_idle_txs.next, 
-                                kptl_tx_t, tx_list);
-                list_del(&tx->tx_list);
+                tx = cfs_list_entry(kptllnd_data.kptl_idle_txs.next, 
+                                    kptl_tx_t, tx_list);
+                cfs_list_del(&tx->tx_list);
 
-                spin_unlock(&kptllnd_data.kptl_tx_lock);
+               spin_unlock(&kptllnd_data.kptl_tx_lock);
         }
 
-        LASSERT (atomic_read(&tx->tx_refcount)== 0);
+        LASSERT (cfs_atomic_read(&tx->tx_refcount)== 0);
         LASSERT (tx->tx_idle);
         LASSERT (!tx->tx_active);
         LASSERT (tx->tx_lnet_msg == NULL);
@@ -169,9 +187,11 @@ kptllnd_get_idle_tx(enum kptl_tx_type type)
         LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
         
         tx->tx_type = type;
-        atomic_set(&tx->tx_refcount, 1);
+        cfs_atomic_set(&tx->tx_refcount, 1);
         tx->tx_status = 0;
         tx->tx_idle = 0;
+        tx->tx_tposted = 0;
+        tx->tx_acked = *kptllnd_tunables.kptl_ack_puts;
 
         CDEBUG(D_NET, "tx=%p\n", tx);
         return tx;
@@ -186,17 +206,17 @@ kptllnd_tx_abort_netio(kptl_tx_t *tx)
         ptl_handle_md_t  rdma_mdh;
         unsigned long    flags;
 
-        LASSERT (atomic_read(&tx->tx_refcount) == 0);
+        LASSERT (cfs_atomic_read(&tx->tx_refcount) == 0);
         LASSERT (!tx->tx_active);
 
-        spin_lock_irqsave(&peer->peer_lock, flags);
+       spin_lock_irqsave(&peer->peer_lock, flags);
 
         msg_mdh = tx->tx_msg_mdh;
         rdma_mdh = tx->tx_rdma_mdh;
 
         if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
             PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
-                spin_unlock_irqrestore(&peer->peer_lock, flags);
+               spin_unlock_irqrestore(&peer->peer_lock, flags);
                 return 0;
         }
         
@@ -207,11 +227,11 @@ kptllnd_tx_abort_netio(kptl_tx_t *tx)
                   tx->tx_lnet_replymsg == NULL));
 
         /* stash the tx on its peer until it completes */
-        atomic_set(&tx->tx_refcount, 1);
+        cfs_atomic_set(&tx->tx_refcount, 1);
         tx->tx_active = 1;
-        list_add_tail(&tx->tx_list, &peer->peer_activeq);
+        cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);
         
-        spin_unlock_irqrestore(&peer->peer_lock, flags);
+       spin_unlock_irqrestore(&peer->peer_lock, flags);
 
         /* These unlinks will ensure completion events (normal or unlink) will
          * happen ASAP */
@@ -234,17 +254,17 @@ kptllnd_tx_abort_netio(kptl_tx_t *tx)
         unsigned long    flags;
         ptl_err_t        prc;
 
-        LASSERT (atomic_read(&tx->tx_refcount) == 0);
+        LASSERT (cfs_atomic_read(&tx->tx_refcount) == 0);
         LASSERT (!tx->tx_active);
 
-        spin_lock_irqsave(&peer->peer_lock, flags);
+       spin_lock_irqsave(&peer->peer_lock, flags);
 
         msg_mdh = tx->tx_msg_mdh;
         rdma_mdh = tx->tx_rdma_mdh;
 
         if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
             PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
-                spin_unlock_irqrestore(&peer->peer_lock, flags);
+               spin_unlock_irqrestore(&peer->peer_lock, flags);
                 return 0;
         }
         
@@ -254,7 +274,7 @@ kptllnd_tx_abort_netio(kptl_tx_t *tx)
                  (tx->tx_lnet_msg == NULL && 
                   tx->tx_replymsg == NULL));
 
-        spin_unlock_irqrestore(&peer->peer_lock, flags);
+       spin_unlock_irqrestore(&peer->peer_lock, flags);
 
         if (!PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE)) {
                 prc = PtlMDUnlink(msg_mdh);
@@ -268,7 +288,7 @@ kptllnd_tx_abort_netio(kptl_tx_t *tx)
                         rdma_mdh = PTL_INVALID_HANDLE;
         }
 
-        spin_lock_irqsave(&peer->peer_lock, flags);
+       spin_lock_irqsave(&peer->peer_lock, flags);
 
         /* update tx_???_mdh if callback hasn't fired */
         if (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE))
@@ -283,18 +303,18 @@ kptllnd_tx_abort_netio(kptl_tx_t *tx)
 
         if (PtlHandleIsEqual(msg_mdh, PTL_INVALID_HANDLE) &&
             PtlHandleIsEqual(rdma_mdh, PTL_INVALID_HANDLE)) {
-                spin_unlock_irqrestore(&peer->peer_lock, flags);
+               spin_unlock_irqrestore(&peer->peer_lock, flags);
                 return 0;
         }
 
         /* stash the tx on its peer until it completes */
-        atomic_set(&tx->tx_refcount, 1);
+        cfs_atomic_set(&tx->tx_refcount, 1);
         tx->tx_active = 1;
-        list_add_tail(&tx->tx_list, &peer->peer_activeq);
+        cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);
 
         kptllnd_peer_addref(peer);              /* extra ref for me... */
 
-        spin_unlock_irqrestore(&peer->peer_lock, flags);
+       spin_unlock_irqrestore(&peer->peer_lock, flags);
 
         /* This will get the watchdog thread to try aborting all the peer's
          * comms again.  NB, this deems it fair that 1 failing tx which can't
@@ -311,24 +331,24 @@ kptllnd_tx_abort_netio(kptl_tx_t *tx)
 void
 kptllnd_tx_fini (kptl_tx_t *tx)
 {
-        lnet_msg_t     *replymsg = tx->tx_lnet_replymsg;
-        lnet_msg_t     *msg      = tx->tx_lnet_msg;
-        kptl_peer_t    *peer     = tx->tx_peer;
-        int             status   = tx->tx_status;
-        int             rc;
-
-        LASSERT (!in_interrupt());
-        LASSERT (atomic_read(&tx->tx_refcount) == 0);
-        LASSERT (!tx->tx_idle);
-        LASSERT (!tx->tx_active);
+       lnet_msg_t     *replymsg = tx->tx_lnet_replymsg;
+       lnet_msg_t     *msg      = tx->tx_lnet_msg;
+       kptl_peer_t    *peer     = tx->tx_peer;
+       int             status   = tx->tx_status;
+       int             rc;
 
-        /* TX has completed or failed */
+       LASSERT (!in_interrupt());
+       LASSERT (cfs_atomic_read(&tx->tx_refcount) == 0);
+       LASSERT (!tx->tx_idle);
+       LASSERT (!tx->tx_active);
 
-        if (peer != NULL) {
-                rc = kptllnd_tx_abort_netio(tx);
-                if (rc != 0)
-                        return;
-        }
+       /* TX has completed or failed */
+
+       if (peer != NULL) {
+               rc = kptllnd_tx_abort_netio(tx);
+               if (rc != 0)
+                       return;
+       }
 
         LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
         LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
@@ -337,17 +357,16 @@ kptllnd_tx_fini (kptl_tx_t *tx)
         tx->tx_peer = NULL;
         tx->tx_idle = 1;
 
-        spin_lock(&kptllnd_data.kptl_tx_lock);
-        list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
-        spin_unlock(&kptllnd_data.kptl_tx_lock);
+       spin_lock(&kptllnd_data.kptl_tx_lock);
+        cfs_list_add_tail(&tx->tx_list, &kptllnd_data.kptl_idle_txs);
+       spin_unlock(&kptllnd_data.kptl_tx_lock);
 
         /* Must finalize AFTER freeing 'tx' */
         if (msg != NULL)
-                lnet_finalize(kptllnd_data.kptl_ni, msg,
-                              (replymsg == NULL) ? status : 0);
+                lnet_finalize(NULL, msg, (replymsg == NULL) ? status : 0);
 
         if (replymsg != NULL)
-                lnet_finalize(kptllnd_data.kptl_ni, replymsg, status);
+                lnet_finalize(NULL, replymsg, status);
 
         if (peer != NULL)
                 kptllnd_peer_decref(peer);
@@ -401,11 +420,12 @@ kptllnd_tx_callback(ptl_event_t *ev)
 #else
         unlinked = (ev->type == PTL_EVENT_UNLINK);
 #endif
-        CDEBUG(D_NETTRACE, "%s[%d/%d]: %s(%d) tx=%p fail=%d unlinked=%d\n",
-               libcfs_id2str(peer->peer_id),
-               peer->peer_credits, peer->peer_outstanding_credits,
+        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",
+               libcfs_id2str(peer->peer_id), peer->peer_credits,
+               peer->peer_outstanding_credits, peer->peer_sent_credits,
                kptllnd_evtype2str(ev->type), ev->type, 
-               tx, ev->ni_fail_type, unlinked);
+               tx, kptllnd_errtype2str(ev->ni_fail_type),
+               ev->ni_fail_type, unlinked);
 
         switch (tx->tx_type) {
         default:
@@ -414,24 +434,26 @@ kptllnd_tx_callback(ptl_event_t *ev)
         case TX_TYPE_SMALL_MESSAGE:
                 LASSERT (ismsg);
                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
-                         ev->type == PTL_EVENT_SEND_END);
+                         ev->type == PTL_EVENT_SEND_END ||
+                         (ev->type == PTL_EVENT_ACK && tx->tx_acked));
                 break;
 
         case TX_TYPE_PUT_REQUEST:
                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
                          (ismsg && ev->type == PTL_EVENT_SEND_END) ||
+                         (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||
                          (!ismsg && ev->type == PTL_EVENT_GET_END));
                 break;
 
         case TX_TYPE_GET_REQUEST:
                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
                          (ismsg && ev->type == PTL_EVENT_SEND_END) ||
+                         (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||
                          (!ismsg && ev->type == PTL_EVENT_PUT_END));
 
                 if (!ismsg && ok && ev->type == PTL_EVENT_PUT_END) {
                         if (ev->hdr_data == PTLLND_RDMA_OK) {
-                                lnet_set_reply_msg_len(
-                                        kptllnd_data.kptl_ni,
+                                lnet_set_reply_msg_len(NULL,
                                         tx->tx_lnet_replymsg,
                                         ev->mlength);
                         } else {
@@ -451,19 +473,27 @@ kptllnd_tx_callback(ptl_event_t *ev)
         case TX_TYPE_GET_RESPONSE:
                 LASSERT (!ismsg);
                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
-                         ev->type == PTL_EVENT_SEND_END);
+                         ev->type == PTL_EVENT_SEND_END ||
+                         (ev->type == PTL_EVENT_ACK && tx->tx_acked));
                 break;
         }
 
-        if (!ok)
-                kptllnd_peer_close(peer, -EIO);
-        else
+        if (ok) {
                 kptllnd_peer_alive(peer);
+        } else {
+                CERROR("Portals error to %s: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",
+                       libcfs_id2str(peer->peer_id),
+                       kptllnd_evtype2str(ev->type), ev->type, 
+                       tx, kptllnd_errtype2str(ev->ni_fail_type),
+                       ev->ni_fail_type, unlinked);
+                tx->tx_status = -EIO; 
+                kptllnd_peer_close(peer, -EIO);
+        }
 
         if (!unlinked)
                 return;
 
-        spin_lock_irqsave(&peer->peer_lock, flags);
+       spin_lock_irqsave(&peer->peer_lock, flags);
 
         if (ismsg)
                 tx->tx_msg_mdh = PTL_INVALID_HANDLE;
@@ -473,23 +503,24 @@ kptllnd_tx_callback(ptl_event_t *ev)
         if (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ||
             !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ||
             !tx->tx_active) {
-                spin_unlock_irqrestore(&peer->peer_lock, flags);
+               spin_unlock_irqrestore(&peer->peer_lock, flags);
                 return;
         }
 
-        list_del(&tx->tx_list);
+        cfs_list_del(&tx->tx_list);
         tx->tx_active = 0;
 
-        spin_unlock_irqrestore(&peer->peer_lock, flags);
+       spin_unlock_irqrestore(&peer->peer_lock, flags);
 
-        /* drop peer's ref, but if it was the last one... */
-        if (atomic_dec_and_test(&tx->tx_refcount)) {
-                /* ...finalize it in thread context! */
-                spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
+       /* drop peer's ref, but if it was the last one... */
+       if (cfs_atomic_dec_and_test(&tx->tx_refcount)) {
+               /* ...finalize it in thread context! */
+               spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
 
-                list_add_tail(&tx->tx_list, &kptllnd_data.kptl_sched_txq);
-                wake_up(&kptllnd_data.kptl_sched_waitq);
+               cfs_list_add_tail(&tx->tx_list, &kptllnd_data.kptl_sched_txq);
+               wake_up(&kptllnd_data.kptl_sched_waitq);
 
-                spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
-        }
+               spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
+                                          flags);
+       }
 }