Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_cb.c
index 89456ac..d3227fb 100644 (file)
@@ -1,19 +1,41 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
- *   Author: PJ Kirner <pjkirner@clusterfs.com>
+ * GPL HEADER START
  *
- *   This file is part of the Lustre file system, http://www.lustre.org
- *   Lustre is a trademark of Cluster File Systems, Inc.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   This file is confidential source code owned by Cluster File Systems.
- *   No viewing, modification, compilation, redistribution, or any other
- *   form of use is permitted except through a signed license agreement.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   If you have not signed such an agreement, then you have no rights to
- *   this file.  Please destroy it immediately and contact CFS.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lnet/klnds/ptllnd/ptllnd_cb.c
+ *
+ * Author: PJ Kirner <pjkirner@clusterfs.com>
  */
 
 #include "ptllnd.h"
@@ -127,7 +149,7 @@ kptllnd_init_rdma_md(kptl_tx_t *tx, unsigned int niov,
         
         memset(&tx->tx_rdma_md, 0, sizeof(tx->tx_rdma_md));
 
-        tx->tx_rdma_md.start     = tx->tx_rdma_frags;
+        tx->tx_rdma_md.start     = tx->tx_frags;
         tx->tx_rdma_md.user_ptr  = &tx->tx_rdma_eventarg;
         tx->tx_rdma_md.eq_handle = kptllnd_data.kptl_eqh;
         tx->tx_rdma_md.options   = PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
@@ -151,7 +173,7 @@ kptllnd_init_rdma_md(kptl_tx_t *tx, unsigned int niov,
                 break;
                 
         case TX_TYPE_GET_RESPONSE:              /* active: I put */
-                tx->tx_rdma_md.threshold = 1;   /* SEND */
+                tx->tx_rdma_md.threshold = tx->tx_acked ? 2 : 1;   /* SEND + ACK? */
                 break;
         }
 
@@ -164,7 +186,7 @@ kptllnd_init_rdma_md(kptl_tx_t *tx, unsigned int niov,
         if (iov != NULL) {
                 tx->tx_rdma_md.options |= PTL_MD_IOVEC;
                 tx->tx_rdma_md.length = 
-                        lnet_extract_iov(PTL_MD_MAX_IOV, tx->tx_rdma_frags->iov,
+                        lnet_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
                                          niov, iov, offset, nob);
                 return;
         }
@@ -180,20 +202,20 @@ kptllnd_init_rdma_md(kptl_tx_t *tx, unsigned int niov,
         
         tx->tx_rdma_md.options |= PTL_MD_KIOV;
         tx->tx_rdma_md.length = 
-                lnet_extract_kiov(PTL_MD_MAX_IOV, tx->tx_rdma_frags->kiov,
+                lnet_extract_kiov(PTL_MD_MAX_IOV, tx->tx_frags->kiov,
                                   niov, kiov, offset, nob);
 #else
         if (iov != NULL) {
                 tx->tx_rdma_md.options |= PTL_MD_IOVEC;
                 tx->tx_rdma_md.length = 
-                        kptllnd_extract_iov(PTL_MD_MAX_IOV, tx->tx_rdma_frags->iov,
+                        kptllnd_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
                                             niov, iov, offset, nob);
                 return;
         }
 
         tx->tx_rdma_md.options |= PTL_MD_IOVEC | PTL_MD_PHYS;
         tx->tx_rdma_md.length =
-                kptllnd_extract_phys(PTL_MD_MAX_IOV, tx->tx_rdma_frags->iov,
+                kptllnd_extract_phys(PTL_MD_MAX_IOV, tx->tx_frags->iov,
                                      niov, kiov, offset, nob);
 #endif
 }
@@ -227,13 +249,14 @@ kptllnd_active_rdma(kptl_rx_t *rx, lnet_msg_t *lntmsg, int type,
         ptlrc = PtlMDBind(kptllnd_data.kptl_nih, tx->tx_rdma_md, 
                           PTL_UNLINK, &mdh);
         if (ptlrc != PTL_OK) {
-                CERROR("PtlMDBind(%s) failed: %d\n",
-                       libcfs_id2str(peer->peer_id), ptlrc);
+                CERROR("PtlMDBind(%s) failed: %s(%d)\n",
+                       libcfs_id2str(peer->peer_id),
+                       kptllnd_errtype2str(ptlrc), ptlrc);
                 tx->tx_status = -EIO;
                 kptllnd_tx_decref(tx);
                 return -EIO;
         }
-        
+
         spin_lock_irqsave(&peer->peer_lock, flags);
 
         tx->tx_lnet_msg = lntmsg;
@@ -249,9 +272,11 @@ kptllnd_active_rdma(kptl_rx_t *rx, lnet_msg_t *lntmsg, int type,
 
         spin_unlock_irqrestore(&peer->peer_lock, flags);
 
+        tx->tx_tposted = jiffies;
+
         if (type == TX_TYPE_GET_RESPONSE)
                 ptlrc = PtlPut(mdh,
-                               PTL_NOACK_REQ,
+                               tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
                                rx->rx_initiator,
                                *kptllnd_tunables.kptl_portal,
                                0,                     /* acl cookie */
@@ -269,8 +294,9 @@ kptllnd_active_rdma(kptl_rx_t *rx, lnet_msg_t *lntmsg, int type,
                                0);                    /* offset */
 
         if (ptlrc != PTL_OK) {
-                CERROR("Ptl%s failed: %d\n", 
-                       (type == TX_TYPE_GET_RESPONSE) ? "Put" : "Get", ptlrc);
+                CERROR("Ptl%s failed: %s(%d)\n", 
+                       (type == TX_TYPE_GET_RESPONSE) ? "Put" : "Get",
+                       kptllnd_errtype2str(ptlrc), ptlrc);
                 
                 kptllnd_peer_close(peer, -EIO);
                 /* Everything (including this RDMA) queued on the peer will
@@ -293,8 +319,11 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
         lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
         unsigned int      payload_offset = lntmsg->msg_offset;
         unsigned int      payload_nob = lntmsg->msg_len;
+        kptl_peer_t      *peer;
         kptl_tx_t        *tx;
         int               nob;
+        int               nfrag;
+        int               rc;
 
         LASSERT (payload_nob == 0 || payload_niov > 0);
         LASSERT (payload_niov <= LNET_MAX_IOV);
@@ -302,6 +331,10 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
         LASSERT (!in_interrupt());
 
+        rc = kptllnd_find_target(&peer, target);
+        if (rc != 0)
+                return rc;
+        
         switch (type) {
         default:
                 LBUG();
@@ -309,9 +342,10 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
 
         case LNET_MSG_REPLY:
         case LNET_MSG_PUT:
-                /* Is the payload small enough not to need RDMA? */
+                /* Should the payload avoid RDMA? */
                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[payload_nob]);
-                if (nob <= *kptllnd_tunables.kptl_max_msg_size)
+                if (payload_kiov == NULL && 
+                    nob <= peer->peer_max_msg_size)
                         break;
 
                 tx = kptllnd_get_idle_tx(TX_TYPE_PUT_REQUEST);
@@ -319,7 +353,8 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                         CERROR("Can't send %s to %s: can't allocate descriptor\n",
                                lnet_msgtyp2str(type),
                                libcfs_id2str(target));
-                        return -ENOMEM;
+                        rc = -ENOMEM;
+                        goto out;
                 }
 
                 kptllnd_init_rdma_md(tx, payload_niov, 
@@ -330,8 +365,13 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                 tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
                 kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_PUT,
                                   sizeof(kptl_rdma_msg_t));
-                kptllnd_tx_launch(tx, target);
-                return 0;
+
+                CDEBUG(D_NETTRACE, "%s: passive PUT p %d %p\n",
+                       libcfs_id2str(target),
+                       le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);
+
+                kptllnd_tx_launch(peer, tx, 0);
+                goto out;
 
         case LNET_MSG_GET:
                 /* routed gets don't RDMA */
@@ -342,14 +382,15 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                 nob = lntmsg->msg_md->md_length;
                 nob = offsetof(kptl_msg_t, 
                                ptlm_u.immediate.kptlim_payload[nob]);
-                if (nob <= *kptllnd_tunables.kptl_max_msg_size)
+                if (nob <= peer->peer_max_msg_size)
                         break;
 
                 tx = kptllnd_get_idle_tx(TX_TYPE_GET_REQUEST);
                 if (tx == NULL) {
                         CERROR("Can't send GET to %s: can't allocate descriptor\n",
                                libcfs_id2str(target));
-                        return -ENOMEM;
+                        rc = -ENOMEM;
+                        goto out;
                 }
 
                 tx->tx_lnet_replymsg =
@@ -358,7 +399,8 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                         CERROR("Failed to allocate LNET reply for %s\n",
                                libcfs_id2str(target));
                         kptllnd_tx_decref(tx);
-                        return -ENOMEM;
+                        rc = -ENOMEM;
+                        goto out;
                 }
 
                 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
@@ -374,8 +416,13 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                 tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
                 kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_GET,
                                   sizeof(kptl_rdma_msg_t));
-                kptllnd_tx_launch(tx, target);
-                return 0;
+
+                CDEBUG(D_NETTRACE, "%s: passive GET p %d %p\n",
+                       libcfs_id2str(target),
+                       le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);
+
+                kptllnd_tx_launch(peer, tx, 0);
+                goto out;
 
         case LNET_MSG_ACK:
                 CDEBUG(D_NET, "LNET_MSG_ACK\n");
@@ -383,33 +430,59 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                 break;
         }
 
+        /* I don't have to handle kiovs */
+        LASSERT (payload_nob == 0 || payload_iov != NULL);
+
         tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
         if (tx == NULL) {
                 CERROR("Can't send %s to %s: can't allocate descriptor\n",
                        lnet_msgtyp2str(type), libcfs_id2str(target));
-                        return -ENOMEM;
+                rc = -ENOMEM;
+                goto out;
         }
 
         tx->tx_lnet_msg = lntmsg;
         tx->tx_msg->ptlm_u.immediate.kptlim_hdr = *hdr;
 
-        if (payload_kiov != NULL)
-                lnet_copy_kiov2flat(*kptllnd_tunables.kptl_max_msg_size,
-                                    tx->tx_msg->ptlm_u.immediate.kptlim_payload,
-                                    0,
-                                    payload_niov, payload_kiov,
-                                    payload_offset, payload_nob);
-        else
-                lnet_copy_iov2flat(*kptllnd_tunables.kptl_max_msg_size,
-                                   tx->tx_msg->ptlm_u.immediate.kptlim_payload,
-                                   0,
-                                   payload_niov, payload_iov,
-                                   payload_offset, payload_nob);
+        if (payload_nob == 0) {
+                nfrag = 0;
+        } else {
+                tx->tx_frags->iov[0].iov_base = tx->tx_msg;
+                tx->tx_frags->iov[0].iov_len = offsetof(kptl_msg_t,
+                                                        ptlm_u.immediate.kptlim_payload);
 
+                /* NB relying on lustre not asking for PTL_MD_MAX_IOV
+                 * fragments!! */
+#ifdef _USING_LUSTRE_PORTALS_
+                nfrag = 1 + lnet_extract_iov(PTL_MD_MAX_IOV - 1, 
+                                             &tx->tx_frags->iov[1],
+                                             payload_niov, payload_iov,
+                                             payload_offset, payload_nob);
+#else
+                nfrag = 1 + kptllnd_extract_iov(PTL_MD_MAX_IOV - 1,
+                                                &tx->tx_frags->iov[1],
+                                                payload_niov, payload_iov,
+                                                payload_offset, payload_nob);
+#endif
+        }
+        
         nob = offsetof(kptl_immediate_msg_t, kptlim_payload[payload_nob]);
         kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, nob);
-        kptllnd_tx_launch(tx, target);
-        return 0;
+
+        CDEBUG(D_NETTRACE, "%s: immediate %s p %d %p\n",
+               libcfs_id2str(target),
+               lnet_msgtyp2str(lntmsg->msg_type),
+               (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_PUT) ? 
+               le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index) :
+               (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_GET) ? 
+               le32_to_cpu(lntmsg->msg_hdr.msg.get.ptl_index) : -1,
+               tx);
+
+        kptllnd_tx_launch(peer, tx, nfrag);
+
+ out:
+        kptllnd_peer_decref(peer);
+        return rc;
 }
 
 int 
@@ -547,7 +620,7 @@ kptllnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
         /*
          * We're done with the RX
          */
-        kptllnd_rx_done(rx);
+        kptllnd_rx_done(rx, PTLLND_POSTRX_PEER_CREDIT);
         return rc;
 }
 
@@ -599,6 +672,7 @@ kptllnd_watchdog(void *arg)
         int                 id = (long)arg;
         char                name[16];
         wait_queue_t        waitlink;
+        int                 stamp = 0;
         int                 peer_index = 0;
         unsigned long       deadline = jiffies;
         int                 timeout;
@@ -614,7 +688,7 @@ kptllnd_watchdog(void *arg)
         while (kptllnd_data.kptl_shutdown < 2) {
 
                 timeout = (int)(deadline - jiffies);
-                
+
                 if (timeout <= 0) {
                         const int n = 4;
                         const int p = 1;
@@ -636,12 +710,13 @@ kptllnd_watchdog(void *arg)
                                 chunk = 1;
 
                         for (i = 0; i < chunk; i++) {
-                                kptllnd_peer_check_bucket(peer_index);
+                                kptllnd_peer_check_bucket(peer_index, stamp);
                                 peer_index = (peer_index + 1) %
                                      kptllnd_data.kptl_peer_hash_size;
                         }
 
                         deadline += p * HZ;
+                        stamp++;
                         continue;
                 }
 
@@ -757,4 +832,3 @@ kptllnd_scheduler (void *arg)
         kptllnd_thread_fini();
         return 0;
 }
-