-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
+/*
+ * GPL HEADER START
*
- * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
- * Author: PJ Kirner <pjkirner@clusterfs.com>
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
- * This file is part of the Lustre file system, http://www.lustre.org
- * Lustre is a trademark of Cluster File Systems, Inc.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
*
- * This file is confidential source code owned by Cluster File Systems.
- * No viewing, modification, compilation, redistribution, or any other
- * form of use is permitted except through a signed license agreement.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
*
- * If you have not signed such an agreement, then you have no rights to
- * this file. Please destroy it immediately and contact CFS.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
*
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2012, Intel Corporation.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lnet/klnds/ptllnd/ptllnd_cb.c
+ *
+ * Author: PJ Kirner <pjkirner@clusterfs.com>
*/
#include "ptllnd.h"
memset(&tx->tx_rdma_md, 0, sizeof(tx->tx_rdma_md));
- tx->tx_rdma_md.start = tx->tx_rdma_frags;
+ tx->tx_rdma_md.start = tx->tx_frags;
tx->tx_rdma_md.user_ptr = &tx->tx_rdma_eventarg;
tx->tx_rdma_md.eq_handle = kptllnd_data.kptl_eqh;
tx->tx_rdma_md.options = PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
break;
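+ /* NB md.threshold counts the events this MD may deliver before it
+ * auto-unlinks: a GET response goes out via PtlPut(), which fires a
+ * SEND event, plus an ACK event when tx_acked requests one (see the
+ * PtlPut() below). */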
case TX_TYPE_GET_RESPONSE: /* active: I put */
- tx->tx_rdma_md.threshold = 1; /* SEND */
+ tx->tx_rdma_md.threshold = tx->tx_acked ? 2 : 1; /* SEND + ACK? */
break;
}
if (iov != NULL) {
tx->tx_rdma_md.options |= PTL_MD_IOVEC;
tx->tx_rdma_md.length =
- lnet_extract_iov(PTL_MD_MAX_IOV, tx->tx_rdma_frags->iov,
+ lnet_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
niov, iov, offset, nob);
return;
}
tx->tx_rdma_md.options |= PTL_MD_KIOV;
tx->tx_rdma_md.length =
- lnet_extract_kiov(PTL_MD_MAX_IOV, tx->tx_rdma_frags->kiov,
+ lnet_extract_kiov(PTL_MD_MAX_IOV, tx->tx_frags->kiov,
niov, kiov, offset, nob);
#else
if (iov != NULL) {
tx->tx_rdma_md.options |= PTL_MD_IOVEC;
tx->tx_rdma_md.length =
- kptllnd_extract_iov(PTL_MD_MAX_IOV, tx->tx_rdma_frags->iov,
+ kptllnd_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
niov, iov, offset, nob);
return;
}
tx->tx_rdma_md.options |= PTL_MD_IOVEC | PTL_MD_PHYS;
tx->tx_rdma_md.length =
- kptllnd_extract_phys(PTL_MD_MAX_IOV, tx->tx_rdma_frags->iov,
+ kptllnd_extract_phys(PTL_MD_MAX_IOV, tx->tx_frags->iov,
niov, kiov, offset, nob);
#endif
}
ptlrc = PtlMDBind(kptllnd_data.kptl_nih, tx->tx_rdma_md,
PTL_UNLINK, &mdh);
if (ptlrc != PTL_OK) {
- CERROR("PtlMDBind(%s) failed: %d\n",
- libcfs_id2str(peer->peer_id), ptlrc);
+ CERROR("PtlMDBind(%s) failed: %s(%d)\n",
+ libcfs_id2str(peer->peer_id),
+ kptllnd_errtype2str(ptlrc), ptlrc);
tx->tx_status = -EIO;
kptllnd_tx_decref(tx);
return -EIO;
}
-
- spin_lock_irqsave(&peer->peer_lock, flags);
+
+ spin_lock_irqsave(&peer->peer_lock, flags);
tx->tx_lnet_msg = lntmsg;
/* lnet_finalize() will be called when tx is torn down, so I must
* return success from here on... */
- tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
+ tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * CFS_HZ);
tx->tx_rdma_mdh = mdh;
tx->tx_active = 1;
- list_add_tail(&tx->tx_list, &peer->peer_activeq);
+ cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);
/* peer has now got my ref on 'tx' */
- spin_unlock_irqrestore(&peer->peer_lock, flags);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
+
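+ /* NB tx_deadline (above) is when the watchdog gives up on this tx;
+ * tx_tposted marks when it was handed to Portals, presumably so the
+ * watchdog can report how long a stuck RDMA has been outstanding. */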
+ tx->tx_tposted = jiffies;
if (type == TX_TYPE_GET_RESPONSE)
ptlrc = PtlPut(mdh,
- PTL_NOACK_REQ,
+ tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
rx->rx_initiator,
*kptllnd_tunables.kptl_portal,
0, /* acl cookie */
0); /* offset */
if (ptlrc != PTL_OK) {
- CERROR("Ptl%s failed: %d\n",
- (type == TX_TYPE_GET_RESPONSE) ? "Put" : "Get", ptlrc);
+ CERROR("Ptl%s failed: %s(%d)\n",
+ (type == TX_TYPE_GET_RESPONSE) ? "Put" : "Get",
+ kptllnd_errtype2str(ptlrc), ptlrc);
kptllnd_peer_close(peer, -EIO);
/* Everything (including this RDMA) queued on the peer will
lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
unsigned int payload_offset = lntmsg->msg_offset;
unsigned int payload_nob = lntmsg->msg_len;
+ kptl_net_t *net = ni->ni_data;
+ kptl_peer_t *peer = NULL;
+ int mpflag = 0;
kptl_tx_t *tx;
int nob;
+ int nfrag;
+ int rc;
+ LASSERT (net->net_ni == ni);
+ LASSERT (!net->net_shutdown);
LASSERT (payload_nob == 0 || payload_niov > 0);
LASSERT (payload_niov <= LNET_MAX_IOV);
LASSERT (payload_niov <= PTL_MD_MAX_IOV); /* !!! */
LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
- LASSERT (!in_interrupt());
+ LASSERT (!cfs_in_interrupt());
+
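+ /* NB a message flushing dirty pages may itself need to allocate
+ * memory; latching the task's memory-pressure flag here should stop
+ * allocations on this path recursing into reclaim (restored at
+ * 'out' below). */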
+ if (lntmsg->msg_vmflush)
+ mpflag = cfs_memory_pressure_get_and_set();
+ rc = kptllnd_find_target(net, target, &peer);
+ if (rc != 0)
+ goto out;
+
+ /* NB peer->peer_id does NOT always equal target, be careful with
+ * which one to use */
switch (type) {
default:
LBUG();
case LNET_MSG_REPLY:
case LNET_MSG_PUT:
- /* Is the payload small enough not to need RDMA? */
+ /* Should the payload avoid RDMA? */
nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[payload_nob]);
- if (nob <= *kptllnd_tunables.kptl_max_msg_size)
+ if (payload_kiov == NULL &&
+ nob <= peer->peer_max_msg_size)
break;
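+ /* NB breaking here drops through to the immediate-message path at
+ * the bottom of this function; kiov (page) payloads always take the
+ * RDMA path, since the immediate path only gathers iovs. */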
tx = kptllnd_get_idle_tx(TX_TYPE_PUT_REQUEST);
CERROR("Can't send %s to %s: can't allocate descriptor\n",
lnet_msgtyp2str(type),
libcfs_id2str(target));
- return -ENOMEM;
+ rc = -ENOMEM;
+ goto out;
}
kptllnd_init_rdma_md(tx, payload_niov,
tx->tx_lnet_msg = lntmsg;
tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_PUT,
- sizeof(kptl_rdma_msg_t));
+ target, sizeof(kptl_rdma_msg_t));
CDEBUG(D_NETTRACE, "%s: passive PUT p %d %p\n",
libcfs_id2str(target),
le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);
- kptllnd_tx_launch(tx, target);
- return 0;
+ kptllnd_tx_launch(peer, tx, 0);
+ goto out;
case LNET_MSG_GET:
/* routed gets don't RDMA */
nob = lntmsg->msg_md->md_length;
nob = offsetof(kptl_msg_t,
ptlm_u.immediate.kptlim_payload[nob]);
- if (nob <= *kptllnd_tunables.kptl_max_msg_size)
+ if (nob <= peer->peer_max_msg_size)
break;
tx = kptllnd_get_idle_tx(TX_TYPE_GET_REQUEST);
if (tx == NULL) {
CERROR("Can't send GET to %s: can't allocate descriptor\n",
libcfs_id2str(target));
- return -ENOMEM;
+ rc = -ENOMEM;
+ goto out;
}
- tx->tx_lnet_replymsg =
- lnet_create_reply_msg(kptllnd_data.kptl_ni, lntmsg);
+ tx->tx_lnet_replymsg = lnet_create_reply_msg(ni, lntmsg);
if (tx->tx_lnet_replymsg == NULL) {
CERROR("Failed to allocate LNET reply for %s\n",
libcfs_id2str(target));
kptllnd_tx_decref(tx);
- return -ENOMEM;
+ rc = -ENOMEM;
+ goto out;
}
if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov,
NULL, lntmsg->msg_md->md_iov.kiov,
0, lntmsg->msg_md->md_length);
-
+
tx->tx_lnet_msg = lntmsg;
tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_GET,
- sizeof(kptl_rdma_msg_t));
+ target, sizeof(kptl_rdma_msg_t));
CDEBUG(D_NETTRACE, "%s: passive GET p %d %p\n",
libcfs_id2str(target),
le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);
- kptllnd_tx_launch(tx, target);
- return 0;
+ kptllnd_tx_launch(peer, tx, 0);
+ goto out;
case LNET_MSG_ACK:
CDEBUG(D_NET, "LNET_MSG_ACK\n");
break;
}
+ /* I don't have to handle kiovs */
+ LASSERT (payload_nob == 0 || payload_iov != NULL);
+
tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
if (tx == NULL) {
CERROR("Can't send %s to %s: can't allocate descriptor\n",
lnet_msgtyp2str(type), libcfs_id2str(target));
- return -ENOMEM;
+ rc = -ENOMEM;
+ goto out;
}
tx->tx_lnet_msg = lntmsg;
tx->tx_msg->ptlm_u.immediate.kptlim_hdr = *hdr;
- if (payload_kiov != NULL)
- lnet_copy_kiov2flat(*kptllnd_tunables.kptl_max_msg_size,
- tx->tx_msg->ptlm_u.immediate.kptlim_payload,
- 0,
- payload_niov, payload_kiov,
- payload_offset, payload_nob);
- else
- lnet_copy_iov2flat(*kptllnd_tunables.kptl_max_msg_size,
- tx->tx_msg->ptlm_u.immediate.kptlim_payload,
- 0,
- payload_niov, payload_iov,
- payload_offset, payload_nob);
+ if (payload_nob == 0) {
+ nfrag = 0;
+ } else {
+ tx->tx_frags->iov[0].iov_base = tx->tx_msg;
+ tx->tx_frags->iov[0].iov_len = offsetof(kptl_msg_t,
+ ptlm_u.immediate.kptlim_payload);
+
+ /* NB relying on lustre not asking for PTL_MD_MAX_IOV
+ * fragments!! */
+#ifdef _USING_LUSTRE_PORTALS_
+ nfrag = 1 + lnet_extract_iov(PTL_MD_MAX_IOV - 1,
+ &tx->tx_frags->iov[1],
+ payload_niov, payload_iov,
+ payload_offset, payload_nob);
+#else
+ nfrag = 1 + kptllnd_extract_iov(PTL_MD_MAX_IOV - 1,
+ &tx->tx_frags->iov[1],
+ payload_niov, payload_iov,
+ payload_offset, payload_nob);
+#endif
+ }
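+ /* NB frag 0 is the message header itself, so header and payload go
+ * out in a single gather; nfrag counts the header frag plus the
+ * payload frags. */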
nob = offsetof(kptl_immediate_msg_t, kptlim_payload[payload_nob]);
- kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, nob);
+ kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, target, nob);
CDEBUG(D_NETTRACE, "%s: immediate %s p %d %p\n",
libcfs_id2str(target),
le32_to_cpu(lntmsg->msg_hdr.msg.get.ptl_index) : -1,
tx);
- kptllnd_tx_launch(tx, target);
- return 0;
+ kptllnd_tx_launch(peer, tx, nfrag);
+
+ out:
+ if (lntmsg->msg_vmflush)
+ cfs_memory_pressure_restore(mpflag);
+ if (peer)
+ kptllnd_peer_decref(peer);
+ return rc;
}
int
LASSERT (mlen <= rlen);
LASSERT (mlen >= 0);
- LASSERT (!in_interrupt());
+ LASSERT (!cfs_in_interrupt());
LASSERT (!(kiov != NULL && iov != NULL)); /* never both */
LASSERT (niov <= PTL_MD_MAX_IOV); /* !!! */
-#ifdef CRAY_XT3
- if (lntmsg != NULL &&
- rx->rx_uid != 0) {
- /* Set the UID if the sender's uid isn't 0; i.e. non-root
- * running in userspace (e.g. a catamount node; linux kernel
- * senders, including routers have uid 0). If this is a lustre
- * RPC request, this tells lustre not to trust the creds in the
- * RPC message body. */
- lnet_set_msg_uid(ni, lntmsg, rx->rx_uid);
- }
-#endif
switch(rxmsg->ptlm_type)
{
default:
/*
* We're done with the RX
*/
- kptllnd_rx_done(rx);
+ kptllnd_rx_done(rx, PTLLND_POSTRX_PEER_CREDIT);
return rc;
}
switch (eva->eva_type) {
default:
LBUG();
-
+
case PTLLND_EVENTARG_TYPE_MSG:
case PTLLND_EVENTARG_TYPE_RDMA:
kptllnd_tx_callback(ev);
break;
-
+
case PTLLND_EVENTARG_TYPE_BUF:
kptllnd_rx_buffer_callback(ev);
break;
void
kptllnd_thread_fini (void)
{
- atomic_dec(&kptllnd_data.kptl_nthreads);
+ cfs_atomic_dec(&kptllnd_data.kptl_nthreads);
}
int
{
long pid;
- atomic_inc(&kptllnd_data.kptl_nthreads);
+ cfs_atomic_inc(&kptllnd_data.kptl_nthreads);
- pid = kernel_thread (fn, arg, 0);
+ pid = cfs_create_thread (fn, arg, 0);
if (pid >= 0)
return 0;
-
- CERROR("Failed to start kernel_thread: error %d\n", (int)pid);
+
+ CERROR("Failed to start thread: error %d\n", (int)pid);
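+ /* roll back the thread count taken above; a negative pid here is
+ * the kernel's -errno */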
kptllnd_thread_fini();
return (int)pid;
}
{
int id = (long)arg;
char name[16];
- wait_queue_t waitlink;
+ cfs_waitlink_t waitlink;
+ int stamp = 0;
int peer_index = 0;
unsigned long deadline = jiffies;
int timeout;
cfs_daemonize(name);
cfs_block_allsigs();
- init_waitqueue_entry(&waitlink, current);
+ cfs_waitlink_init(&waitlink);
/* threads shut down in phase 2 after all peers have been destroyed */
while (kptllnd_data.kptl_shutdown < 2) {
timeout = (int)(deadline - jiffies);
-
if (timeout <= 0) {
const int n = 4;
const int p = 1;
chunk = 1;
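+ /* NB only a few buckets are checked per wakeup, so the whole peer
+ * table gets swept over several passes; 'stamp' numbers the sweeps,
+ * presumably letting the bucket check tell one pass from the next. */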
for (i = 0; i < chunk; i++) {
- kptllnd_peer_check_bucket(peer_index);
+ kptllnd_peer_check_bucket(peer_index, stamp);
peer_index = (peer_index + 1) %
kptllnd_data.kptl_peer_hash_size;
}
- deadline += p * HZ;
+ deadline += p * CFS_HZ;
+ stamp++;
continue;
}
kptllnd_handle_closing_peers();
- set_current_state(TASK_INTERRUPTIBLE);
- add_wait_queue_exclusive(&kptllnd_data.kptl_watchdog_waitq,
- &waitlink);
+ cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
+ cfs_waitq_add_exclusive(&kptllnd_data.kptl_watchdog_waitq,
+ &waitlink);
- schedule_timeout(timeout);
-
- set_current_state (TASK_RUNNING);
- remove_wait_queue(&kptllnd_data.kptl_watchdog_waitq, &waitlink);
+ cfs_waitq_timedwait(&waitlink, CFS_TASK_INTERRUPTIBLE, timeout);
+
+ cfs_set_current_state (CFS_TASK_RUNNING);
+ cfs_waitq_del(&kptllnd_data.kptl_watchdog_waitq, &waitlink);
}
kptllnd_thread_fini();
{
int id = (long)arg;
char name[16];
- wait_queue_t waitlink;
+ cfs_waitlink_t waitlink;
unsigned long flags;
int did_something;
int counter = 0;
cfs_daemonize(name);
cfs_block_allsigs();
- init_waitqueue_entry(&waitlink, current);
+ cfs_waitlink_init(&waitlink);
- spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
+ spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
/* threads shut down in phase 2 after all peers have been destroyed */
while (kptllnd_data.kptl_shutdown < 2) {
did_something = 0;
- if (!list_empty(&kptllnd_data.kptl_sched_rxq)) {
- rx = list_entry (kptllnd_data.kptl_sched_rxq.next,
- kptl_rx_t, rx_list);
- list_del(&rx->rx_list);
-
- spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
- flags);
+ if (!cfs_list_empty(&kptllnd_data.kptl_sched_rxq)) {
+ rx = cfs_list_entry (kptllnd_data.kptl_sched_rxq.next,
+ kptl_rx_t, rx_list);
+ cfs_list_del(&rx->rx_list);
+
+ spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
+ flags);
kptllnd_rx_parse(rx);
did_something = 1;
- spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
+ spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
+ flags);
}
- if (!list_empty(&kptllnd_data.kptl_sched_rxbq)) {
- rxb = list_entry (kptllnd_data.kptl_sched_rxbq.next,
- kptl_rx_buffer_t, rxb_repost_list);
- list_del(&rxb->rxb_repost_list);
+ if (!cfs_list_empty(&kptllnd_data.kptl_sched_rxbq)) {
+ rxb = cfs_list_entry (kptllnd_data.kptl_sched_rxbq.next,
+ kptl_rx_buffer_t,
+ rxb_repost_list);
+ cfs_list_del(&rxb->rxb_repost_list);
- spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
- flags);
+ spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
+ flags);
kptllnd_rx_buffer_post(rxb);
did_something = 1;
- spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
+ spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
+ flags);
}
- if (!list_empty(&kptllnd_data.kptl_sched_txq)) {
- tx = list_entry (kptllnd_data.kptl_sched_txq.next,
- kptl_tx_t, tx_list);
- list_del_init(&tx->tx_list);
+ if (!cfs_list_empty(&kptllnd_data.kptl_sched_txq)) {
+ tx = cfs_list_entry (kptllnd_data.kptl_sched_txq.next,
+ kptl_tx_t, tx_list);
+ cfs_list_del_init(&tx->tx_list);
- spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
+ spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
+ flags);
kptllnd_tx_fini(tx);
did_something = 1;
- spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
+ spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
+ flags);
}
if (did_something) {
continue;
}
- set_current_state(TASK_INTERRUPTIBLE);
- add_wait_queue_exclusive(&kptllnd_data.kptl_sched_waitq,
- &waitlink);
- spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
+ cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
+ cfs_waitq_add_exclusive(&kptllnd_data.kptl_sched_waitq,
+ &waitlink);
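+ /* NB queued on the waitq while still holding kptl_sched_lock, so a
+ * wakeup arriving between the unlock and the wait below can't be
+ * lost. */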
+ spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
+ flags);
if (!did_something)
- schedule();
+ cfs_waitq_wait(&waitlink, CFS_TASK_INTERRUPTIBLE);
else
- cond_resched();
+ cfs_cond_resched();
- set_current_state(TASK_RUNNING);
- remove_wait_queue(&kptllnd_data.kptl_sched_waitq, &waitlink);
+ cfs_set_current_state(CFS_TASK_RUNNING);
+ cfs_waitq_del(&kptllnd_data.kptl_sched_waitq, &waitlink);
- spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
+ spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
counter = 0;
}
- spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
+ spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
kptllnd_thread_fini();
return 0;
}
-