-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
+/*
+ * GPL HEADER START
*
- * Copyright (C) 2004 Cluster File Systems, Inc.
- * Author: Eric Barton <eric@bartonsoftware.com>
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
- * This file is part of Lustre, http://www.lustre.org.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
*
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
*
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
*
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
*
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2012, Intel Corporation.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lnet/klnds/ralnd/ralnd_cb.c
+ *
+ * Author: Eric Barton <eric@bartonsoftware.com>
*/
-#include "ranal.h"
-
-int
-kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
-{
- /* I would guess that if kranal_get_peer (nid) == NULL,
- and we're not routing, then 'nid' is very distant :) */
- if ( nal->libnal_ni.ni_pid.nid == nid ) {
- *dist = 0;
- } else {
- *dist = 1;
- }
-
- return 0;
-}
+#include <asm/page.h>
+#include "ralnd.h"
void
kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
if (dev->rad_id != devid)
continue;
- spin_lock_irqsave(&dev->rad_lock, flags);
+ spin_lock_irqsave(&dev->rad_lock, flags);
- if (!dev->rad_ready) {
- dev->rad_ready = 1;
- wake_up(&dev->rad_waitq);
- }
+ if (!dev->rad_ready) {
+ dev->rad_ready = 1;
+ wake_up(&dev->rad_waitq);
+ }
- spin_unlock_irqrestore(&dev->rad_lock, flags);
+ spin_unlock_irqrestore(&dev->rad_lock, flags);
return;
}
kra_device_t *dev = conn->rac_device;
unsigned long flags;
- spin_lock_irqsave(&dev->rad_lock, flags);
+ spin_lock_irqsave(&dev->rad_lock, flags);
- if (!conn->rac_scheduled) {
- kranal_conn_addref(conn); /* +1 ref for scheduler */
- conn->rac_scheduled = 1;
- list_add_tail(&conn->rac_schedlist, &dev->rad_connq);
- wake_up(&dev->rad_waitq);
- }
+ if (!conn->rac_scheduled) {
+ kranal_conn_addref(conn); /* +1 ref for scheduler */
+ conn->rac_scheduled = 1;
+ cfs_list_add_tail(&conn->rac_schedlist, &dev->rad_ready_conns);
+ wake_up(&dev->rad_waitq);
+ }
- spin_unlock_irqrestore(&dev->rad_lock, flags);
+ spin_unlock_irqrestore(&dev->rad_lock, flags);
}
kra_tx_t *
-kranal_get_idle_tx (int may_block)
+kranal_get_idle_tx (void)
{
unsigned long flags;
- kra_tx_t *tx = NULL;
+ kra_tx_t *tx;
- for (;;) {
- spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
+ spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
- /* "normal" descriptor is free */
- if (!list_empty(&kranal_data.kra_idle_txs)) {
- tx = list_entry(kranal_data.kra_idle_txs.next,
- kra_tx_t, tx_list);
- break;
- }
-
- if (!may_block) {
- /* may dip into reserve pool */
- if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
- CERROR("reserved tx desc pool exhausted\n");
- break;
- }
-
- tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
- kra_tx_t, tx_list);
- break;
- }
-
- /* block for idle tx */
- spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
-
- wait_event(kranal_data.kra_idle_tx_waitq,
- !list_empty(&kranal_data.kra_idle_txs));
+ if (cfs_list_empty(&kranal_data.kra_idle_txs)) {
+ spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
+ return NULL;
}
- if (tx != NULL) {
- list_del(&tx->tx_list);
+ tx = cfs_list_entry(kranal_data.kra_idle_txs.next, kra_tx_t, tx_list);
+ cfs_list_del(&tx->tx_list);
- /* Allocate a new completion cookie. It might not be
- * needed, but we've got a lock right now... */
- tx->tx_cookie = kranal_data.kra_next_tx_cookie++;
+ /* Allocate a new completion cookie. It might not be needed, but we've
+ * got a lock right now... */
+ tx->tx_cookie = kranal_data.kra_next_tx_cookie++;
- LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
- LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
- LASSERT (tx->tx_conn == NULL);
- LASSERT (tx->tx_libmsg[0] == NULL);
- LASSERT (tx->tx_libmsg[1] == NULL);
- }
+ spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
- spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
+ LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
+ LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
+ LASSERT (tx->tx_conn == NULL);
+ LASSERT (tx->tx_lntmsg[0] == NULL);
+ LASSERT (tx->tx_lntmsg[1] == NULL);
return tx;
}
msg->ram_magic = RANAL_MSG_MAGIC;
msg->ram_version = RANAL_MSG_VERSION;
msg->ram_type = type;
- msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
+ msg->ram_srcnid = kranal_data.kra_ni->ni_nid;
/* ram_connstamp gets set when FMA is sent */
}
kra_tx_t *
-kranal_new_tx_msg (int may_block, int type)
+kranal_new_tx_msg (int type)
{
- kra_tx_t *tx = kranal_get_idle_tx(may_block);
+ kra_tx_t *tx = kranal_get_idle_tx();
- if (tx == NULL)
- return NULL;
+ if (tx != NULL)
+ kranal_init_msg(&tx->tx_msg, type);
- kranal_init_msg(&tx->tx_msg, type);
return tx;
}
int
-kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
+kranal_setup_immediate_buffer (kra_tx_t *tx,
+ unsigned int niov, struct iovec *iov,
int offset, int nob)
{
}
int
-kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
+kranal_setup_virt_buffer (kra_tx_t *tx,
+ unsigned int niov, struct iovec *iov,
int offset, int nob)
{
}
int
-kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
+kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, lnet_kiov_t *kiov,
int offset, int nob)
{
RAP_PHYS_REGION *phys = tx->tx_phys;
tx->tx_nob = nob;
tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));
- phys->Address = kranal_page2phys(kiov->kiov_page);
+ phys->Address = page_to_phys(kiov->kiov_page);
phys++;
resid = nob - (kiov->kiov_len - offset);
return -EINVAL;
}
- if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
+ if ((phys - tx->tx_phys) == LNET_MAX_IOV) {
CERROR ("payload too big (%d)\n", (int)(phys - tx->tx_phys));
return -EMSGSIZE;
}
- phys->Address = kranal_page2phys(kiov->kiov_page);
+ phys->Address = page_to_phys(kiov->kiov_page);
phys++;
resid -= PAGE_SIZE;
}
static inline int
-kranal_setup_rdma_buffer (kra_tx_t *tx, int niov,
- struct iovec *iov, ptl_kiov_t *kiov,
+kranal_setup_rdma_buffer (kra_tx_t *tx, unsigned int niov,
+ struct iovec *iov, lnet_kiov_t *kiov,
int offset, int nob)
{
LASSERT ((iov == NULL) != (kiov == NULL));
return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
}
-void
+int
kranal_map_buffer (kra_tx_t *tx)
{
kra_conn_t *conn = tx->tx_conn;
case RANAL_BUF_IMMEDIATE:
case RANAL_BUF_PHYS_MAPPED:
case RANAL_BUF_VIRT_MAPPED:
- break;
+ return 0;
case RANAL_BUF_PHYS_UNMAPPED:
rrc = RapkRegisterPhys(dev->rad_handle,
tx->tx_phys, tx->tx_phys_npages,
&tx->tx_map_key);
- LASSERT (rrc == RAP_SUCCESS);
+ if (rrc != RAP_SUCCESS) {
+ CERROR ("Can't map %d pages: dev %d "
+ "phys %u pp %u, virt %u nob %lu\n",
+ tx->tx_phys_npages, dev->rad_id,
+ dev->rad_nphysmap, dev->rad_nppphysmap,
+ dev->rad_nvirtmap, dev->rad_nobvirtmap);
+ return -ENOMEM; /* assume insufficient resources */
+ }
+
+ dev->rad_nphysmap++;
+ dev->rad_nppphysmap += tx->tx_phys_npages;
+
tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
- break;
+ return 0;
case RANAL_BUF_VIRT_UNMAPPED:
rrc = RapkRegisterMemory(dev->rad_handle,
tx->tx_buffer, tx->tx_nob,
&tx->tx_map_key);
- LASSERT (rrc == RAP_SUCCESS);
+ if (rrc != RAP_SUCCESS) {
+ CERROR ("Can't map %d bytes: dev %d "
+ "phys %u pp %u, virt %u nob %lu\n",
+ tx->tx_nob, dev->rad_id,
+ dev->rad_nphysmap, dev->rad_nppphysmap,
+ dev->rad_nvirtmap, dev->rad_nobvirtmap);
+ return -ENOMEM; /* assume insufficient resources */
+ }
+
+ dev->rad_nvirtmap++;
+ dev->rad_nobvirtmap += tx->tx_nob;
+
tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
- break;
+ return 0;
}
}
rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
&tx->tx_map_key);
LASSERT (rrc == RAP_SUCCESS);
+
+ dev->rad_nphysmap--;
+ dev->rad_nppphysmap -= tx->tx_phys_npages;
+
tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
break;
rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
&tx->tx_map_key);
LASSERT (rrc == RAP_SUCCESS);
+
+ dev->rad_nvirtmap--;
+ dev->rad_nobvirtmap -= tx->tx_nob;
+
tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
break;
}
void
kranal_tx_done (kra_tx_t *tx, int completion)
{
- ptl_err_t ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
- unsigned long flags;
- int i;
+ lnet_msg_t *lnetmsg[2];
+ unsigned long flags;
+ int i;
- LASSERT (!in_interrupt());
+ LASSERT (!in_interrupt());
- kranal_unmap_buffer(tx);
+ kranal_unmap_buffer(tx);
- for (i = 0; i < 2; i++) {
- /* tx may have up to 2 libmsgs to finalise */
- if (tx->tx_libmsg[i] == NULL)
- continue;
+ lnetmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
+ lnetmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
- lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
- tx->tx_libmsg[i] = NULL;
- }
+ tx->tx_buftype = RANAL_BUF_NONE;
+ tx->tx_msg.ram_type = RANAL_MSG_NONE;
+ tx->tx_conn = NULL;
- tx->tx_buftype = RANAL_BUF_NONE;
- tx->tx_msg.ram_type = RANAL_MSG_NONE;
- tx->tx_conn = NULL;
+ spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
- spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
+ cfs_list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
- if (tx->tx_isnblk) {
- list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
- } else {
- list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
- wake_up(&kranal_data.kra_idle_tx_waitq);
- }
+ spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
+
+ /* finalize AFTER freeing lnet msgs */
+ for (i = 0; i < 2; i++) {
+ if (lnetmsg[i] == NULL)
+ continue;
- spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
+ lnet_finalize(kranal_data.kra_ni, lnetmsg[i], completion);
+ }
}
kra_conn_t *
kranal_find_conn_locked (kra_peer_t *peer)
{
- struct list_head *tmp;
+ cfs_list_t *tmp;
/* just return the first connection */
- list_for_each (tmp, &peer->rap_conns) {
- return list_entry(tmp, kra_conn_t, rac_list);
+ cfs_list_for_each (tmp, &peer->rap_conns) {
+ return cfs_list_entry(tmp, kra_conn_t, rac_list);
}
return NULL;
tx->tx_conn = conn;
- spin_lock_irqsave(&conn->rac_lock, flags);
- list_add_tail(&tx->tx_list, &conn->rac_fmaq);
+ spin_lock_irqsave(&conn->rac_lock, flags);
+ cfs_list_add_tail(&tx->tx_list, &conn->rac_fmaq);
tx->tx_qtime = jiffies;
- spin_unlock_irqrestore(&conn->rac_lock, flags);
+ spin_unlock_irqrestore(&conn->rac_lock, flags);
kranal_schedule_conn(conn);
}
void
-kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
+kranal_launch_tx (kra_tx_t *tx, lnet_nid_t nid)
{
unsigned long flags;
kra_peer_t *peer;
kra_conn_t *conn;
- unsigned long now;
- rwlock_t *g_lock = &kranal_data.kra_global_lock;
+ int rc;
+ int retry;
+ rwlock_t *g_lock = &kranal_data.kra_global_lock;
/* If I get here, I've committed to send, so I complete the tx with
* failure on any problems */
- LASSERT (tx->tx_conn == NULL); /* only set when assigned a conn */
+ LASSERT (tx->tx_conn == NULL); /* only set when assigned a conn */
- read_lock(g_lock);
+ for (retry = 0; ; retry = 1) {
- peer = kranal_find_peer_locked(nid);
- if (peer == NULL) {
- read_unlock(g_lock);
- kranal_tx_done(tx, -EHOSTUNREACH);
- return;
- }
+ read_lock(g_lock);
- conn = kranal_find_conn_locked(peer);
- if (conn != NULL) {
- kranal_post_fma(conn, tx);
- read_unlock(g_lock);
- return;
- }
+ peer = kranal_find_peer_locked(nid);
+ if (peer != NULL) {
+ conn = kranal_find_conn_locked(peer);
+ if (conn != NULL) {
+ kranal_post_fma(conn, tx);
+ read_unlock(g_lock);
+ return;
+ }
+ }
+
+ /* Making connections; I'll need a write lock... */
+ read_unlock(g_lock);
+ write_lock_irqsave(g_lock, flags);
- /* Making one or more connections; I'll need a write lock... */
- read_unlock(g_lock);
- write_lock_irqsave(g_lock, flags);
+ peer = kranal_find_peer_locked(nid);
+ if (peer != NULL)
+ break;
+
+ write_unlock_irqrestore(g_lock, flags);
+
+ if (retry) {
+ CERROR("Can't find peer %s\n", libcfs_nid2str(nid));
+ kranal_tx_done(tx, -EHOSTUNREACH);
+ return;
+ }
- peer = kranal_find_peer_locked(nid);
- if (peer == NULL) {
- write_unlock_irqrestore(g_lock, flags);
- kranal_tx_done(tx, -EHOSTUNREACH);
- return;
+ rc = kranal_add_persistent_peer(nid, LNET_NIDADDR(nid),
+ lnet_acceptor_port());
+ if (rc != 0) {
+ CERROR("Can't add peer %s: %d\n",
+ libcfs_nid2str(nid), rc);
+ kranal_tx_done(tx, rc);
+ return;
+ }
}
-
+
conn = kranal_find_conn_locked(peer);
if (conn != NULL) {
/* Connection exists; queue message on it */
kranal_post_fma(conn, tx);
- write_unlock_irqrestore(g_lock, flags);
+ write_unlock_irqrestore(g_lock, flags);
return;
}
-
+
LASSERT (peer->rap_persistence > 0);
if (!peer->rap_connecting) {
- LASSERT (list_empty(&peer->rap_tx_queue));
+ LASSERT (cfs_list_empty(&peer->rap_tx_queue));
- now = CURRENT_SECONDS;
- if (now < peer->rap_reconnect_time) {
- write_unlock_irqrestore(g_lock, flags);
+ if (!(peer->rap_reconnect_interval == 0 || /* first attempt */
+ cfs_time_aftereq(jiffies, peer->rap_reconnect_time))) {
+ write_unlock_irqrestore(g_lock, flags);
kranal_tx_done(tx, -EHOSTUNREACH);
return;
}
peer->rap_connecting = 1;
kranal_peer_addref(peer); /* extra ref for connd */
- spin_lock(&kranal_data.kra_connd_lock);
+ spin_lock(&kranal_data.kra_connd_lock);
- list_add_tail(&peer->rap_connd_list,
- &kranal_data.kra_connd_peers);
- wake_up(&kranal_data.kra_connd_waitq);
+ cfs_list_add_tail(&peer->rap_connd_list,
+ &kranal_data.kra_connd_peers);
+ wake_up(&kranal_data.kra_connd_waitq);
- spin_unlock(&kranal_data.kra_connd_lock);
+ spin_unlock(&kranal_data.kra_connd_lock);
}
/* A connection is being established; queue the message... */
- list_add_tail(&tx->tx_list, &peer->rap_tx_queue);
+ cfs_list_add_tail(&tx->tx_list, &peer->rap_tx_queue);
- write_unlock_irqrestore(g_lock, flags);
+ write_unlock_irqrestore(g_lock, flags);
}
void
rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
LASSERT (rrc == RAP_SUCCESS);
- spin_lock_irqsave(&conn->rac_lock, flags);
- list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
+ spin_lock_irqsave(&conn->rac_lock, flags);
+ cfs_list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
tx->tx_qtime = jiffies;
- spin_unlock_irqrestore(&conn->rac_lock, flags);
+ spin_unlock_irqrestore(&conn->rac_lock, flags);
}
int
conn->rac_rxmsg = NULL;
if (nob_received < nob) {
- CWARN("Incomplete immediate msg from "LPX64
- ": expected %d, got %d\n",
- conn->rac_peer->rap_nid, nob, nob_received);
+ CWARN("Incomplete immediate msg from %s: expected %d, got %d\n",
+ libcfs_nid2str(conn->rac_peer->rap_nid),
+ nob, nob_received);
return -EPROTO;
}
return 0;
}
-ptl_err_t
-kranal_do_send (lib_nal_t *nal,
- void *private,
- lib_msg_t *libmsg,
- ptl_hdr_t *hdr,
- int type,
- ptl_nid_t nid,
- ptl_pid_t pid,
- unsigned int niov,
- struct iovec *iov,
- ptl_kiov_t *kiov,
- int offset,
- int nob)
+int
+kranal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
- kra_conn_t *conn;
- kra_tx_t *tx;
- int rc;
+ lnet_hdr_t *hdr = &lntmsg->msg_hdr;
+ int type = lntmsg->msg_type;
+ lnet_process_id_t target = lntmsg->msg_target;
+ int target_is_router = lntmsg->msg_target_is_router;
+ int routing = lntmsg->msg_routing;
+ unsigned int niov = lntmsg->msg_niov;
+ struct iovec *iov = lntmsg->msg_iov;
+ lnet_kiov_t *kiov = lntmsg->msg_kiov;
+ unsigned int offset = lntmsg->msg_offset;
+ unsigned int nob = lntmsg->msg_len;
+ kra_tx_t *tx;
+ int rc;
/* NB 'private' is different depending on what we're sending.... */
- CDEBUG(D_NET, "sending %d bytes in %d frags to nid:"LPX64" pid %d\n",
- nob, niov, nid, pid);
+ CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
+ nob, niov, libcfs_id2str(target));
+
+ LASSERT (nob == 0 || niov > 0);
+ LASSERT (niov <= LNET_MAX_IOV);
- LASSERT (nob == 0 || niov > 0);
- LASSERT (niov <= PTL_MD_MAX_IOV);
+ LASSERT (!in_interrupt());
+ /* payload is either all vaddrs or all pages */
+ LASSERT (!(kiov != NULL && iov != NULL));
- LASSERT (!in_interrupt());
- /* payload is either all vaddrs or all pages */
- LASSERT (!(kiov != NULL && iov != NULL));
+ if (routing) {
+ CERROR ("Can't route\n");
+ return -EIO;
+ }
switch(type) {
default:
LBUG();
- case PTL_MSG_REPLY: {
- /* reply's 'private' is the conn that received the GET_REQ */
- conn = private;
- LASSERT (conn->rac_rxmsg != NULL);
-
- if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
- if (nob > RANAL_FMA_MAX_DATA) {
- CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
- nob, nid);
- return PTL_FAIL;
- }
- break; /* RDMA not expected */
- }
-
- /* Incoming message consistent with immediate reply? */
- if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
- CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
- nid, conn->rac_rxmsg->ram_type);
- return PTL_FAIL;
- }
-
- tx = kranal_get_idle_tx(0);
- if (tx == NULL)
- return PTL_FAIL;
-
- rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
- if (rc != 0) {
- kranal_tx_done(tx, rc);
- return PTL_FAIL;
- }
-
- tx->tx_conn = conn;
- tx->tx_libmsg[0] = libmsg;
-
- kranal_map_buffer(tx);
- kranal_rdma(tx, RANAL_MSG_GET_DONE,
- &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
- conn->rac_rxmsg->ram_u.get.ragm_cookie);
-
- /* flag matched by consuming rx message */
- kranal_consume_rxmsg(conn, NULL, 0);
- return PTL_OK;
- }
+ case LNET_MSG_ACK:
+ LASSERT (nob == 0);
+ break;
- case PTL_MSG_GET:
+ case LNET_MSG_GET:
LASSERT (niov == 0);
LASSERT (nob == 0);
/* We have to consider the eventual sink buffer rather than any
* payload passed here (there isn't any, and strictly, looking
- * inside libmsg is a layering violation). We send a simple
+ * inside lntmsg is a layering violation). We send a simple
* IMMEDIATE GET if the sink buffer is mapped already and small
* enough for FMA */
- if ((libmsg->md->options & PTL_MD_KIOV) == 0 &&
- libmsg->md->length <= RANAL_FMA_MAX_DATA &&
- libmsg->md->length <= kranal_tunables.kra_max_immediate)
- break;
+ if (routing || target_is_router)
+ break; /* send IMMEDIATE */
+
+ if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0 &&
+ lntmsg->msg_md->md_length <= RANAL_FMA_MAX_DATA &&
+ lntmsg->msg_md->md_length <= *kranal_tunables.kra_max_immediate)
+ break; /* send IMMEDIATE */
- tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_GET_REQ);
+ tx = kranal_new_tx_msg(RANAL_MSG_GET_REQ);
if (tx == NULL)
- return PTL_NO_SPACE;
+ return -ENOMEM;
- if ((libmsg->md->options & PTL_MD_KIOV) == 0)
- rc = kranal_setup_virt_buffer(tx, libmsg->md->md_niov,
- libmsg->md->md_iov.iov,
- 0, libmsg->md->length);
+ if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
+ rc = kranal_setup_virt_buffer(tx, lntmsg->msg_md->md_niov,
+ lntmsg->msg_md->md_iov.iov,
+ 0, lntmsg->msg_md->md_length);
else
- rc = kranal_setup_phys_buffer(tx, libmsg->md->md_niov,
- libmsg->md->md_iov.kiov,
- 0, libmsg->md->length);
+ rc = kranal_setup_phys_buffer(tx, lntmsg->msg_md->md_niov,
+ lntmsg->msg_md->md_iov.kiov,
+ 0, lntmsg->msg_md->md_length);
if (rc != 0) {
kranal_tx_done(tx, rc);
- return PTL_FAIL;
+ return -EIO;
}
- tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
- if (tx->tx_libmsg[1] == NULL) {
- CERROR("Can't create reply for GET to "LPX64"\n", nid);
+ tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
+ if (tx->tx_lntmsg[1] == NULL) {
+ CERROR("Can't create reply for GET to %s\n",
+ libcfs_nid2str(target.nid));
kranal_tx_done(tx, rc);
- return PTL_FAIL;
+ return -EIO;
}
- tx->tx_libmsg[0] = libmsg;
+ tx->tx_lntmsg[0] = lntmsg;
tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
/* rest of tx_msg is setup just before it is sent */
- kranal_launch_tx(tx, nid);
- return PTL_OK;
-
- case PTL_MSG_ACK:
- LASSERT (nob == 0);
- break;
+ kranal_launch_tx(tx, target.nid);
+ return 0;
- case PTL_MSG_PUT:
+ case LNET_MSG_REPLY:
+ case LNET_MSG_PUT:
if (kiov == NULL && /* not paged */
nob <= RANAL_FMA_MAX_DATA && /* small enough */
- nob <= kranal_tunables.kra_max_immediate)
+ nob <= *kranal_tunables.kra_max_immediate)
break; /* send IMMEDIATE */
- tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
+ tx = kranal_new_tx_msg(RANAL_MSG_PUT_REQ);
if (tx == NULL)
- return PTL_NO_SPACE;
+ return -ENOMEM;
rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
if (rc != 0) {
kranal_tx_done(tx, rc);
- return PTL_FAIL;
+ return -EIO;
}
- tx->tx_libmsg[0] = libmsg;
+ tx->tx_lntmsg[0] = lntmsg;
tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
/* rest of tx_msg is setup just before it is sent */
- kranal_launch_tx(tx, nid);
- return PTL_OK;
+ kranal_launch_tx(tx, target.nid);
+ return 0;
}
+ /* send IMMEDIATE */
+
LASSERT (kiov == NULL);
LASSERT (nob <= RANAL_FMA_MAX_DATA);
- tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
- type == PTL_MSG_REPLY ||
- in_interrupt()),
- RANAL_MSG_IMMEDIATE);
+ tx = kranal_new_tx_msg(RANAL_MSG_IMMEDIATE);
if (tx == NULL)
- return PTL_NO_SPACE;
+ return -ENOMEM;
rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
if (rc != 0) {
kranal_tx_done(tx, rc);
- return PTL_FAIL;
+ return -EIO;
}
tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
- tx->tx_libmsg[0] = libmsg;
- kranal_launch_tx(tx, nid);
- return PTL_OK;
+ tx->tx_lntmsg[0] = lntmsg;
+ kranal_launch_tx(tx, target.nid);
+ return 0;
}
-ptl_err_t
-kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
- ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
- unsigned int niov, struct iovec *iov,
- size_t offset, size_t len)
+void
+kranal_reply(lnet_ni_t *ni, kra_conn_t *conn, lnet_msg_t *lntmsg)
{
- return kranal_do_send(nal, private, cookie,
- hdr, type, nid, pid,
- niov, iov, NULL,
- offset, len);
+ kra_msg_t *rxmsg = conn->rac_rxmsg;
+ unsigned int niov = lntmsg->msg_niov;
+ struct iovec *iov = lntmsg->msg_iov;
+ lnet_kiov_t *kiov = lntmsg->msg_kiov;
+ unsigned int offset = lntmsg->msg_offset;
+ unsigned int nob = lntmsg->msg_len;
+ kra_tx_t *tx;
+ int rc;
+
+ tx = kranal_get_idle_tx();
+ if (tx == NULL)
+ goto failed_0;
+
+ rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
+ if (rc != 0)
+ goto failed_1;
+
+ tx->tx_conn = conn;
+
+ rc = kranal_map_buffer(tx);
+ if (rc != 0)
+ goto failed_1;
+
+ tx->tx_lntmsg[0] = lntmsg;
+
+ kranal_rdma(tx, RANAL_MSG_GET_DONE,
+ &rxmsg->ram_u.get.ragm_desc, nob,
+ rxmsg->ram_u.get.ragm_cookie);
+ return;
+
+ failed_1:
+ kranal_tx_done(tx, -EIO);
+ failed_0:
+ lnet_finalize(ni, lntmsg, -EIO);
}
-ptl_err_t
-kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
- ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
- unsigned int niov, ptl_kiov_t *kiov,
- size_t offset, size_t len)
+int
+kranal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
+ void **new_private)
{
- return kranal_do_send(nal, private, cookie,
- hdr, type, nid, pid,
- niov, NULL, kiov,
- offset, len);
+ kra_conn_t *conn = (kra_conn_t *)private;
+
+ LCONSOLE_ERROR_MSG(0x12b, "Dropping message from %s: no buffers free.\n",
+ libcfs_nid2str(conn->rac_peer->rap_nid));
+
+ return -EDEADLK;
}
-ptl_err_t
-kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
- unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
- int offset, int mlen, int rlen)
+int
+kranal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
+ int delayed, unsigned int niov,
+ struct iovec *iov, lnet_kiov_t *kiov,
+ unsigned int offset, unsigned int mlen, unsigned int rlen)
{
- kra_conn_t *conn = private;
- kra_msg_t *rxmsg = conn->rac_rxmsg;
- kra_tx_t *tx;
- void *buffer;
- int rc;
-
- LASSERT (mlen <= rlen);
- LASSERT (!in_interrupt());
- /* Either all pages or all vaddrs */
- LASSERT (!(kiov != NULL && iov != NULL));
-
- CDEBUG(D_NET, "conn %p, rxmsg %p, libmsg %p\n", conn, rxmsg, libmsg);
-
- if (libmsg == NULL) {
- /* GET or ACK or portals is discarding */
- LASSERT (mlen == 0);
- lib_finalize(nal, NULL, libmsg, PTL_OK);
- return PTL_OK;
- }
+ kra_conn_t *conn = private;
+ kra_msg_t *rxmsg = conn->rac_rxmsg;
+ kra_tx_t *tx;
+ void *buffer;
+ int rc;
+
+ LASSERT (mlen <= rlen);
+ LASSERT (!in_interrupt());
+ /* Either all pages or all vaddrs */
+ LASSERT (!(kiov != NULL && iov != NULL));
+
+ CDEBUG(D_NET, "conn %p, rxmsg %p, lntmsg %p\n", conn, rxmsg, lntmsg);
switch(rxmsg->ram_type) {
default:
LBUG();
- return PTL_FAIL;
case RANAL_MSG_IMMEDIATE:
if (mlen == 0) {
buffer = NULL;
} else if (kiov != NULL) {
CERROR("Can't recv immediate into paged buffer\n");
- return PTL_FAIL;
+ return -EIO;
} else {
LASSERT (niov > 0);
while (offset >= iov->iov_len) {
}
if (mlen > iov->iov_len - offset) {
CERROR("Can't handle immediate frags\n");
- return PTL_FAIL;
+ return -EIO;
}
buffer = ((char *)iov->iov_base) + offset;
}
rc = kranal_consume_rxmsg(conn, buffer, mlen);
- lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
- return PTL_OK;
+ lnet_finalize(ni, lntmsg, (rc == 0) ? 0 : -EIO);
+ return 0;
case RANAL_MSG_PUT_REQ:
- tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
- if (tx == NULL)
- return PTL_NO_SPACE;
-
+ tx = kranal_new_tx_msg(RANAL_MSG_PUT_ACK);
+ if (tx == NULL) {
+ kranal_consume_rxmsg(conn, NULL, 0);
+ return -ENOMEM;
+ }
+
rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
if (rc != 0) {
kranal_tx_done(tx, rc);
- return PTL_FAIL;
+ kranal_consume_rxmsg(conn, NULL, 0);
+ return -EIO;
}
tx->tx_conn = conn;
- kranal_map_buffer(tx);
+ rc = kranal_map_buffer(tx);
+ if (rc != 0) {
+ kranal_tx_done(tx, rc);
+ kranal_consume_rxmsg(conn, NULL, 0);
+ return -EIO;
+ }
tx->tx_msg.ram_u.putack.rapam_src_cookie =
conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
(__u64)((unsigned long)tx->tx_buffer);
tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;
- tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */
+ tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */
kranal_post_fma(conn, tx);
+ kranal_consume_rxmsg(conn, NULL, 0);
+ return 0;
- /* flag matched by consuming rx message */
+ case RANAL_MSG_GET_REQ:
+ if (lntmsg != NULL) {
+ /* Matched! */
+ kranal_reply(ni, conn, lntmsg);
+ } else {
+ /* No match */
+ tx = kranal_new_tx_msg(RANAL_MSG_GET_NAK);
+ if (tx != NULL) {
+ tx->tx_msg.ram_u.completion.racm_cookie =
+ rxmsg->ram_u.get.ragm_cookie;
+ kranal_post_fma(conn, tx);
+ }
+ }
kranal_consume_rxmsg(conn, NULL, 0);
- return PTL_OK;
+ return 0;
}
}
-ptl_err_t
-kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
- unsigned int niov, struct iovec *iov,
- size_t offset, size_t mlen, size_t rlen)
-{
- return kranal_do_recv(nal, private, msg, niov, iov, NULL,
- offset, mlen, rlen);
-}
-
-ptl_err_t
-kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
- unsigned int niov, ptl_kiov_t *kiov,
- size_t offset, size_t mlen, size_t rlen)
-{
- return kranal_do_recv(nal, private, msg, niov, NULL, kiov,
- offset, mlen, rlen);
-}
-
int
-kranal_thread_start (int(*fn)(void *arg), void *arg)
+kranal_thread_start(int(*fn)(void *arg), void *arg, char *name)
{
- long pid = kernel_thread(fn, arg, 0);
-
- if (pid < 0)
- return(int)pid;
+ struct task_struct *task = cfs_thread_run(fn, arg, name);
- atomic_inc(&kranal_data.kra_nthreads);
- return 0;
+ if (!IS_ERR(task))
+ atomic_inc(&kranal_data.kra_nthreads);
+ return PTR_ERR(task);
}
void
kranal_thread_fini (void)
{
- atomic_dec(&kranal_data.kra_nthreads);
+ atomic_dec(&kranal_data.kra_nthreads);
}
int
kranal_check_conn_timeouts (kra_conn_t *conn)
{
kra_tx_t *tx;
- struct list_head *ttmp;
+ cfs_list_t *ttmp;
unsigned long flags;
long timeout;
unsigned long now = jiffies;
LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
conn->rac_state == RANAL_CONN_CLOSING);
- if (!conn->rac_close_sent &&
- time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
- /* not sent in a while; schedule conn so scheduler sends a keepalive */
- CDEBUG(D_NET, "Scheduling keepalive %p->"LPX64"\n",
- conn, conn->rac_peer->rap_nid);
- kranal_schedule_conn(conn);
- }
-
- timeout = conn->rac_timeout * HZ;
-
- if (!conn->rac_close_recvd &&
- time_after_eq(now, conn->rac_last_rx + timeout)) {
- CERROR("%s received from "LPX64" within %lu seconds\n",
- (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
- "Nothing" : "CLOSE not",
- conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
- return -ETIMEDOUT;
- }
+ if (!conn->rac_close_sent &&
+ cfs_time_aftereq(now, conn->rac_last_tx +
+ msecs_to_jiffies(conn->rac_keepalive *
+ MSEC_PER_SEC))) {
+ /* not sent in a while; schedule conn so scheduler sends a keepalive */
+ CDEBUG(D_NET, "Scheduling keepalive %p->%s\n",
+ conn, libcfs_nid2str(conn->rac_peer->rap_nid));
+ kranal_schedule_conn(conn);
+ }
+
+ timeout = msecs_to_jiffies(conn->rac_timeout * MSEC_PER_SEC);
+
+ if (!conn->rac_close_recvd &&
+ cfs_time_aftereq(now, conn->rac_last_rx + timeout)) {
+ CERROR("%s received from %s within %lu seconds\n",
+ (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
+ "Nothing" : "CLOSE not",
+ libcfs_nid2str(conn->rac_peer->rap_nid),
+ jiffies_to_msecs(now - conn->rac_last_rx)/MSEC_PER_SEC);
+ return -ETIMEDOUT;
+ }
if (conn->rac_state != RANAL_CONN_ESTABLISHED)
return 0;
* in case of hardware/software errors that make this conn seem
* responsive even though it isn't progressing its message queues. */
- spin_lock_irqsave(&conn->rac_lock, flags);
-
- list_for_each (ttmp, &conn->rac_fmaq) {
- tx = list_entry(ttmp, kra_tx_t, tx_list);
-
- if (time_after_eq(now, tx->tx_qtime + timeout)) {
- spin_unlock_irqrestore(&conn->rac_lock, flags);
- CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
- conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
- return -ETIMEDOUT;
- }
- }
-
- list_for_each (ttmp, &conn->rac_rdmaq) {
- tx = list_entry(ttmp, kra_tx_t, tx_list);
-
- if (time_after_eq(now, tx->tx_qtime + timeout)) {
- spin_unlock_irqrestore(&conn->rac_lock, flags);
- CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
- conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
- return -ETIMEDOUT;
- }
- }
-
- list_for_each (ttmp, &conn->rac_replyq) {
- tx = list_entry(ttmp, kra_tx_t, tx_list);
-
- if (time_after_eq(now, tx->tx_qtime + timeout)) {
- spin_unlock_irqrestore(&conn->rac_lock, flags);
- CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
- conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
- return -ETIMEDOUT;
- }
- }
-
- spin_unlock_irqrestore(&conn->rac_lock, flags);
+ spin_lock_irqsave(&conn->rac_lock, flags);
+
+ cfs_list_for_each (ttmp, &conn->rac_fmaq) {
+ tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
+
+ if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
+ spin_unlock_irqrestore(&conn->rac_lock, flags);
+ CERROR("tx on fmaq for %s blocked %lu seconds\n",
+ libcfs_nid2str(conn->rac_peer->rap_nid),
+ jiffies_to_msecs(now-tx->tx_qtime)/MSEC_PER_SEC);
+ return -ETIMEDOUT;
+ }
+ }
+
+ cfs_list_for_each (ttmp, &conn->rac_rdmaq) {
+ tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
+
+ if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
+ spin_unlock_irqrestore(&conn->rac_lock, flags);
+ CERROR("tx on rdmaq for %s blocked %lu seconds\n",
+ libcfs_nid2str(conn->rac_peer->rap_nid),
+ jiffies_to_msecs(now-tx->tx_qtime)/MSEC_PER_SEC);
+ return -ETIMEDOUT;
+ }
+ }
+
+ cfs_list_for_each (ttmp, &conn->rac_replyq) {
+ tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
+
+ if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
+ spin_unlock_irqrestore(&conn->rac_lock, flags);
+ CERROR("tx on replyq for %s blocked %lu seconds\n",
+ libcfs_nid2str(conn->rac_peer->rap_nid),
+ jiffies_to_msecs(now-tx->tx_qtime)/MSEC_PER_SEC);
+ return -ETIMEDOUT;
+ }
+ }
+
+ spin_unlock_irqrestore(&conn->rac_lock, flags);
return 0;
}
void
kranal_reaper_check (int idx, unsigned long *min_timeoutp)
{
- struct list_head *conns = &kranal_data.kra_conns[idx];
- struct list_head *ctmp;
+ cfs_list_t *conns = &kranal_data.kra_conns[idx];
+ cfs_list_t *ctmp;
kra_conn_t *conn;
unsigned long flags;
int rc;
again:
/* NB. We expect to check all the conns and not find any problems, so
* we just use a shared lock while we take a look... */
- read_lock(&kranal_data.kra_global_lock);
+ read_lock(&kranal_data.kra_global_lock);
- list_for_each (ctmp, conns) {
- conn = list_entry(ctmp, kra_conn_t, rac_hashlist);
+ cfs_list_for_each (ctmp, conns) {
+ conn = cfs_list_entry(ctmp, kra_conn_t, rac_hashlist);
if (conn->rac_timeout < *min_timeoutp )
*min_timeoutp = conn->rac_timeout;
continue;
kranal_conn_addref(conn);
- read_unlock(&kranal_data.kra_global_lock);
+ read_unlock(&kranal_data.kra_global_lock);
- CERROR("Conn to "LPX64", cqid %d timed out\n",
- conn->rac_peer->rap_nid, conn->rac_cqid);
+ CERROR("Conn to %s, cqid %d timed out\n",
+ libcfs_nid2str(conn->rac_peer->rap_nid),
+ conn->rac_cqid);
- write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+ write_lock_irqsave(&kranal_data.kra_global_lock, flags);
switch (conn->rac_state) {
default:
break;
}
- write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+ write_unlock_irqrestore(&kranal_data.kra_global_lock,
+ flags);
kranal_conn_decref(conn);
goto again;
}
- read_unlock(&kranal_data.kra_global_lock);
+ read_unlock(&kranal_data.kra_global_lock);
}
int
kranal_connd (void *arg)
{
- long id = (long)arg;
- char name[16];
- wait_queue_t wait;
- unsigned long flags;
- kra_peer_t *peer;
- kra_acceptsock_t *ras;
- int did_something;
+ long id = (long)arg;
+ wait_queue_t wait;
+ unsigned long flags;
+ kra_peer_t *peer;
+ kra_acceptsock_t *ras;
+ int did_something;
- snprintf(name, sizeof(name), "kranal_connd_%02ld", id);
- kportal_daemonize(name);
- kportal_blockallsigs();
+ cfs_block_allsigs();
- init_waitqueue_entry(&wait, current);
+ init_waitqueue_entry_current(&wait);
- spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
+ spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
- while (!kranal_data.kra_shutdown) {
- did_something = 0;
+ while (!kranal_data.kra_shutdown) {
+ did_something = 0;
- if (!list_empty(&kranal_data.kra_connd_acceptq)) {
- ras = list_entry(kranal_data.kra_connd_acceptq.next,
- kra_acceptsock_t, ras_list);
- list_del(&ras->ras_list);
+ if (!cfs_list_empty(&kranal_data.kra_connd_acceptq)) {
+ ras = cfs_list_entry(kranal_data.kra_connd_acceptq.next,
+ kra_acceptsock_t, ras_list);
+ cfs_list_del(&ras->ras_list);
- spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+ spin_unlock_irqrestore(&kranal_data.kra_connd_lock,
+ flags);
- CDEBUG(D_NET,"About to handshake someone\n");
+ CDEBUG(D_NET,"About to handshake someone\n");
- kranal_conn_handshake(ras->ras_sock, NULL);
- kranal_free_acceptsock(ras);
+ kranal_conn_handshake(ras->ras_sock, NULL);
+ kranal_free_acceptsock(ras);
- CDEBUG(D_NET,"Finished handshaking someone\n");
+ CDEBUG(D_NET,"Finished handshaking someone\n");
- spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
- did_something = 1;
- }
+ spin_lock_irqsave(&kranal_data.kra_connd_lock,
+ flags);
+ did_something = 1;
+ }
- if (!list_empty(&kranal_data.kra_connd_peers)) {
- peer = list_entry(kranal_data.kra_connd_peers.next,
- kra_peer_t, rap_connd_list);
+ if (!cfs_list_empty(&kranal_data.kra_connd_peers)) {
+ peer = cfs_list_entry(kranal_data.kra_connd_peers.next,
+ kra_peer_t, rap_connd_list);
- list_del_init(&peer->rap_connd_list);
- spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+ cfs_list_del_init(&peer->rap_connd_list);
+ spin_unlock_irqrestore(&kranal_data.kra_connd_lock,
+ flags);
- kranal_connect(peer);
- kranal_peer_decref(peer);
+ kranal_connect(peer);
+ kranal_peer_decref(peer);
- spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
- did_something = 1;
- }
+ spin_lock_irqsave(&kranal_data.kra_connd_lock,
+ flags);
+ did_something = 1;
+ }
- if (did_something)
- continue;
+ if (did_something)
+ continue;
- set_current_state(TASK_INTERRUPTIBLE);
- add_wait_queue(&kranal_data.kra_connd_waitq, &wait);
+ set_current_state(TASK_INTERRUPTIBLE);
+ add_wait_queue_exclusive(&kranal_data.kra_connd_waitq, &wait);
- spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+ spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
- schedule ();
+ waitq_wait(&wait, TASK_INTERRUPTIBLE);
- set_current_state(TASK_RUNNING);
- remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);
+ set_current_state(TASK_RUNNING);
+ remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);
- spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
- }
+ spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
+ }
- spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
+ spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
- kranal_thread_fini();
- return 0;
+ kranal_thread_fini();
+ return 0;
}
void
LASSERT (timeout > 0);
- spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
+ spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
if (timeout < kranal_data.kra_new_min_timeout)
kranal_data.kra_new_min_timeout = timeout;
- spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
+ spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
}
int
kranal_reaper (void *arg)
{
- wait_queue_t wait;
- unsigned long flags;
- long timeout;
- int i;
- int conn_entries = kranal_data.kra_conn_hash_size;
- int conn_index = 0;
- int base_index = conn_entries - 1;
- unsigned long next_check_time = jiffies;
- long next_min_timeout = MAX_SCHEDULE_TIMEOUT;
- long current_min_timeout = 1;
-
- kportal_daemonize("kranal_reaper");
- kportal_blockallsigs();
-
- init_waitqueue_entry(&wait, current);
-
- spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
-
- while (!kranal_data.kra_shutdown) {
- /* I wake up every 'p' seconds to check for timeouts on some
- * more peers. I try to check every connection 'n' times
- * within the global minimum of all keepalive and timeout
- * intervals, to ensure I attend to every connection within
- * (n+1)/n times its timeout intervals. */
- const int p = 1;
- const int n = 3;
- unsigned long min_timeout;
- int chunk;
-
- /* careful with the jiffy wrap... */
- timeout = (long)(next_check_time - jiffies);
- if (timeout > 0) {
- set_current_state(TASK_INTERRUPTIBLE);
- add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
-
- spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
-
- schedule_timeout(timeout);
-
- spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
-
- set_current_state(TASK_RUNNING);
- remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
- continue;
- }
-
- if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
- /* new min timeout set: restart min timeout scan */
- next_min_timeout = MAX_SCHEDULE_TIMEOUT;
- base_index = conn_index - 1;
- if (base_index < 0)
- base_index = conn_entries - 1;
-
- if (kranal_data.kra_new_min_timeout < current_min_timeout) {
- current_min_timeout = kranal_data.kra_new_min_timeout;
- CDEBUG(D_NET, "Set new min timeout %ld\n",
- current_min_timeout);
- }
-
- kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
- }
- min_timeout = current_min_timeout;
-
- spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
-
- LASSERT (min_timeout > 0);
-
- /* Compute how many table entries to check now so I get round
- * the whole table fast enough given that I do this at fixed
- * intervals of 'p' seconds) */
- chunk = conn_entries;
- if (min_timeout > n * p)
- chunk = (chunk * n * p) / min_timeout;
- if (chunk == 0)
- chunk = 1;
-
- for (i = 0; i < chunk; i++) {
- kranal_reaper_check(conn_index,
- &next_min_timeout);
- conn_index = (conn_index + 1) % conn_entries;
- }
-
- next_check_time += p * HZ;
-
- spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
-
- if (((conn_index - chunk <= base_index &&
- base_index < conn_index) ||
- (conn_index - conn_entries - chunk <= base_index &&
- base_index < conn_index - conn_entries))) {
-
- /* Scanned all conns: set current_min_timeout... */
- if (current_min_timeout != next_min_timeout) {
- current_min_timeout = next_min_timeout;
- CDEBUG(D_NET, "Set new min timeout %ld\n",
- current_min_timeout);
- }
-
- /* ...and restart min timeout scan */
- next_min_timeout = MAX_SCHEDULE_TIMEOUT;
- base_index = conn_index - 1;
- if (base_index < 0)
- base_index = conn_entries - 1;
- }
- }
-
- kranal_thread_fini();
- return 0;
+ wait_queue_t wait;
+ unsigned long flags;
+ long timeout;
+ int i;
+ int conn_entries = kranal_data.kra_conn_hash_size;
+ int conn_index = 0;
+ int base_index = conn_entries - 1;
+ unsigned long next_check_time = jiffies;
+ long next_min_timeout = MAX_SCHEDULE_TIMEOUT;
+ long current_min_timeout = 1;
+
+ cfs_block_allsigs();
+
+ init_waitqueue_entry_current(&wait);
+
+ spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
+
+ while (!kranal_data.kra_shutdown) {
+ /* I wake up every 'p' seconds to check for timeouts on some
+ * more peers. I try to check every connection 'n' times
+ * within the global minimum of all keepalive and timeout
+ * intervals, to ensure I attend to every connection within
+ * (n+1)/n times its timeout intervals. */
+ const int p = 1;
+ const int n = 3;
+ unsigned long min_timeout;
+ int chunk;
+
+ /* careful with the jiffy wrap... */
+ timeout = (long)(next_check_time - jiffies);
+ if (timeout > 0) {
+ set_current_state(TASK_INTERRUPTIBLE);
+ add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
+
+ spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
+ flags);
+
+ waitq_timedwait(&wait, TASK_INTERRUPTIBLE,
+ timeout);
+
+ spin_lock_irqsave(&kranal_data.kra_reaper_lock,
+ flags);
+
+ set_current_state(TASK_RUNNING);
+ remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
+ continue;
+ }
+
+ if (kranal_data.kra_new_min_timeout !=
+ MAX_SCHEDULE_TIMEOUT) {
+ /* new min timeout set: restart min timeout scan */
+ next_min_timeout = MAX_SCHEDULE_TIMEOUT;
+ base_index = conn_index - 1;
+ if (base_index < 0)
+ base_index = conn_entries - 1;
+
+ if (kranal_data.kra_new_min_timeout <
+ current_min_timeout) {
+ current_min_timeout =
+ kranal_data.kra_new_min_timeout;
+ CDEBUG(D_NET, "Set new min timeout %ld\n",
+ current_min_timeout);
+ }
+
+ kranal_data.kra_new_min_timeout =
+ MAX_SCHEDULE_TIMEOUT;
+ }
+ min_timeout = current_min_timeout;
+
+ spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
+
+ LASSERT (min_timeout > 0);
+
+ /* Compute how many table entries to check now so I get round
+ * the whole table fast enough given that I do this at fixed
+                 * intervals of 'p' seconds */
+ chunk = conn_entries;
+ if (min_timeout > n * p)
+ chunk = (chunk * n * p) / min_timeout;
+ if (chunk == 0)
+ chunk = 1;
+
+ for (i = 0; i < chunk; i++) {
+ kranal_reaper_check(conn_index,
+ &next_min_timeout);
+ conn_index = (conn_index + 1) % conn_entries;
+ }
+
+ next_check_time += msecs_to_jiffies(p * MSEC_PER_SEC);
+
+ spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
+
+ if (((conn_index - chunk <= base_index &&
+ base_index < conn_index) ||
+ (conn_index - conn_entries - chunk <= base_index &&
+ base_index < conn_index - conn_entries))) {
+
+ /* Scanned all conns: set current_min_timeout... */
+ if (current_min_timeout != next_min_timeout) {
+ current_min_timeout = next_min_timeout;
+ CDEBUG(D_NET, "Set new min timeout %ld\n",
+ current_min_timeout);
+ }
+
+ /* ...and restart min timeout scan */
+ next_min_timeout = MAX_SCHEDULE_TIMEOUT;
+ base_index = conn_index - 1;
+ if (base_index < 0)
+ base_index = conn_entries - 1;
+ }
+ }
+
+ kranal_thread_fini();
+ return 0;
}
void
LASSERT (rrc == RAP_SUCCESS);
LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);
- read_lock(&kranal_data.kra_global_lock);
+ read_lock(&kranal_data.kra_global_lock);
conn = kranal_cqid2conn_locked(cqid);
if (conn == NULL) {
/* Conn was destroyed? */
CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid);
- read_unlock(&kranal_data.kra_global_lock);
+ read_unlock(&kranal_data.kra_global_lock);
continue;
}
LASSERT (rrc == RAP_SUCCESS);
CDEBUG(D_NET, "Completed %p\n",
- list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));
+ cfs_list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));
- spin_lock_irqsave(&conn->rac_lock, flags);
+ spin_lock_irqsave(&conn->rac_lock, flags);
- LASSERT (!list_empty(&conn->rac_rdmaq));
- tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
- list_del(&tx->tx_list);
+ LASSERT (!cfs_list_empty(&conn->rac_rdmaq));
+ tx = cfs_list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
+ cfs_list_del(&tx->tx_list);
LASSERT(desc->AppPtr == (void *)tx);
LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);
- list_add_tail(&tx->tx_list, &conn->rac_fmaq);
+ cfs_list_add_tail(&tx->tx_list, &conn->rac_fmaq);
tx->tx_qtime = jiffies;
- spin_unlock_irqrestore(&conn->rac_lock, flags);
+ spin_unlock_irqrestore(&conn->rac_lock, flags);
/* Get conn's fmaq processed, now I've just put something
* there */
kranal_schedule_conn(conn);
- read_unlock(&kranal_data.kra_global_lock);
+ read_unlock(&kranal_data.kra_global_lock);
}
}
RAP_RETURN rrc;
__u32 cqid;
__u32 event_type;
- struct list_head *conns;
- struct list_head *tmp;
+ cfs_list_t *conns;
+ cfs_list_t *tmp;
int i;
for (;;) {
if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {
- read_lock(&kranal_data.kra_global_lock);
+ read_lock(&kranal_data.kra_global_lock);
conn = kranal_cqid2conn_locked(cqid);
if (conn == NULL) {
kranal_schedule_conn(conn);
}
- read_unlock(&kranal_data.kra_global_lock);
+ read_unlock(&kranal_data.kra_global_lock);
continue;
}
/* FMA CQ has overflowed: check ALL conns */
- CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);
+ CWARN("FMA CQ overflow: scheduling ALL conns on device %d\n",
+ dev->rad_id);
for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
- read_lock(&kranal_data.kra_global_lock);
+ read_lock(&kranal_data.kra_global_lock);
conns = &kranal_data.kra_conns[i];
- list_for_each (tmp, conns) {
- conn = list_entry(tmp, kra_conn_t,
- rac_hashlist);
+ cfs_list_for_each (tmp, conns) {
+ conn = cfs_list_entry(tmp, kra_conn_t,
+ rac_hashlist);
if (conn->rac_device == dev)
kranal_schedule_conn(conn);
}
/* don't block write lockers for too long... */
- read_unlock(&kranal_data.kra_global_lock);
+ read_unlock(&kranal_data.kra_global_lock);
}
}
}
return 0;
case RAP_NOT_DONE:
- return -EAGAIN;
+ if (cfs_time_aftereq(jiffies,
+ conn->rac_last_tx +
+ msecs_to_jiffies(conn->rac_keepalive *
+ MSEC_PER_SEC)))
+ CWARN("EAGAIN sending %02x (idle %lu secs)\n",
+ msg->ram_type,
+ jiffies_to_msecs(jiffies - conn->rac_last_tx) /
+ MSEC_PER_SEC);
+ return -EAGAIN;
}
}
int expect_reply;
/* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
- * However I will be rescheduled some by an FMA completion event
+ * However I will be rescheduled by an FMA completion event
* when I eventually get some.
* NB 2. Sampling rac_state here races with setting it elsewhere.
* But it doesn't matter if I try to send a "real" message just
LASSERT (current == conn->rac_device->rad_scheduler);
if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
- if (!list_empty(&conn->rac_rdmaq)) {
+ if (!cfs_list_empty(&conn->rac_rdmaq)) {
/* RDMAs in progress */
LASSERT (!conn->rac_close_sent);
- if (time_after_eq(jiffies,
- conn->rac_last_tx +
- conn->rac_keepalive * HZ)) {
- CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
- kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
- kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
- }
+ if (cfs_time_aftereq(jiffies,
+ conn->rac_last_tx +
+ msecs_to_jiffies(conn->rac_keepalive *
+ MSEC_PER_SEC))) {
+ CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
+ kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
+ kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
+ }
return;
}
if (conn->rac_close_sent)
return;
- CWARN("sending CLOSE to "LPX64"\n", conn->rac_peer->rap_nid);
+ CWARN("sending CLOSE to %s\n",
+ libcfs_nid2str(conn->rac_peer->rap_nid));
kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
if (rc != 0)
if (!conn->rac_close_recvd)
return;
- write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+ write_lock_irqsave(&kranal_data.kra_global_lock, flags);
if (conn->rac_state == RANAL_CONN_CLOSING)
kranal_terminate_conn_locked(conn);
- write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+ write_unlock_irqrestore(&kranal_data.kra_global_lock,
+ flags);
return;
}
- spin_lock_irqsave(&conn->rac_lock, flags);
+ spin_lock_irqsave(&conn->rac_lock, flags);
- if (list_empty(&conn->rac_fmaq)) {
+ if (cfs_list_empty(&conn->rac_fmaq)) {
- spin_unlock_irqrestore(&conn->rac_lock, flags);
+ spin_unlock_irqrestore(&conn->rac_lock, flags);
- if (time_after_eq(jiffies,
- conn->rac_last_tx + conn->rac_keepalive * HZ)) {
- CDEBUG(D_NET, "sending NOOP (idle)\n");
- kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
- kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
- }
+ if (cfs_time_aftereq(jiffies,
+ conn->rac_last_tx +
+ msecs_to_jiffies(conn->rac_keepalive *
+ MSEC_PER_SEC))) {
+ CDEBUG(D_NET, "sending NOOP -> %s (%p idle %lu(%ld))\n",
+ libcfs_nid2str(conn->rac_peer->rap_nid), conn,
+ jiffies_to_msecs(jiffies - conn->rac_last_tx) /
+ MSEC_PER_SEC,
+ conn->rac_keepalive);
+ kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
+ kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
+ }
return;
}
- tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
- list_del(&tx->tx_list);
- more_to_do = !list_empty(&conn->rac_fmaq);
+ tx = cfs_list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
+ cfs_list_del(&tx->tx_list);
+ more_to_do = !cfs_list_empty(&conn->rac_fmaq);
- spin_unlock_irqrestore(&conn->rac_lock, flags);
+ spin_unlock_irqrestore(&conn->rac_lock, flags);
expect_reply = 0;
CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n",
case RANAL_MSG_IMMEDIATE:
rc = kranal_sendmsg(conn, &tx->tx_msg,
tx->tx_buffer, tx->tx_nob);
- expect_reply = 0;
break;
case RANAL_MSG_PUT_NAK:
case RANAL_MSG_GET_NAK:
case RANAL_MSG_GET_DONE:
rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
- expect_reply = 0;
break;
case RANAL_MSG_PUT_REQ:
+ rc = kranal_map_buffer(tx);
+ LASSERT (rc != -EAGAIN);
+ if (rc != 0)
+ break;
+
tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
- kranal_map_buffer(tx);
expect_reply = 1;
break;
break;
case RANAL_MSG_GET_REQ:
- kranal_map_buffer(tx);
+ rc = kranal_map_buffer(tx);
+ LASSERT (rc != -EAGAIN);
+ if (rc != 0)
+ break;
+
tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
/* I need credits to send this. Replace tx at the head of the
* fmaq and I'll get rescheduled when credits appear */
CDEBUG(D_NET, "EAGAIN on %p\n", conn);
- spin_lock_irqsave(&conn->rac_lock, flags);
- list_add(&tx->tx_list, &conn->rac_fmaq);
- spin_unlock_irqrestore(&conn->rac_lock, flags);
+ spin_lock_irqsave(&conn->rac_lock, flags);
+ cfs_list_add(&tx->tx_list, &conn->rac_fmaq);
+ spin_unlock_irqrestore(&conn->rac_lock, flags);
return;
}
- LASSERT (rc == 0);
-
- if (!expect_reply) {
- kranal_tx_done(tx, 0);
+ if (!expect_reply || rc != 0) {
+ kranal_tx_done(tx, rc);
} else {
/* LASSERT(current) above ensures this doesn't race with reply
* processing */
- spin_lock_irqsave(&conn->rac_lock, flags);
- list_add_tail(&tx->tx_list, &conn->rac_replyq);
+ spin_lock_irqsave(&conn->rac_lock, flags);
+ cfs_list_add_tail(&tx->tx_list, &conn->rac_replyq);
tx->tx_qtime = jiffies;
- spin_unlock_irqrestore(&conn->rac_lock, flags);
+ spin_unlock_irqrestore(&conn->rac_lock, flags);
}
if (more_to_do) {
kra_tx_t *
kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
{
- struct list_head *ttmp;
+ cfs_list_t *ttmp;
kra_tx_t *tx;
unsigned long flags;
- spin_lock_irqsave(&conn->rac_lock, flags);
+ spin_lock_irqsave(&conn->rac_lock, flags);
- list_for_each(ttmp, &conn->rac_replyq) {
- tx = list_entry(ttmp, kra_tx_t, tx_list);
+ cfs_list_for_each(ttmp, &conn->rac_replyq) {
+ tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n",
tx, tx->tx_msg.ram_type, tx->tx_cookie);
continue;
if (tx->tx_msg.ram_type != type) {
- spin_unlock_irqrestore(&conn->rac_lock, flags);
+ spin_unlock_irqrestore(&conn->rac_lock, flags);
CWARN("Unexpected type %x (%x expected) "
- "matched reply from "LPX64"\n",
+ "matched reply from %s\n",
tx->tx_msg.ram_type, type,
- conn->rac_peer->rap_nid);
+ libcfs_nid2str(conn->rac_peer->rap_nid));
return NULL;
}
- list_del(&tx->tx_list);
- spin_unlock_irqrestore(&conn->rac_lock, flags);
+ cfs_list_del(&tx->tx_list);
+ spin_unlock_irqrestore(&conn->rac_lock, flags);
return tx;
}
- spin_unlock_irqrestore(&conn->rac_lock, flags);
- CWARN("Unmatched reply %02x/"LPX64" from "LPX64"\n",
- type, cookie, conn->rac_peer->rap_nid);
+ spin_unlock_irqrestore(&conn->rac_lock, flags);
+ CWARN("Unmatched reply %02x/"LPX64" from %s\n",
+ type, cookie, libcfs_nid2str(conn->rac_peer->rap_nid));
return NULL;
}
void *prefix;
RAP_RETURN rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
kra_peer_t *peer = conn->rac_peer;
+ int rc = 0;
+ int repost = 1;
if (rrc == RAP_NOT_DONE)
return;
if (msg->ram_magic != RANAL_MSG_MAGIC) {
if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
- CERROR("Unexpected magic %08x from "LPX64"\n",
- msg->ram_magic, peer->rap_nid);
+ CERROR("Unexpected magic %08x from %s\n",
+ msg->ram_magic, libcfs_nid2str(peer->rap_nid));
+ rc = -EPROTO;
goto out;
}
}
if (msg->ram_version != RANAL_MSG_VERSION) {
- CERROR("Unexpected protocol version %d from "LPX64"\n",
- msg->ram_version, peer->rap_nid);
+ CERROR("Unexpected protocol version %d from %s\n",
+ msg->ram_version, libcfs_nid2str(peer->rap_nid));
+ rc = -EPROTO;
goto out;
}
if (msg->ram_srcnid != peer->rap_nid) {
- CERROR("Unexpected peer "LPX64" from "LPX64"\n",
- msg->ram_srcnid, peer->rap_nid);
+ CERROR("Unexpected peer %s from %s\n",
+ libcfs_nid2str(msg->ram_srcnid),
+ libcfs_nid2str(peer->rap_nid));
+ rc = -EPROTO;
goto out;
}
if (msg->ram_connstamp != conn->rac_peer_connstamp) {
CERROR("Unexpected connstamp "LPX64"("LPX64
- " expected) from "LPX64"\n",
+ " expected) from %s\n",
msg->ram_connstamp, conn->rac_peer_connstamp,
- peer->rap_nid);
+ libcfs_nid2str(peer->rap_nid));
+ rc = -EPROTO;
goto out;
}
if (msg->ram_seq != seq) {
- CERROR("Unexpected sequence number %d(%d expected) from "
- LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
+ CERROR("Unexpected sequence number %d(%d expected) from %s\n",
+ msg->ram_seq, seq, libcfs_nid2str(peer->rap_nid));
+ rc = -EPROTO;
goto out;
}
if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
/* This message signals RDMA completion... */
rrc = RapkFmaSyncWait(conn->rac_rihandle);
- LASSERT (rrc == RAP_SUCCESS);
+ if (rrc != RAP_SUCCESS) {
+ CERROR("RapkFmaSyncWait failed: %d\n", rrc);
+ rc = -ENETDOWN;
+ goto out;
+ }
}
if (conn->rac_close_recvd) {
- CERROR("Unexpected message %d after CLOSE from "LPX64"\n",
- msg->ram_type, conn->rac_peer->rap_nid);
+ CERROR("Unexpected message %d after CLOSE from %s\n",
+ msg->ram_type, libcfs_nid2str(conn->rac_peer->rap_nid));
+ rc = -EPROTO;
goto out;
}
if (msg->ram_type == RANAL_MSG_CLOSE) {
- CWARN("RX CLOSE from "LPX64"\n", conn->rac_peer->rap_nid);
+ CWARN("RX CLOSE from %s\n", libcfs_nid2str(conn->rac_peer->rap_nid));
conn->rac_close_recvd = 1;
- write_lock_irqsave(&kranal_data.kra_global_lock, flags);
+ write_lock_irqsave(&kranal_data.kra_global_lock, flags);
if (conn->rac_state == RANAL_CONN_ESTABLISHED)
kranal_close_conn_locked(conn, 0);
conn->rac_close_sent)
kranal_terminate_conn_locked(conn);
- write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
+ write_unlock_irqrestore(&kranal_data.kra_global_lock,
+ flags);
goto out;
}
case RANAL_MSG_IMMEDIATE:
CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
- lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
+ rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.immediate.raim_hdr,
+ msg->ram_srcnid, conn, 0);
+ repost = rc < 0;
break;
case RANAL_MSG_PUT_REQ:
CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
- lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);
-
- if (conn->rac_rxmsg == NULL) /* lib_parse matched something */
- break;
-
- tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
- if (tx == NULL)
- break;
-
- tx->tx_msg.ram_u.completion.racm_cookie =
- msg->ram_u.putreq.raprm_cookie;
- kranal_post_fma(conn, tx);
+ rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.putreq.raprm_hdr,
+ msg->ram_srcnid, conn, 1);
+ repost = rc < 0;
break;
case RANAL_MSG_PUT_NAK:
case RANAL_MSG_GET_REQ:
CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
- lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);
-
- if (conn->rac_rxmsg == NULL) /* lib_parse matched something */
- break;
-
- tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK);
- if (tx == NULL)
- break;
-
- tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
- kranal_post_fma(conn, tx);
+ rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.get.ragm_hdr,
+ msg->ram_srcnid, conn, 1);
+ repost = rc < 0;
break;
case RANAL_MSG_GET_NAK:
LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
+#if 0
+ /* completion message should send rdma length if we ever allow
+ * GET truncation */
+ lnet_set_reply_msg_len(kranal_data.kra_ni, tx->tx_lntmsg[1], ???);
+#endif
kranal_tx_done(tx, 0);
break;
}
out:
- if (conn->rac_rxmsg != NULL)
+ if (rc < 0) /* protocol/comms error */
+ kranal_close_conn (conn, rc);
+
+ if (repost && conn->rac_rxmsg != NULL)
kranal_consume_rxmsg(conn, NULL, 0);
/* check again later */
kranal_complete_closed_conn (kra_conn_t *conn)
{
kra_tx_t *tx;
+ int nfma;
+ int nreplies;
LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
- LASSERT (list_empty(&conn->rac_list));
- LASSERT (list_empty(&conn->rac_hashlist));
+ LASSERT (cfs_list_empty(&conn->rac_list));
+ LASSERT (cfs_list_empty(&conn->rac_hashlist));
- while (!list_empty(&conn->rac_fmaq)) {
- tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
+ for (nfma = 0; !cfs_list_empty(&conn->rac_fmaq); nfma++) {
+ tx = cfs_list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
- list_del(&tx->tx_list);
+ cfs_list_del(&tx->tx_list);
kranal_tx_done(tx, -ECONNABORTED);
}
- LASSERT (list_empty(&conn->rac_rdmaq));
+ LASSERT (cfs_list_empty(&conn->rac_rdmaq));
- while (!list_empty(&conn->rac_replyq)) {
- tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
+ for (nreplies = 0; !cfs_list_empty(&conn->rac_replyq); nreplies++) {
+ tx = cfs_list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
- list_del(&tx->tx_list);
+ cfs_list_del(&tx->tx_list);
kranal_tx_done(tx, -ECONNABORTED);
}
+
+ CWARN("Closed conn %p -> %s: nmsg %d nreplies %d\n",
+ conn, libcfs_nid2str(conn->rac_peer->rap_nid), nfma, nreplies);
+}
+
+int kranal_process_new_conn (kra_conn_t *conn)
+{
+ RAP_RETURN rrc;
+
+ rrc = RapkCompleteSync(conn->rac_rihandle, 1);
+ if (rrc == RAP_SUCCESS)
+ return 0;
+
+ LASSERT (rrc == RAP_NOT_DONE);
+ if (!cfs_time_aftereq(jiffies, conn->rac_last_tx +
+ msecs_to_jiffies(conn->rac_timeout*MSEC_PER_SEC)))
+ return -EAGAIN;
+
+ /* Too late */
+ rrc = RapkCompleteSync(conn->rac_rihandle, 0);
+ LASSERT (rrc == RAP_SUCCESS);
+ return -ETIMEDOUT;
}
int
kranal_scheduler (void *arg)
{
- kra_device_t *dev = (kra_device_t *)arg;
- wait_queue_t wait;
- char name[16];
- kra_conn_t *conn;
- unsigned long flags;
- int busy_loops = 0;
+ kra_device_t *dev = (kra_device_t *)arg;
+ wait_queue_t wait;
+ kra_conn_t *conn;
+ unsigned long flags;
+ unsigned long deadline;
+ unsigned long soonest;
+ int nsoonest;
+ long timeout;
+ cfs_list_t *tmp;
+ cfs_list_t *nxt;
+ int rc;
+ int dropped_lock;
+ int busy_loops = 0;
- snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
- kportal_daemonize(name);
- kportal_blockallsigs();
+ cfs_block_allsigs();
- dev->rad_scheduler = current;
- init_waitqueue_entry(&wait, current);
+ dev->rad_scheduler = current;
+ init_waitqueue_entry_current(&wait);
- spin_lock_irqsave(&dev->rad_lock, flags);
+ spin_lock_irqsave(&dev->rad_lock, flags);
while (!kranal_data.kra_shutdown) {
/* Safe: kra_shutdown only set when quiescent */
if (busy_loops++ >= RANAL_RESCHED) {
- spin_unlock_irqrestore(&dev->rad_lock, flags);
+ spin_unlock_irqrestore(&dev->rad_lock, flags);
- our_cond_resched();
- busy_loops = 0;
+ cond_resched();
+ busy_loops = 0;
- spin_lock_irqsave(&dev->rad_lock, flags);
+ spin_lock_irqsave(&dev->rad_lock, flags);
}
+ dropped_lock = 0;
+
if (dev->rad_ready) {
/* Device callback fired since I last checked it */
dev->rad_ready = 0;
- spin_unlock_irqrestore(&dev->rad_lock, flags);
+ spin_unlock_irqrestore(&dev->rad_lock, flags);
+ dropped_lock = 1;
kranal_check_rdma_cq(dev);
kranal_check_fma_cq(dev);
- spin_lock_irqsave(&dev->rad_lock, flags);
+ spin_lock_irqsave(&dev->rad_lock, flags);
}
- if (!list_empty(&dev->rad_connq)) {
- /* Connection needs attention */
- conn = list_entry(dev->rad_connq.next,
- kra_conn_t, rac_schedlist);
- list_del_init(&conn->rac_schedlist);
+ cfs_list_for_each_safe(tmp, nxt, &dev->rad_ready_conns) {
+ conn = cfs_list_entry(tmp, kra_conn_t, rac_schedlist);
+
+ cfs_list_del_init(&conn->rac_schedlist);
LASSERT (conn->rac_scheduled);
conn->rac_scheduled = 0;
- spin_unlock_irqrestore(&dev->rad_lock, flags);
+ spin_unlock_irqrestore(&dev->rad_lock, flags);
+ dropped_lock = 1;
kranal_check_fma_rx(conn);
kranal_process_fmaq(conn);
kranal_complete_closed_conn(conn);
kranal_conn_decref(conn);
-
- spin_lock_irqsave(&dev->rad_lock, flags);
- continue;
+ spin_lock_irqsave(&dev->rad_lock, flags);
}
- /* recheck device callback fired before sleeping */
- if (dev->rad_ready)
- continue;
-
- add_wait_queue(&dev->rad_waitq, &wait);
- set_current_state(TASK_INTERRUPTIBLE);
-
- spin_unlock_irqrestore(&dev->rad_lock, flags);
-
- busy_loops = 0;
- schedule();
+ nsoonest = 0;
+ soonest = jiffies;
+
+ cfs_list_for_each_safe(tmp, nxt, &dev->rad_new_conns) {
+ conn = cfs_list_entry(tmp, kra_conn_t, rac_schedlist);
+
+ deadline = conn->rac_last_tx + conn->rac_keepalive;
+ if (cfs_time_aftereq(jiffies, deadline)) {
+ /* Time to process this new conn */
+ spin_unlock_irqrestore(&dev->rad_lock,
+ flags);
+ dropped_lock = 1;
+
+ rc = kranal_process_new_conn(conn);
+ if (rc != -EAGAIN) {
+ /* All done with this conn */
+ spin_lock_irqsave(&dev->rad_lock,
+ flags);
+ cfs_list_del_init(&conn->rac_schedlist);
+ spin_unlock_irqrestore(&dev-> \
+ rad_lock,
+ flags);
+
+ kranal_conn_decref(conn);
+ spin_lock_irqsave(&dev->rad_lock,
+ flags);
+ continue;
+ }
+
+ /* retry with exponential backoff until HZ */
+ if (conn->rac_keepalive == 0)
+ conn->rac_keepalive = 1;
+ else if (conn->rac_keepalive <=
+ msecs_to_jiffies(MSEC_PER_SEC))
+ conn->rac_keepalive *= 2;
+ else
+ conn->rac_keepalive +=
+ msecs_to_jiffies(MSEC_PER_SEC);
+
+ deadline = conn->rac_last_tx + conn->rac_keepalive;
+ spin_lock_irqsave(&dev->rad_lock, flags);
+ }
- set_current_state(TASK_RUNNING);
- remove_wait_queue(&dev->rad_waitq, &wait);
+ /* Does this conn need attention soonest? */
+ if (nsoonest++ == 0 ||
+ !cfs_time_aftereq(deadline, soonest))
+ soonest = deadline;
+ }
- spin_lock_irqsave(&dev->rad_lock, flags);
- }
+ if (dropped_lock) /* may sleep iff I didn't drop the lock */
+ continue;
- spin_unlock_irqrestore(&dev->rad_lock, flags);
+ set_current_state(TASK_INTERRUPTIBLE);
+ add_wait_queue_exclusive(&dev->rad_waitq, &wait);
+ spin_unlock_irqrestore(&dev->rad_lock, flags);
+
+ if (nsoonest == 0) {
+ busy_loops = 0;
+ waitq_wait(&wait, TASK_INTERRUPTIBLE);
+ } else {
+ timeout = (long)(soonest - jiffies);
+ if (timeout > 0) {
+ busy_loops = 0;
+ waitq_timedwait(&wait,
+ TASK_INTERRUPTIBLE,
+ timeout);
+ }
+ }
+
+ remove_wait_queue(&dev->rad_waitq, &wait);
+ set_current_state(TASK_RUNNING);
+ spin_lock_irqsave(&dev->rad_lock, flags);
+ }
+
+ spin_unlock_irqrestore(&dev->rad_lock, flags);
dev->rad_scheduler = NULL;
kranal_thread_fini();
return 0;
}
-
-
-lib_nal_t kranal_lib = {
- libnal_data: &kranal_data, /* NAL private data */
- libnal_send: kranal_send,
- libnal_send_pages: kranal_send_pages,
- libnal_recv: kranal_recv,
- libnal_recv_pages: kranal_recv_pages,
- libnal_dist: kranal_dist
-};