X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lnet%2Fklnds%2Fralnd%2Fralnd_cb.c;h=00f1fcbd28b4855efa8ca10f478247bcf48039ce;hp=8901a2d0f2b98e68f8e90f83d4b0a8cc34d17566;hb=59071a8334bbc1a3a6d31565b7474063438d1f43;hpb=0fbc0b5373a2237e277501e993a179c3b1726e0c diff --git a/lnet/klnds/ralnd/ralnd_cb.c b/lnet/klnds/ralnd/ralnd_cb.c index 8901a2d..00f1fcb 100644 --- a/lnet/klnds/ralnd/ralnd_cb.c +++ b/lnet/klnds/ralnd/ralnd_cb.c @@ -1,66 +1,71 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: +/* + * GPL HEADER START * - * Copyright (C) 2004 Cluster File Systems, Inc. - * Author: Eric Barton + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * This file is part of Lustre, http://www.lustre.org. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. * + * GPL HEADER END + */ +/* + * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved. + * Use is subject to license terms. + * + * Copyright (c) 2012, Intel Corporation. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. 
+ * + * lnet/klnds/ralnd/ralnd_cb.c + * + * Author: Eric Barton */ -#include "ranal.h" - -int -kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist) -{ - /* I would guess that if kranal_get_peer (nid) == NULL, - and we're not routing, then 'nid' is very distant :) */ - if ( nal->libnal_ni.ni_pid.nid == nid ) { - *dist = 0; - } else { - *dist = 1; - } - - return 0; -} +#include "ralnd.h" void -kranal_device_callback(RAP_INT32 devid) +kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg) { kra_device_t *dev; int i; unsigned long flags; - + + CDEBUG(D_NET, "callback for device %d\n", devid); + for (i = 0; i < kranal_data.kra_ndevs; i++) { dev = &kranal_data.kra_devices[i]; if (dev->rad_id != devid) continue; - spin_lock_irqsave(&dev->rad_lock, flags); + spin_lock_irqsave(&dev->rad_lock, flags); - if (!dev->rad_ready) { - dev->rad_ready = 1; - wake_up(&dev->rad_waitq); - } + if (!dev->rad_ready) { + dev->rad_ready = 1; + wake_up(&dev->rad_waitq); + } - spin_unlock_irqrestore(&dev->rad_lock, flags); + spin_unlock_irqrestore(&dev->rad_lock, flags); return; } - + CWARN("callback for unknown device %d\n", devid); } @@ -69,70 +74,47 @@ kranal_schedule_conn(kra_conn_t *conn) { kra_device_t *dev = conn->rac_device; unsigned long flags; - - spin_lock_irqsave(&dev->rad_lock, flags); - - if (!conn->rac_scheduled) { - kranal_conn_addref(conn); /* +1 ref for scheduler */ - conn->rac_scheduled = 1; - list_add_tail(&conn->rac_schedlist, &dev->rad_connq); - wake_up(&dev->rad_waitq); - } - spin_unlock_irqrestore(&dev->rad_lock, flags); + spin_lock_irqsave(&dev->rad_lock, flags); + + if (!conn->rac_scheduled) { + kranal_conn_addref(conn); /* +1 ref for scheduler */ + conn->rac_scheduled = 1; + cfs_list_add_tail(&conn->rac_schedlist, &dev->rad_ready_conns); + wake_up(&dev->rad_waitq); + } + + spin_unlock_irqrestore(&dev->rad_lock, flags); } kra_tx_t * -kranal_get_idle_tx (int may_block) +kranal_get_idle_tx (void) { unsigned long flags; - kra_tx_t *tx = NULL; - - for (;;) { - spin_lock_irqsave(&kranal_data.kra_tx_lock, flags); + kra_tx_t *tx; - /* "normal" descriptor is free */ - if (!list_empty(&kranal_data.kra_idle_txs)) { - tx = list_entry(kranal_data.kra_idle_txs.next, - kra_tx_t, tx_list); - break; - } + spin_lock_irqsave(&kranal_data.kra_tx_lock, flags); - if (!may_block) { - /* may dip into reserve pool */ - if (list_empty(&kranal_data.kra_idle_nblk_txs)) { - CERROR("reserved tx desc pool exhausted\n"); - break; - } - - tx = list_entry(kranal_data.kra_idle_nblk_txs.next, - kra_tx_t, tx_list); - break; - } - - /* block for idle tx */ - spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags); - - wait_event(kranal_data.kra_idle_tx_waitq, - !list_empty(&kranal_data.kra_idle_txs)); + if (cfs_list_empty(&kranal_data.kra_idle_txs)) { + spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags); + return NULL; } - if (tx != NULL) { - list_del(&tx->tx_list); + tx = cfs_list_entry(kranal_data.kra_idle_txs.next, kra_tx_t, tx_list); + cfs_list_del(&tx->tx_list); - /* Allocate a new completion cookie. It might not be - * needed, but we've got a lock right now... */ - tx->tx_cookie = kranal_data.kra_next_tx_cookie++; + /* Allocate a new completion cookie. It might not be needed, but we've + * got a lock right now... 
*/ + tx->tx_cookie = kranal_data.kra_next_tx_cookie++; - LASSERT (tx->tx_buftype == RANAL_BUF_NONE); - LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE); - LASSERT (tx->tx_conn == NULL); - LASSERT (tx->tx_libmsg[0] == NULL); - LASSERT (tx->tx_libmsg[1] == NULL); - } + spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags); + + LASSERT (tx->tx_buftype == RANAL_BUF_NONE); + LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE); + LASSERT (tx->tx_conn == NULL); + LASSERT (tx->tx_lntmsg[0] == NULL); + LASSERT (tx->tx_lntmsg[1] == NULL); - spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags); - return tx; } @@ -142,57 +124,64 @@ kranal_init_msg(kra_msg_t *msg, int type) msg->ram_magic = RANAL_MSG_MAGIC; msg->ram_version = RANAL_MSG_VERSION; msg->ram_type = type; - msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid; + msg->ram_srcnid = kranal_data.kra_ni->ni_nid; /* ram_connstamp gets set when FMA is sent */ } kra_tx_t * -kranal_new_tx_msg (int may_block, int type) +kranal_new_tx_msg (int type) { - kra_tx_t *tx = kranal_get_idle_tx(may_block); + kra_tx_t *tx = kranal_get_idle_tx(); - if (tx == NULL) - return NULL; + if (tx != NULL) + kranal_init_msg(&tx->tx_msg, type); - kranal_init_msg(&tx->tx_msg, type); return tx; } int -kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov, +kranal_setup_immediate_buffer (kra_tx_t *tx, + unsigned int niov, struct iovec *iov, int offset, int nob) - + { /* For now this is almost identical to kranal_setup_virt_buffer, but we * could "flatten" the payload into a single contiguous buffer ready * for sending direct over an FMA if we ever needed to. */ - LASSERT (nob > 0); - LASSERT (niov > 0); LASSERT (tx->tx_buftype == RANAL_BUF_NONE); + LASSERT (nob >= 0); - while (offset >= iov->iov_len) { - offset -= iov->iov_len; - niov--; - iov++; + if (nob == 0) { + tx->tx_buffer = NULL; + } else { LASSERT (niov > 0); - } - if (nob > iov->iov_len - offset) { - CERROR("Can't handle multiple vaddr fragments\n"); - return -EMSGSIZE; + while (offset >= iov->iov_len) { + offset -= iov->iov_len; + niov--; + iov++; + LASSERT (niov > 0); + } + + if (nob > iov->iov_len - offset) { + CERROR("Can't handle multiple vaddr fragments\n"); + return -EMSGSIZE; + } + + tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset); } tx->tx_buftype = RANAL_BUF_IMMEDIATE; tx->tx_nob = nob; - tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset); return 0; } int -kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov, +kranal_setup_virt_buffer (kra_tx_t *tx, + unsigned int niov, struct iovec *iov, int offset, int nob) - + { LASSERT (nob > 0); LASSERT (niov > 0); @@ -217,7 +206,7 @@ kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov, } int -kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov, +kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, lnet_kiov_t *kiov, int offset, int nob) { RAP_PHYS_REGION *phys = tx->tx_phys; @@ -239,9 +228,8 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov, tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED; tx->tx_nob = nob; tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset)); - - phys->Address = kranal_page2phys(kiov->kiov_page); - phys->Length = PAGE_SIZE; + + phys->Address = lnet_page2phys(kiov->kiov_page); phys++; resid = nob - (kiov->kiov_len - offset); @@ -251,23 +239,22 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov, LASSERT (nkiov > 0); if (kiov->kiov_offset != 0 || - ((resid > PAGE_SIZE) && + ((resid > PAGE_SIZE) && 
kiov->kiov_len < PAGE_SIZE)) { /* Can't have gaps */ CERROR("Can't make payload contiguous in I/O VM:" - "page %d, offset %d, len %d \n", - phys - tx->tx_phys, - kiov->kiov_offset, kiov->kiov_len); + "page %d, offset %d, len %d \n", + (int)(phys - tx->tx_phys), + kiov->kiov_offset, kiov->kiov_len); return -EINVAL; } - if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) { - CERROR ("payload too big (%d)\n", phys - tx->tx_phys); + if ((phys - tx->tx_phys) == LNET_MAX_IOV) { + CERROR ("payload too big (%d)\n", (int)(phys - tx->tx_phys)); return -EMSGSIZE; } - phys->Address = kranal_page2phys(kiov->kiov_page); - phys->Length = PAGE_SIZE; + phys->Address = lnet_page2phys(kiov->kiov_page); phys++; resid -= PAGE_SIZE; @@ -278,19 +265,19 @@ kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov, } static inline int -kranal_setup_rdma_buffer (kra_tx_t *tx, int niov, - struct iovec *iov, ptl_kiov_t *kiov, +kranal_setup_rdma_buffer (kra_tx_t *tx, unsigned int niov, + struct iovec *iov, lnet_kiov_t *kiov, int offset, int nob) { LASSERT ((iov == NULL) != (kiov == NULL)); - + if (kiov != NULL) return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob); - + return kranal_setup_virt_buffer(tx, niov, iov, offset, nob); } -void +int kranal_map_buffer (kra_tx_t *tx) { kra_conn_t *conn = tx->tx_conn; @@ -302,28 +289,50 @@ kranal_map_buffer (kra_tx_t *tx) switch (tx->tx_buftype) { default: LBUG(); - + case RANAL_BUF_NONE: case RANAL_BUF_IMMEDIATE: case RANAL_BUF_PHYS_MAPPED: case RANAL_BUF_VIRT_MAPPED: - break; - + return 0; + case RANAL_BUF_PHYS_UNMAPPED: rrc = RapkRegisterPhys(dev->rad_handle, tx->tx_phys, tx->tx_phys_npages, - dev->rad_ptag, &tx->tx_map_key); - LASSERT (rrc == RAP_SUCCESS); + &tx->tx_map_key); + if (rrc != RAP_SUCCESS) { + CERROR ("Can't map %d pages: dev %d " + "phys %u pp %u, virt %u nob %lu\n", + tx->tx_phys_npages, dev->rad_id, + dev->rad_nphysmap, dev->rad_nppphysmap, + dev->rad_nvirtmap, dev->rad_nobvirtmap); + return -ENOMEM; /* assume insufficient resources */ + } + + dev->rad_nphysmap++; + dev->rad_nppphysmap += tx->tx_phys_npages; + tx->tx_buftype = RANAL_BUF_PHYS_MAPPED; - break; + return 0; case RANAL_BUF_VIRT_UNMAPPED: rrc = RapkRegisterMemory(dev->rad_handle, tx->tx_buffer, tx->tx_nob, - dev->rad_ptag, &tx->tx_map_key); - LASSERT (rrc == RAP_SUCCESS); + &tx->tx_map_key); + if (rrc != RAP_SUCCESS) { + CERROR ("Can't map %d bytes: dev %d " + "phys %u pp %u, virt %u nob %lu\n", + tx->tx_nob, dev->rad_id, + dev->rad_nphysmap, dev->rad_nppphysmap, + dev->rad_nvirtmap, dev->rad_nobvirtmap); + return -ENOMEM; /* assume insufficient resources */ + } + + dev->rad_nvirtmap++; + dev->rad_nobvirtmap += tx->tx_nob; + tx->tx_buftype = RANAL_BUF_VIRT_MAPPED; - break; + return 0; } } @@ -336,20 +345,24 @@ kranal_unmap_buffer (kra_tx_t *tx) switch (tx->tx_buftype) { default: LBUG(); - + case RANAL_BUF_NONE: case RANAL_BUF_IMMEDIATE: case RANAL_BUF_PHYS_UNMAPPED: case RANAL_BUF_VIRT_UNMAPPED: break; - + case RANAL_BUF_PHYS_MAPPED: LASSERT (tx->tx_conn != NULL); dev = tx->tx_conn->rac_device; LASSERT (current == dev->rad_scheduler); rrc = RapkDeregisterMemory(dev->rad_handle, NULL, - dev->rad_ptag, &tx->tx_map_key); + &tx->tx_map_key); LASSERT (rrc == RAP_SUCCESS); + + dev->rad_nphysmap--; + dev->rad_nppphysmap -= tx->tx_phys_npages; + tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED; break; @@ -358,8 +371,12 @@ kranal_unmap_buffer (kra_tx_t *tx) dev = tx->tx_conn->rac_device; LASSERT (current == dev->rad_scheduler); rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer, - dev->rad_ptag, 
&tx->tx_map_key); + &tx->tx_map_key); LASSERT (rrc == RAP_SUCCESS); + + dev->rad_nvirtmap--; + dev->rad_nobvirtmap -= tx->tx_nob; + tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED; break; } @@ -368,47 +385,44 @@ kranal_unmap_buffer (kra_tx_t *tx) void kranal_tx_done (kra_tx_t *tx, int completion) { - ptl_err_t ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL; + lnet_msg_t *lnetmsg[2]; unsigned long flags; int i; - LASSERT (!in_interrupt()); + LASSERT (!cfs_in_interrupt()); kranal_unmap_buffer(tx); - for (i = 0; i < 2; i++) { - /* tx may have up to 2 libmsgs to finalise */ - if (tx->tx_libmsg[i] == NULL) - continue; - - lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc); - tx->tx_libmsg[i] = NULL; - } + lnetmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL; + lnetmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL; tx->tx_buftype = RANAL_BUF_NONE; tx->tx_msg.ram_type = RANAL_MSG_NONE; tx->tx_conn = NULL; - spin_lock_irqsave(&kranal_data.kra_tx_lock, flags); + spin_lock_irqsave(&kranal_data.kra_tx_lock, flags); - if (tx->tx_isnblk) { - list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs); - } else { - list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs); - wake_up(&kranal_data.kra_idle_tx_waitq); - } + cfs_list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs); - spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags); + spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags); + + /* finalize AFTER freeing lnet msgs */ + for (i = 0; i < 2; i++) { + if (lnetmsg[i] == NULL) + continue; + + lnet_finalize(kranal_data.kra_ni, lnetmsg[i], completion); + } } kra_conn_t * kranal_find_conn_locked (kra_peer_t *peer) { - struct list_head *tmp; + cfs_list_t *tmp; /* just return the first connection */ - list_for_each (tmp, &peer->rap_conns) { - return list_entry(tmp, kra_conn_t, rac_list); + cfs_list_for_each (tmp, &peer->rap_conns) { + return cfs_list_entry(tmp, kra_conn_t, rac_list); } return NULL; @@ -421,95 +435,109 @@ kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx) tx->tx_conn = conn; - spin_lock_irqsave(&conn->rac_lock, flags); - list_add_tail(&tx->tx_list, &conn->rac_fmaq); + spin_lock_irqsave(&conn->rac_lock, flags); + cfs_list_add_tail(&tx->tx_list, &conn->rac_fmaq); tx->tx_qtime = jiffies; - spin_unlock_irqrestore(&conn->rac_lock, flags); + spin_unlock_irqrestore(&conn->rac_lock, flags); kranal_schedule_conn(conn); } void -kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid) +kranal_launch_tx (kra_tx_t *tx, lnet_nid_t nid) { unsigned long flags; kra_peer_t *peer; kra_conn_t *conn; - unsigned long now; - rwlock_t *g_lock = &kranal_data.kra_global_lock; + int rc; + int retry; + rwlock_t *g_lock = &kranal_data.kra_global_lock; /* If I get here, I've committed to send, so I complete the tx with * failure on any problems */ - - LASSERT (tx->tx_conn == NULL); /* only set when assigned a conn */ - read_lock(g_lock); - - peer = kranal_find_peer_locked(nid); - if (peer == NULL) { - read_unlock(g_lock); - kranal_tx_done(tx, -EHOSTUNREACH); - return; - } + LASSERT (tx->tx_conn == NULL); /* only set when assigned a conn */ - conn = kranal_find_conn_locked(peer); - if (conn != NULL) { - kranal_post_fma(conn, tx); - read_unlock(g_lock); - return; + for (retry = 0; ; retry = 1) { + + read_lock(g_lock); + + peer = kranal_find_peer_locked(nid); + if (peer != NULL) { + conn = kranal_find_conn_locked(peer); + if (conn != NULL) { + kranal_post_fma(conn, tx); + read_unlock(g_lock); + return; + } + } + + /* Making connections; I'll need a write lock... 
*/ + read_unlock(g_lock); + write_lock_irqsave(g_lock, flags); + + peer = kranal_find_peer_locked(nid); + if (peer != NULL) + break; + + write_unlock_irqrestore(g_lock, flags); + + if (retry) { + CERROR("Can't find peer %s\n", libcfs_nid2str(nid)); + kranal_tx_done(tx, -EHOSTUNREACH); + return; + } + + rc = kranal_add_persistent_peer(nid, LNET_NIDADDR(nid), + lnet_acceptor_port()); + if (rc != 0) { + CERROR("Can't add peer %s: %d\n", + libcfs_nid2str(nid), rc); + kranal_tx_done(tx, rc); + return; + } } - /* Making one or more connections; I'll need a write lock... */ - read_unlock(g_lock); - write_lock_irqsave(g_lock, flags); - - peer = kranal_find_peer_locked(nid); - if (peer == NULL) { - write_unlock_irqrestore(g_lock, flags); - kranal_tx_done(tx, -EHOSTUNREACH); - return; - } - conn = kranal_find_conn_locked(peer); if (conn != NULL) { /* Connection exists; queue message on it */ kranal_post_fma(conn, tx); - write_unlock_irqrestore(g_lock, flags); + write_unlock_irqrestore(g_lock, flags); return; } - + LASSERT (peer->rap_persistence > 0); if (!peer->rap_connecting) { - LASSERT (list_empty(&peer->rap_tx_queue)); - - now = CURRENT_TIME; - if (now < peer->rap_reconnect_time) { - write_unlock_irqrestore(g_lock, flags); + LASSERT (cfs_list_empty(&peer->rap_tx_queue)); + + if (!(peer->rap_reconnect_interval == 0 || /* first attempt */ + cfs_time_aftereq(jiffies, peer->rap_reconnect_time))) { + write_unlock_irqrestore(g_lock, flags); kranal_tx_done(tx, -EHOSTUNREACH); return; } - + peer->rap_connecting = 1; kranal_peer_addref(peer); /* extra ref for connd */ - - spin_lock(&kranal_data.kra_connd_lock); - - list_add_tail(&peer->rap_connd_list, - &kranal_data.kra_connd_peers); - wake_up(&kranal_data.kra_connd_waitq); - - spin_unlock(&kranal_data.kra_connd_lock); + + spin_lock(&kranal_data.kra_connd_lock); + + cfs_list_add_tail(&peer->rap_connd_list, + &kranal_data.kra_connd_peers); + wake_up(&kranal_data.kra_connd_waitq); + + spin_unlock(&kranal_data.kra_connd_lock); } - + /* A connection is being established; queue the message... */ - list_add_tail(&tx->tx_list, &peer->rap_tx_queue); + cfs_list_add_tail(&tx->tx_list, &peer->rap_tx_queue); - write_unlock_irqrestore(g_lock, flags); + write_unlock_irqrestore(g_lock, flags); } void -kranal_rdma(kra_tx_t *tx, int type, +kranal_rdma(kra_tx_t *tx, int type, kra_rdma_desc_t *sink, int nob, __u64 cookie) { kra_conn_t *conn = tx->tx_conn; @@ -522,7 +550,7 @@ kranal_rdma(kra_tx_t *tx, int type, /* No actual race with scheduler sending CLOSE (I'm she!) 
*/ LASSERT (current == conn->rac_device->rad_scheduler); - + memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc)); tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer); tx->tx_rdma_desc.SrcKey = tx->tx_map_key; @@ -545,10 +573,10 @@ kranal_rdma(kra_tx_t *tx, int type, rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc); LASSERT (rrc == RAP_SUCCESS); - spin_lock_irqsave(&conn->rac_lock, flags); - list_add_tail(&tx->tx_list, &conn->rac_rdmaq); + spin_lock_irqsave(&conn->rac_lock, flags); + cfs_list_add_tail(&tx->tx_list, &conn->rac_rdmaq); tx->tx_qtime = jiffies; - spin_unlock_irqrestore(&conn->rac_lock, flags); + spin_unlock_irqrestore(&conn->rac_lock, flags); } int @@ -558,241 +586,238 @@ kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob) RAP_RETURN rrc; LASSERT (conn->rac_rxmsg != NULL); + CDEBUG(D_NET, "Consuming %p\n", conn); - rrc = RapkFmaCopyToUser(conn->rac_rihandle, buffer, - &nob_received, sizeof(kra_msg_t)); + rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer, + &nob_received, sizeof(kra_msg_t)); LASSERT (rrc == RAP_SUCCESS); conn->rac_rxmsg = NULL; - if (nob_received != nob) { - CWARN("Expected %d immediate bytes but got %d\n", + if (nob_received < nob) { + CWARN("Incomplete immediate msg from %s: expected %d, got %d\n", + libcfs_nid2str(conn->rac_peer->rap_nid), nob, nob_received); return -EPROTO; } - + return 0; } -ptl_err_t -kranal_do_send (lib_nal_t *nal, - void *private, - lib_msg_t *libmsg, - ptl_hdr_t *hdr, - int type, - ptl_nid_t nid, - ptl_pid_t pid, - unsigned int niov, - struct iovec *iov, - ptl_kiov_t *kiov, - size_t offset, - size_t nob) +int +kranal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) { - kra_conn_t *conn; - kra_tx_t *tx; - int rc; + lnet_hdr_t *hdr = &lntmsg->msg_hdr; + int type = lntmsg->msg_type; + lnet_process_id_t target = lntmsg->msg_target; + int target_is_router = lntmsg->msg_target_is_router; + int routing = lntmsg->msg_routing; + unsigned int niov = lntmsg->msg_niov; + struct iovec *iov = lntmsg->msg_iov; + lnet_kiov_t *kiov = lntmsg->msg_kiov; + unsigned int offset = lntmsg->msg_offset; + unsigned int nob = lntmsg->msg_len; + kra_tx_t *tx; + int rc; /* NB 'private' is different depending on what we're sending.... */ - CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64 - " pid %d\n", nob, niov, nid , pid); + CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n", + nob, niov, libcfs_id2str(target)); LASSERT (nob == 0 || niov > 0); - LASSERT (niov <= PTL_MD_MAX_IOV); + LASSERT (niov <= LNET_MAX_IOV); - LASSERT (!in_interrupt()); + LASSERT (!cfs_in_interrupt()); /* payload is either all vaddrs or all pages */ LASSERT (!(kiov != NULL && iov != NULL)); + if (routing) { + CERROR ("Can't route\n"); + return -EIO; + } + switch(type) { default: LBUG(); - - case PTL_MSG_REPLY: { - /* reply's 'private' is the conn that received the GET_REQ */ - conn = private; - LASSERT (conn->rac_rxmsg != NULL); - - if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) { - if (nob > RANAL_FMA_MAX_DATA) { - CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n", - nob, nid); - return PTL_FAIL; - } - break; /* RDMA not expected */ - } - - /* Incoming message consistent with immediate reply? 
*/ - if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) { - CERROR("REPLY to "LPX64" bad msg type %x!!!\n", - nid, conn->rac_rxmsg->ram_type); - return PTL_FAIL; - } - - tx = kranal_get_idle_tx(0); - if (tx == NULL) - return PTL_FAIL; - rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob); - if (rc != 0) { - kranal_tx_done(tx, rc); - return PTL_FAIL; - } - - tx->tx_conn = conn; - tx->tx_libmsg[0] = libmsg; - - kranal_map_buffer(tx); - kranal_rdma(tx, RANAL_MSG_GET_DONE, - &conn->rac_rxmsg->ram_u.get.ragm_desc, nob, - conn->rac_rxmsg->ram_u.get.ragm_cookie); - return PTL_OK; - } + case LNET_MSG_ACK: + LASSERT (nob == 0); + break; - case PTL_MSG_GET: + case LNET_MSG_GET: LASSERT (niov == 0); LASSERT (nob == 0); /* We have to consider the eventual sink buffer rather than any * payload passed here (there isn't any, and strictly, looking - * inside libmsg is a layering violation). We send a simple + * inside lntmsg is a layering violation). We send a simple * IMMEDIATE GET if the sink buffer is mapped already and small * enough for FMA */ - if ((libmsg->md->options & PTL_MD_KIOV) == 0 && - libmsg->md->length <= RANAL_FMA_MAX_DATA && - libmsg->md->length <= kranal_tunables.kra_max_immediate) - break; + if (routing || target_is_router) + break; /* send IMMEDIATE */ + + if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0 && + lntmsg->msg_md->md_length <= RANAL_FMA_MAX_DATA && + lntmsg->msg_md->md_length <= *kranal_tunables.kra_max_immediate) + break; /* send IMMEDIATE */ - tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_GET_REQ); + tx = kranal_new_tx_msg(RANAL_MSG_GET_REQ); if (tx == NULL) - return PTL_NO_SPACE; + return -ENOMEM; - if ((libmsg->md->options & PTL_MD_KIOV) == 0) - rc = kranal_setup_virt_buffer(tx, libmsg->md->md_niov, - libmsg->md->md_iov.iov, - 0, libmsg->md->length); + if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0) + rc = kranal_setup_virt_buffer(tx, lntmsg->msg_md->md_niov, + lntmsg->msg_md->md_iov.iov, + 0, lntmsg->msg_md->md_length); else - rc = kranal_setup_phys_buffer(tx, libmsg->md->md_niov, - libmsg->md->md_iov.kiov, - 0, libmsg->md->length); + rc = kranal_setup_phys_buffer(tx, lntmsg->msg_md->md_niov, + lntmsg->msg_md->md_iov.kiov, + 0, lntmsg->msg_md->md_length); if (rc != 0) { kranal_tx_done(tx, rc); - return PTL_FAIL; + return -EIO; } - tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg); - if (tx->tx_libmsg[1] == NULL) { - CERROR("Can't create reply for GET to "LPX64"\n", nid); + tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg); + if (tx->tx_lntmsg[1] == NULL) { + CERROR("Can't create reply for GET to %s\n", + libcfs_nid2str(target.nid)); kranal_tx_done(tx, rc); - return PTL_FAIL; + return -EIO; } - tx->tx_libmsg[0] = libmsg; + tx->tx_lntmsg[0] = lntmsg; tx->tx_msg.ram_u.get.ragm_hdr = *hdr; /* rest of tx_msg is setup just before it is sent */ - kranal_launch_tx(tx, nid); - return PTL_OK; - - case PTL_MSG_ACK: - LASSERT (nob == 0); - break; + kranal_launch_tx(tx, target.nid); + return 0; - case PTL_MSG_PUT: + case LNET_MSG_REPLY: + case LNET_MSG_PUT: if (kiov == NULL && /* not paged */ - nob <= RANAL_MAX_IMMEDIATE && /* small enough */ - nob <= kranal_tunables.kra_max_immediate) + nob <= RANAL_FMA_MAX_DATA && /* small enough */ + nob <= *kranal_tunables.kra_max_immediate) break; /* send IMMEDIATE */ - - tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ); + + tx = kranal_new_tx_msg(RANAL_MSG_PUT_REQ); if (tx == NULL) - return PTL_NO_SPACE; + return -ENOMEM; rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob); 
if (rc != 0) { kranal_tx_done(tx, rc); - return PTL_FAIL; + return -EIO; } - tx->tx_libmsg[0] = libmsg; + tx->tx_lntmsg[0] = lntmsg; tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr; /* rest of tx_msg is setup just before it is sent */ - kranal_launch_tx(tx, nid); - return PTL_OK; + kranal_launch_tx(tx, target.nid); + return 0; } + /* send IMMEDIATE */ + LASSERT (kiov == NULL); - LASSERT (nob <= RANAL_MAX_IMMEDIATE); + LASSERT (nob <= RANAL_FMA_MAX_DATA); - tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK || - type == PTL_MSG_REPLY || - in_interrupt()), - RANAL_MSG_IMMEDIATE); + tx = kranal_new_tx_msg(RANAL_MSG_IMMEDIATE); if (tx == NULL) - return PTL_NO_SPACE; + return -ENOMEM; rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob); if (rc != 0) { kranal_tx_done(tx, rc); - return PTL_FAIL; + return -EIO; } - + tx->tx_msg.ram_u.immediate.raim_hdr = *hdr; - tx->tx_libmsg[0] = libmsg; - kranal_launch_tx(tx, nid); - return PTL_OK; + tx->tx_lntmsg[0] = lntmsg; + kranal_launch_tx(tx, target.nid); + return 0; } -ptl_err_t -kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie, - ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, - unsigned int niov, struct iovec *iov, - size_t offset, size_t len) +void +kranal_reply(lnet_ni_t *ni, kra_conn_t *conn, lnet_msg_t *lntmsg) { - return kranal_do_send(nal, private, cookie, - hdr, type, nid, pid, - niov, iov, NULL, - offset, len); + kra_msg_t *rxmsg = conn->rac_rxmsg; + unsigned int niov = lntmsg->msg_niov; + struct iovec *iov = lntmsg->msg_iov; + lnet_kiov_t *kiov = lntmsg->msg_kiov; + unsigned int offset = lntmsg->msg_offset; + unsigned int nob = lntmsg->msg_len; + kra_tx_t *tx; + int rc; + + tx = kranal_get_idle_tx(); + if (tx == NULL) + goto failed_0; + + rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob); + if (rc != 0) + goto failed_1; + + tx->tx_conn = conn; + + rc = kranal_map_buffer(tx); + if (rc != 0) + goto failed_1; + + tx->tx_lntmsg[0] = lntmsg; + + kranal_rdma(tx, RANAL_MSG_GET_DONE, + &rxmsg->ram_u.get.ragm_desc, nob, + rxmsg->ram_u.get.ragm_cookie); + return; + + failed_1: + kranal_tx_done(tx, -EIO); + failed_0: + lnet_finalize(ni, lntmsg, -EIO); } -ptl_err_t -kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, - ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, - unsigned int niov, ptl_kiov_t *kiov, - size_t offset, size_t len) +int +kranal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, + void **new_private) { - return kranal_do_send(nal, private, cookie, - hdr, type, nid, pid, - niov, NULL, kiov, - offset, len); + kra_conn_t *conn = (kra_conn_t *)private; + + LCONSOLE_ERROR_MSG(0x12b, "Dropping message from %s: no buffers free.\n", + libcfs_nid2str(conn->rac_peer->rap_nid)); + + return -EDEADLK; } -ptl_err_t -kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg, - unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov, - size_t offset, size_t mlen, size_t rlen) +int +kranal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, + int delayed, unsigned int niov, + struct iovec *iov, lnet_kiov_t *kiov, + unsigned int offset, unsigned int mlen, unsigned int rlen) { kra_conn_t *conn = private; kra_msg_t *rxmsg = conn->rac_rxmsg; kra_tx_t *tx; void *buffer; int rc; - + LASSERT (mlen <= rlen); - LASSERT (!in_interrupt()); + LASSERT (!cfs_in_interrupt()); /* Either all pages or all vaddrs */ LASSERT (!(kiov != NULL && iov != NULL)); + CDEBUG(D_NET, "conn %p, rxmsg %p, lntmsg %p\n", conn, rxmsg, lntmsg); + switch(rxmsg->ram_type) { default: LBUG(); - return PTL_FAIL; - + 
case RANAL_MSG_IMMEDIATE: if (mlen == 0) { buffer = NULL; } else if (kiov != NULL) { CERROR("Can't recv immediate into paged buffer\n"); - return PTL_FAIL; + return -EIO; } else { LASSERT (niov > 0); while (offset >= iov->iov_len) { @@ -803,101 +828,89 @@ kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg, } if (mlen > iov->iov_len - offset) { CERROR("Can't handle immediate frags\n"); - return PTL_FAIL; + return -EIO; } buffer = ((char *)iov->iov_base) + offset; } rc = kranal_consume_rxmsg(conn, buffer, mlen); - lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL); - return PTL_OK; - - case RANAL_MSG_GET_REQ: - /* If the GET matched, we've already handled it in - * kranal_do_send which is called to send the REPLY. We're - * only called here to complete the GET receive (if we needed - * it which we don't, but I digress...) */ - LASSERT (libmsg == NULL); - lib_finalize(nal, NULL, libmsg, PTL_OK); - return PTL_OK; + lnet_finalize(ni, lntmsg, (rc == 0) ? 0 : -EIO); + return 0; case RANAL_MSG_PUT_REQ: - if (libmsg == NULL) { /* PUT didn't match... */ - lib_finalize(nal, NULL, libmsg, PTL_OK); - return PTL_OK; + tx = kranal_new_tx_msg(RANAL_MSG_PUT_ACK); + if (tx == NULL) { + kranal_consume_rxmsg(conn, NULL, 0); + return -ENOMEM; } - tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK); - if (tx == NULL) - return PTL_NO_SPACE; - rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen); if (rc != 0) { kranal_tx_done(tx, rc); - return PTL_FAIL; + kranal_consume_rxmsg(conn, NULL, 0); + return -EIO; } tx->tx_conn = conn; - kranal_map_buffer(tx); - - tx->tx_msg.ram_u.putack.rapam_src_cookie = + rc = kranal_map_buffer(tx); + if (rc != 0) { + kranal_tx_done(tx, rc); + kranal_consume_rxmsg(conn, NULL, 0); + return -EIO; + } + + tx->tx_msg.ram_u.putack.rapam_src_cookie = conn->rac_rxmsg->ram_u.putreq.raprm_cookie; tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie; tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key; - tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits = + tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits = (__u64)((unsigned long)tx->tx_buffer); tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen; - tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */ + tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */ kranal_post_fma(conn, tx); - - /* flag matched by consuming rx message */ kranal_consume_rxmsg(conn, NULL, 0); - return PTL_OK; - } -} - -ptl_err_t -kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg, - unsigned int niov, struct iovec *iov, - size_t offset, size_t mlen, size_t rlen) -{ - return kranal_recvmsg(nal, private, msg, niov, iov, NULL, - offset, mlen, rlen); -} + return 0; -ptl_err_t -kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg, - unsigned int niov, ptl_kiov_t *kiov, - size_t offset, size_t mlen, size_t rlen) -{ - return kranal_recvmsg(nal, private, msg, niov, NULL, kiov, - offset, mlen, rlen); + case RANAL_MSG_GET_REQ: + if (lntmsg != NULL) { + /* Matched! 
*/ + kranal_reply(ni, conn, lntmsg); + } else { + /* No match */ + tx = kranal_new_tx_msg(RANAL_MSG_GET_NAK); + if (tx != NULL) { + tx->tx_msg.ram_u.completion.racm_cookie = + rxmsg->ram_u.get.ragm_cookie; + kranal_post_fma(conn, tx); + } + } + kranal_consume_rxmsg(conn, NULL, 0); + return 0; + } } int -kranal_thread_start (int(*fn)(void *arg), void *arg) +kranal_thread_start(int(*fn)(void *arg), void *arg, char *name) { - long pid = kernel_thread(fn, arg, 0); + struct task_struct *task = cfs_thread_run(fn, arg, name); - if (pid < 0) - return(int)pid; - - atomic_inc(&kranal_data.kra_nthreads); - return 0; + if (!IS_ERR(task)) + cfs_atomic_inc(&kranal_data.kra_nthreads); + return PTR_ERR(task); } void kranal_thread_fini (void) { - atomic_dec(&kranal_data.kra_nthreads); + cfs_atomic_dec(&kranal_data.kra_nthreads); } int kranal_check_conn_timeouts (kra_conn_t *conn) { kra_tx_t *tx; - struct list_head *ttmp; + cfs_list_t *ttmp; unsigned long flags; long timeout; unsigned long now = jiffies; @@ -905,72 +918,81 @@ kranal_check_conn_timeouts (kra_conn_t *conn) LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED || conn->rac_state == RANAL_CONN_CLOSING); - if (!conn->rac_close_sent && - time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) { - /* not sent in a while; schedule conn so scheduler sends a keepalive */ - kranal_schedule_conn(conn); - } - - timeout = conn->rac_timeout * HZ; - - if (!conn->rac_close_recvd && - time_after_eq(now, conn->rac_last_rx + timeout)) { - CERROR("Nothing received from "LPX64" within %lu seconds\n", - conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ); - return -ETIMEDOUT; - } + if (!conn->rac_close_sent && + cfs_time_aftereq(now, conn->rac_last_tx + conn->rac_keepalive * + HZ)) { + /* not sent in a while; schedule conn so scheduler sends a keepalive */ + CDEBUG(D_NET, "Scheduling keepalive %p->%s\n", + conn, libcfs_nid2str(conn->rac_peer->rap_nid)); + kranal_schedule_conn(conn); + } + + timeout = conn->rac_timeout * HZ; + + if (!conn->rac_close_recvd && + cfs_time_aftereq(now, conn->rac_last_rx + timeout)) { + CERROR("%s received from %s within %lu seconds\n", + (conn->rac_state == RANAL_CONN_ESTABLISHED) ? + "Nothing" : "CLOSE not", + libcfs_nid2str(conn->rac_peer->rap_nid), + (now - conn->rac_last_rx)/HZ); + return -ETIMEDOUT; + } if (conn->rac_state != RANAL_CONN_ESTABLISHED) return 0; - + /* Check the conn's queues are moving. These are "belt+braces" checks, * in case of hardware/software errors that make this conn seem * responsive even though it isn't progressing its message queues. 
*/ - spin_lock_irqsave(&conn->rac_lock, flags); - - list_for_each (ttmp, &conn->rac_fmaq) { - tx = list_entry(ttmp, kra_tx_t, tx_list); - - if (time_after_eq(now, tx->tx_qtime + timeout)) { - spin_unlock_irqrestore(&conn->rac_lock, flags); - CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n", - conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ); - return -ETIMEDOUT; - } - } - - list_for_each (ttmp, &conn->rac_rdmaq) { - tx = list_entry(ttmp, kra_tx_t, tx_list); - - if (time_after_eq(now, tx->tx_qtime + timeout)) { - spin_unlock_irqrestore(&conn->rac_lock, flags); - CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n", - conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ); - return -ETIMEDOUT; - } - } - - list_for_each (ttmp, &conn->rac_replyq) { - tx = list_entry(ttmp, kra_tx_t, tx_list); - - if (time_after_eq(now, tx->tx_qtime + timeout)) { - spin_unlock_irqrestore(&conn->rac_lock, flags); - CERROR("tx on replyq for "LPX64" blocked %lu seconds\n", - conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ); - return -ETIMEDOUT; - } - } - - spin_unlock_irqrestore(&conn->rac_lock, flags); + spin_lock_irqsave(&conn->rac_lock, flags); + + cfs_list_for_each (ttmp, &conn->rac_fmaq) { + tx = cfs_list_entry(ttmp, kra_tx_t, tx_list); + + if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) { + spin_unlock_irqrestore(&conn->rac_lock, flags); + CERROR("tx on fmaq for %s blocked %lu seconds\n", + libcfs_nid2str(conn->rac_peer->rap_nid), + (now - tx->tx_qtime)/HZ); + return -ETIMEDOUT; + } + } + + cfs_list_for_each (ttmp, &conn->rac_rdmaq) { + tx = cfs_list_entry(ttmp, kra_tx_t, tx_list); + + if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) { + spin_unlock_irqrestore(&conn->rac_lock, flags); + CERROR("tx on rdmaq for %s blocked %lu seconds\n", + libcfs_nid2str(conn->rac_peer->rap_nid), + (now - tx->tx_qtime)/HZ); + return -ETIMEDOUT; + } + } + + cfs_list_for_each (ttmp, &conn->rac_replyq) { + tx = cfs_list_entry(ttmp, kra_tx_t, tx_list); + + if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) { + spin_unlock_irqrestore(&conn->rac_lock, flags); + CERROR("tx on replyq for %s blocked %lu seconds\n", + libcfs_nid2str(conn->rac_peer->rap_nid), + (now - tx->tx_qtime)/HZ); + return -ETIMEDOUT; + } + } + + spin_unlock_irqrestore(&conn->rac_lock, flags); return 0; } void kranal_reaper_check (int idx, unsigned long *min_timeoutp) { - struct list_head *conns = &kranal_data.kra_conns[idx]; - struct list_head *ctmp; + cfs_list_t *conns = &kranal_data.kra_conns[idx]; + cfs_list_t *ctmp; kra_conn_t *conn; unsigned long flags; int rc; @@ -978,10 +1000,10 @@ kranal_reaper_check (int idx, unsigned long *min_timeoutp) again: /* NB. We expect to check all the conns and not find any problems, so * we just use a shared lock while we take a look... 
*/ - read_lock(&kranal_data.kra_global_lock); + read_lock(&kranal_data.kra_global_lock); - list_for_each (ctmp, conns) { - conn = list_entry(ctmp, kra_conn_t, rac_hashlist); + cfs_list_for_each (ctmp, conns) { + conn = cfs_list_entry(ctmp, kra_conn_t, rac_hashlist); if (conn->rac_timeout < *min_timeoutp ) *min_timeoutp = conn->rac_timeout; @@ -993,12 +1015,13 @@ kranal_reaper_check (int idx, unsigned long *min_timeoutp) continue; kranal_conn_addref(conn); - read_unlock(&kranal_data.kra_global_lock); + read_unlock(&kranal_data.kra_global_lock); - CERROR("Conn to "LPX64", cqid %d timed out\n", - conn->rac_peer->rap_nid, conn->rac_cqid); + CERROR("Conn to %s, cqid %d timed out\n", + libcfs_nid2str(conn->rac_peer->rap_nid), + conn->rac_cqid); - write_lock_irqsave(&kranal_data.kra_global_lock, flags); + write_lock_irqsave(&kranal_data.kra_global_lock, flags); switch (conn->rac_state) { default: @@ -1007,13 +1030,14 @@ kranal_reaper_check (int idx, unsigned long *min_timeoutp) case RANAL_CONN_ESTABLISHED: kranal_close_conn_locked(conn, -ETIMEDOUT); break; - + case RANAL_CONN_CLOSING: kranal_terminate_conn_locked(conn); break; } - - write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); + + write_unlock_irqrestore(&kranal_data.kra_global_lock, + flags); kranal_conn_decref(conn); @@ -1021,191 +1045,218 @@ kranal_reaper_check (int idx, unsigned long *min_timeoutp) goto again; } - read_unlock(&kranal_data.kra_global_lock); + read_unlock(&kranal_data.kra_global_lock); } int kranal_connd (void *arg) { - char name[16]; - wait_queue_t wait; - unsigned long flags; - kra_peer_t *peer; + long id = (long)arg; + wait_queue_t wait; + unsigned long flags; + kra_peer_t *peer; + kra_acceptsock_t *ras; + int did_something; - snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg); - kportal_daemonize(name); - kportal_blockallsigs(); + cfs_block_allsigs(); - init_waitqueue_entry(&wait, current); + init_waitqueue_entry_current(&wait); - spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); + spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); - while (!kranal_data.kra_shutdown) { - /* Safe: kra_shutdown only set when quiescent */ + while (!kranal_data.kra_shutdown) { + did_something = 0; - if (!list_empty(&kranal_data.kra_connd_peers)) { - peer = list_entry(kranal_data.kra_connd_peers.next, - kra_peer_t, rap_connd_list); - - list_del_init(&peer->rap_connd_list); - spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); + if (!cfs_list_empty(&kranal_data.kra_connd_acceptq)) { + ras = cfs_list_entry(kranal_data.kra_connd_acceptq.next, + kra_acceptsock_t, ras_list); + cfs_list_del(&ras->ras_list); - kranal_connect(peer); - kranal_peer_decref(peer); + spin_unlock_irqrestore(&kranal_data.kra_connd_lock, + flags); - spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); - continue; - } + CDEBUG(D_NET,"About to handshake someone\n"); - set_current_state(TASK_INTERRUPTIBLE); - add_wait_queue(&kranal_data.kra_connd_waitq, &wait); - - spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); + kranal_conn_handshake(ras->ras_sock, NULL); + kranal_free_acceptsock(ras); - schedule (); - - set_current_state(TASK_RUNNING); - remove_wait_queue(&kranal_data.kra_connd_waitq, &wait); + CDEBUG(D_NET,"Finished handshaking someone\n"); - spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); - } + spin_lock_irqsave(&kranal_data.kra_connd_lock, + flags); + did_something = 1; + } - spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); + if (!cfs_list_empty(&kranal_data.kra_connd_peers)) { + peer = 
cfs_list_entry(kranal_data.kra_connd_peers.next, + kra_peer_t, rap_connd_list); - kranal_thread_fini(); - return 0; -} + cfs_list_del_init(&peer->rap_connd_list); + spin_unlock_irqrestore(&kranal_data.kra_connd_lock, + flags); -void -kranal_update_reaper_timeout(long timeout) -{ - unsigned long flags; + kranal_connect(peer); + kranal_peer_decref(peer); - LASSERT (timeout > 0); - - spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags); - - if (timeout < kranal_data.kra_new_min_timeout) - kranal_data.kra_new_min_timeout = timeout; + spin_lock_irqsave(&kranal_data.kra_connd_lock, + flags); + did_something = 1; + } - spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags); -} + if (did_something) + continue; -int -kranal_reaper (void *arg) -{ - wait_queue_t wait; - unsigned long flags; - long timeout; - int i; - int conn_entries = kranal_data.kra_conn_hash_size; - int conn_index = 0; - int base_index = conn_entries - 1; - unsigned long next_check_time = jiffies; - long next_min_timeout = MAX_SCHEDULE_TIMEOUT; - long current_min_timeout = 1; - - kportal_daemonize("kranal_reaper"); - kportal_blockallsigs(); - - init_waitqueue_entry(&wait, current); - - spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags); - - while (!kranal_data.kra_shutdown) { - - /* careful with the jiffy wrap... */ - timeout = (long)(next_check_time - jiffies); - if (timeout <= 0) { - - /* I wake up every 'p' seconds to check for - * timeouts on some more peers. I try to check - * every connection 'n' times within the global - * minimum of all keepalive and timeout intervals, - * to ensure I attend to every connection within - * (n+1)/n times its timeout intervals. */ - - const int p = 1; - const int n = 3; - unsigned long min_timeout; - int chunk; - - if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) { - /* new min timeout set: restart min timeout scan */ - next_min_timeout = MAX_SCHEDULE_TIMEOUT; - base_index = conn_index - 1; - if (base_index < 0) - base_index = conn_entries - 1; - - if (kranal_data.kra_new_min_timeout < current_min_timeout) { - current_min_timeout = kranal_data.kra_new_min_timeout; - CWARN("Set new min timeout %ld\n", - current_min_timeout); - } + set_current_state(TASK_INTERRUPTIBLE); + add_wait_queue_exclusive(&kranal_data.kra_connd_waitq, &wait); - kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT; - } - min_timeout = current_min_timeout; - - spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, - flags); - - LASSERT (min_timeout > 0); - - /* Compute how many table entries to check now so I - * get round the whole table fast enough (NB I do - * this at fixed intervals of 'p' seconds) */ - chunk = conn_entries; - if (min_timeout > n * p) - chunk = (chunk * n * p) / min_timeout; - if (chunk == 0) - chunk = 1; - - for (i = 0; i < chunk; i++) { - kranal_reaper_check(conn_index, - &next_min_timeout); - conn_index = (conn_index + 1) % conn_entries; - } + spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); - next_check_time += p * HZ; + waitq_wait(&wait, TASK_INTERRUPTIBLE); - spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags); + set_current_state(TASK_RUNNING); + remove_wait_queue(&kranal_data.kra_connd_waitq, &wait); - if (((conn_index - chunk <= base_index && - base_index < conn_index) || - (conn_index - conn_entries - chunk <= base_index && - base_index < conn_index - conn_entries))) { + spin_lock_irqsave(&kranal_data.kra_connd_lock, flags); + } - /* Scanned all conns: set current_min_timeout... 
*/ - if (current_min_timeout != next_min_timeout) { - current_min_timeout = next_min_timeout; - CWARN("Set new min timeout %ld\n", - current_min_timeout); - } + spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags); - /* ...and restart min timeout scan */ - next_min_timeout = MAX_SCHEDULE_TIMEOUT; - base_index = conn_index - 1; - if (base_index < 0) - base_index = conn_entries - 1; - } - } + kranal_thread_fini(); + return 0; +} - set_current_state(TASK_INTERRUPTIBLE); - add_wait_queue(&kranal_data.kra_reaper_waitq, &wait); +void +kranal_update_reaper_timeout(long timeout) +{ + unsigned long flags; - spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags); + LASSERT (timeout > 0); - schedule_timeout(timeout); + spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags); - spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags); + if (timeout < kranal_data.kra_new_min_timeout) + kranal_data.kra_new_min_timeout = timeout; - set_current_state(TASK_RUNNING); - remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait); - } + spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags); +} - kranal_thread_fini(); - return 0; +int +kranal_reaper (void *arg) +{ + wait_queue_t wait; + unsigned long flags; + long timeout; + int i; + int conn_entries = kranal_data.kra_conn_hash_size; + int conn_index = 0; + int base_index = conn_entries - 1; + unsigned long next_check_time = jiffies; + long next_min_timeout = MAX_SCHEDULE_TIMEOUT; + long current_min_timeout = 1; + + cfs_block_allsigs(); + + init_waitqueue_entry_current(&wait); + + spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags); + + while (!kranal_data.kra_shutdown) { + /* I wake up every 'p' seconds to check for timeouts on some + * more peers. I try to check every connection 'n' times + * within the global minimum of all keepalive and timeout + * intervals, to ensure I attend to every connection within + * (n+1)/n times its timeout intervals. */ + const int p = 1; + const int n = 3; + unsigned long min_timeout; + int chunk; + + /* careful with the jiffy wrap... 
*/ + timeout = (long)(next_check_time - jiffies); + if (timeout > 0) { + set_current_state(TASK_INTERRUPTIBLE); + add_wait_queue(&kranal_data.kra_reaper_waitq, &wait); + + spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, + flags); + + waitq_timedwait(&wait, TASK_INTERRUPTIBLE, + timeout); + + spin_lock_irqsave(&kranal_data.kra_reaper_lock, + flags); + + set_current_state(TASK_RUNNING); + remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait); + continue; + } + + if (kranal_data.kra_new_min_timeout != + MAX_SCHEDULE_TIMEOUT) { + /* new min timeout set: restart min timeout scan */ + next_min_timeout = MAX_SCHEDULE_TIMEOUT; + base_index = conn_index - 1; + if (base_index < 0) + base_index = conn_entries - 1; + + if (kranal_data.kra_new_min_timeout < + current_min_timeout) { + current_min_timeout = + kranal_data.kra_new_min_timeout; + CDEBUG(D_NET, "Set new min timeout %ld\n", + current_min_timeout); + } + + kranal_data.kra_new_min_timeout = + MAX_SCHEDULE_TIMEOUT; + } + min_timeout = current_min_timeout; + + spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags); + + LASSERT (min_timeout > 0); + + /* Compute how many table entries to check now so I get round + * the whole table fast enough given that I do this at fixed + * intervals of 'p' seconds) */ + chunk = conn_entries; + if (min_timeout > n * p) + chunk = (chunk * n * p) / min_timeout; + if (chunk == 0) + chunk = 1; + + for (i = 0; i < chunk; i++) { + kranal_reaper_check(conn_index, + &next_min_timeout); + conn_index = (conn_index + 1) % conn_entries; + } + + next_check_time += p * HZ; + + spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags); + + if (((conn_index - chunk <= base_index && + base_index < conn_index) || + (conn_index - conn_entries - chunk <= base_index && + base_index < conn_index - conn_entries))) { + + /* Scanned all conns: set current_min_timeout... */ + if (current_min_timeout != next_min_timeout) { + current_min_timeout = next_min_timeout; + CDEBUG(D_NET, "Set new min timeout %ld\n", + current_min_timeout); + } + + /* ...and restart min timeout scan */ + next_min_timeout = MAX_SCHEDULE_TIMEOUT; + base_index = conn_index - 1; + if (base_index < 0) + base_index = conn_entries - 1; + } + } + + kranal_thread_fini(); + return 0; } void @@ -1220,46 +1271,51 @@ kranal_check_rdma_cq (kra_device_t *dev) __u32 event_type; for (;;) { - rrc = RapkCQDone(dev->rad_rdma_cq, &cqid, &event_type); - if (rrc == RAP_NOT_DONE) + rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type); + if (rrc == RAP_NOT_DONE) { + CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id); return; + } LASSERT (rrc == RAP_SUCCESS); LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0); - read_lock(&kranal_data.kra_global_lock); + read_lock(&kranal_data.kra_global_lock); conn = kranal_cqid2conn_locked(cqid); if (conn == NULL) { /* Conn was destroyed? 
*/ - CWARN("RDMA CQID lookup %d failed\n", cqid); - read_unlock(&kranal_data.kra_global_lock); + CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid); + read_unlock(&kranal_data.kra_global_lock); continue; } rrc = RapkRdmaDone(conn->rac_rihandle, &desc); LASSERT (rrc == RAP_SUCCESS); - spin_lock_irqsave(&conn->rac_lock, flags); + CDEBUG(D_NET, "Completed %p\n", + cfs_list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list)); - LASSERT (!list_empty(&conn->rac_rdmaq)); - tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list); - list_del(&tx->tx_list); + spin_lock_irqsave(&conn->rac_lock, flags); + + LASSERT (!cfs_list_empty(&conn->rac_rdmaq)); + tx = cfs_list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list); + cfs_list_del(&tx->tx_list); LASSERT(desc->AppPtr == (void *)tx); LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE || tx->tx_msg.ram_type == RANAL_MSG_GET_DONE); - list_add_tail(&tx->tx_list, &conn->rac_fmaq); + cfs_list_add_tail(&tx->tx_list, &conn->rac_fmaq); tx->tx_qtime = jiffies; - - spin_unlock_irqrestore(&conn->rac_lock, flags); + + spin_unlock_irqrestore(&conn->rac_lock, flags); /* Get conn's fmaq processed, now I've just put something * there */ kranal_schedule_conn(conn); - read_unlock(&kranal_data.kra_global_lock); + read_unlock(&kranal_data.kra_global_lock); } } @@ -1270,50 +1326,57 @@ kranal_check_fma_cq (kra_device_t *dev) RAP_RETURN rrc; __u32 cqid; __u32 event_type; - struct list_head *conns; - struct list_head *tmp; + cfs_list_t *conns; + cfs_list_t *tmp; int i; for (;;) { - rrc = RapkCQDone(dev->rad_fma_cq, &cqid, &event_type); - if (rrc != RAP_NOT_DONE) + rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type); + if (rrc == RAP_NOT_DONE) { + CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id); return; - + } + LASSERT (rrc == RAP_SUCCESS); if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) { - read_lock(&kranal_data.kra_global_lock); - + read_lock(&kranal_data.kra_global_lock); + conn = kranal_cqid2conn_locked(cqid); - if (conn == NULL) - CWARN("FMA CQID lookup %d failed\n", cqid); - else + if (conn == NULL) { + CDEBUG(D_NET, "FMA CQID lookup %d failed\n", + cqid); + } else { + CDEBUG(D_NET, "FMA completed: %p CQID %d\n", + conn, cqid); kranal_schedule_conn(conn); + } - read_unlock(&kranal_data.kra_global_lock); + read_unlock(&kranal_data.kra_global_lock); continue; } /* FMA CQ has overflowed: check ALL conns */ - CWARN("Scheduling ALL conns on device %d\n", dev->rad_id); + CWARN("FMA CQ overflow: scheduling ALL conns on device %d\n", + dev->rad_id); for (i = 0; i < kranal_data.kra_conn_hash_size; i++) { - - read_lock(&kranal_data.kra_global_lock); - + + read_lock(&kranal_data.kra_global_lock); + conns = &kranal_data.kra_conns[i]; - list_for_each (tmp, conns) { - conn = list_entry(tmp, kra_conn_t, - rac_hashlist); - + cfs_list_for_each (tmp, conns) { + conn = cfs_list_entry(tmp, kra_conn_t, + rac_hashlist); + if (conn->rac_device == dev) kranal_schedule_conn(conn); } /* don't block write lockers for too long... */ - read_unlock(&kranal_data.kra_global_lock); + read_unlock(&kranal_data.kra_global_lock); } } } @@ -1324,7 +1387,11 @@ kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg, { int sync = (msg->ram_type & RANAL_MSG_FENCE) != 0; RAP_RETURN rrc; - + + CDEBUG(D_NET,"%p sending msg %p %02x%s [%p for %d]\n", + conn, msg, msg->ram_type, sync ? "(sync)" : "", + immediate, immediatenob); + LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX); LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ? 
immediatenob <= RANAL_FMA_MAX_DATA : @@ -1334,11 +1401,11 @@ kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg, msg->ram_seq = conn->rac_tx_seq; if (sync) - rrc = RapkFmaSyncSend(conn->rac_device->rad_handle, + rrc = RapkFmaSyncSend(conn->rac_rihandle, immediate, immediatenob, msg, sizeof(*msg)); else - rrc = RapkFmaSend(conn->rac_device->rad_handle, + rrc = RapkFmaSend(conn->rac_rihandle, immediate, immediatenob, msg, sizeof(*msg)); @@ -1350,14 +1417,20 @@ kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg, conn->rac_last_tx = jiffies; conn->rac_tx_seq++; return 0; - + case RAP_NOT_DONE: - return -EAGAIN; + if (cfs_time_aftereq(jiffies, + conn->rac_last_tx + conn->rac_keepalive * + HZ)) + CWARN("EAGAIN sending %02x (idle %lu secs)\n", + msg->ram_type, + (jiffies - conn->rac_last_tx)/HZ); + return -EAGAIN; } } void -kranal_process_fmaq (kra_conn_t *conn) +kranal_process_fmaq (kra_conn_t *conn) { unsigned long flags; int more_to_do; @@ -1366,30 +1439,36 @@ kranal_process_fmaq (kra_conn_t *conn) int expect_reply; /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now. - * However I will be rescheduled some by a rad_fma_cq event when - * I eventually get some. - * NB 2. Sampling rac_state here, races with setting it elsewhere - * kranal_close_conn_locked. But it doesn't matter if I try to - * send a "real" message just as I start closing because I'll get - * scheduled to send the close anyway. */ + * However I will be rescheduled by an FMA completion event + * when I eventually get some. + * NB 2. Sampling rac_state here races with setting it elsewhere. + * But it doesn't matter if I try to send a "real" message just + * as I start closing because I'll get scheduled to send the + * close anyway. */ + + /* Not racing with incoming message processing! 
*/ + LASSERT (current == conn->rac_device->rad_scheduler); if (conn->rac_state != RANAL_CONN_ESTABLISHED) { - if (!list_empty(&conn->rac_rdmaq)) { + if (!cfs_list_empty(&conn->rac_rdmaq)) { /* RDMAs in progress */ LASSERT (!conn->rac_close_sent); - - if (time_after_eq(jiffies, - conn->rac_last_tx + - conn->rac_keepalive)) { - kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP); - kranal_sendmsg(conn, &conn->rac_msg, NULL, 0); - } + + if (cfs_time_aftereq(jiffies, + conn->rac_last_tx + + conn->rac_keepalive * HZ)) { + CDEBUG(D_NET, "sending NOOP (rdma in progress)\n"); + kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP); + kranal_sendmsg(conn, &conn->rac_msg, NULL, 0); + } return; } - + if (conn->rac_close_sent) return; + CWARN("sending CLOSE to %s\n", + libcfs_nid2str(conn->rac_peer->rap_nid)); kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE); rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0); if (rc != 0) @@ -1398,55 +1477,69 @@ kranal_process_fmaq (kra_conn_t *conn) conn->rac_close_sent = 1; if (!conn->rac_close_recvd) return; - - write_lock_irqsave(&kranal_data.kra_global_lock, flags); + + write_lock_irqsave(&kranal_data.kra_global_lock, flags); if (conn->rac_state == RANAL_CONN_CLOSING) kranal_terminate_conn_locked(conn); - write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); + write_unlock_irqrestore(&kranal_data.kra_global_lock, + flags); return; } - spin_lock_irqsave(&conn->rac_lock, flags); + spin_lock_irqsave(&conn->rac_lock, flags); - if (list_empty(&conn->rac_fmaq)) { + if (cfs_list_empty(&conn->rac_fmaq)) { - spin_unlock_irqrestore(&conn->rac_lock, flags); + spin_unlock_irqrestore(&conn->rac_lock, flags); - if (time_after_eq(jiffies, - conn->rac_last_tx + conn->rac_keepalive)) { - kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP); - kranal_sendmsg(conn, &conn->rac_msg, NULL, 0); - } + if (cfs_time_aftereq(jiffies, + conn->rac_last_tx + conn->rac_keepalive * + HZ)) { + CDEBUG(D_NET, "sending NOOP -> %s (%p idle %lu(%ld))\n", + libcfs_nid2str(conn->rac_peer->rap_nid), conn, + (jiffies - conn->rac_last_tx)/HZ, + conn->rac_keepalive); + kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP); + kranal_sendmsg(conn, &conn->rac_msg, NULL, 0); + } return; } - - tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list); - list_del(&tx->tx_list); - more_to_do = !list_empty(&conn->rac_fmaq); - spin_unlock_irqrestore(&conn->rac_lock, flags); + tx = cfs_list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list); + cfs_list_del(&tx->tx_list); + more_to_do = !cfs_list_empty(&conn->rac_fmaq); + + spin_unlock_irqrestore(&conn->rac_lock, flags); expect_reply = 0; + CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n", + tx, tx->tx_msg.ram_type, tx->tx_cookie); switch (tx->tx_msg.ram_type) { default: LBUG(); - + case RANAL_MSG_IMMEDIATE: + rc = kranal_sendmsg(conn, &tx->tx_msg, + tx->tx_buffer, tx->tx_nob); + break; + case RANAL_MSG_PUT_NAK: case RANAL_MSG_PUT_DONE: case RANAL_MSG_GET_NAK: case RANAL_MSG_GET_DONE: - rc = kranal_sendmsg(conn, &tx->tx_msg, - tx->tx_buffer, tx->tx_nob); - expect_reply = 0; + rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0); break; - + case RANAL_MSG_PUT_REQ: + rc = kranal_map_buffer(tx); + LASSERT (rc != -EAGAIN); + if (rc != 0) + break; + tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie; rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0); - kranal_map_buffer(tx); expect_reply = 1; break; @@ -1456,10 +1549,14 @@ kranal_process_fmaq (kra_conn_t *conn) break; case RANAL_MSG_GET_REQ: - kranal_map_buffer(tx); + rc = kranal_map_buffer(tx); + LASSERT (rc != -EAGAIN); + 
if (rc != 0) + break; + tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie; tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key; - tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits = + tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits = (__u64)((unsigned long)tx->tx_buffer); tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob; rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0); @@ -1470,25 +1567,28 @@ kranal_process_fmaq (kra_conn_t *conn) if (rc == -EAGAIN) { /* I need credits to send this. Replace tx at the head of the * fmaq and I'll get rescheduled when credits appear */ - spin_lock_irqsave(&conn->rac_lock, flags); - list_add(&tx->tx_list, &conn->rac_fmaq); - spin_unlock_irqrestore(&conn->rac_lock, flags); + CDEBUG(D_NET, "EAGAIN on %p\n", conn); + spin_lock_irqsave(&conn->rac_lock, flags); + cfs_list_add(&tx->tx_list, &conn->rac_fmaq); + spin_unlock_irqrestore(&conn->rac_lock, flags); return; } - LASSERT (rc == 0); - - if (!expect_reply) { - kranal_tx_done(tx, 0); + if (!expect_reply || rc != 0) { + kranal_tx_done(tx, rc); } else { - spin_lock_irqsave(&conn->rac_lock, flags); - list_add_tail(&tx->tx_list, &conn->rac_replyq); + /* LASSERT(current) above ensures this doesn't race with reply + * processing */ + spin_lock_irqsave(&conn->rac_lock, flags); + cfs_list_add_tail(&tx->tx_list, &conn->rac_replyq); tx->tx_qtime = jiffies; - spin_unlock_irqrestore(&conn->rac_lock, flags); + spin_unlock_irqrestore(&conn->rac_lock, flags); } - if (more_to_do) + if (more_to_do) { + CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn); kranal_schedule_conn(conn); + } } static inline void @@ -1505,25 +1605,38 @@ kranal_swab_rdma_desc (kra_rdma_desc_t *d) kra_tx_t * kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie) { - struct list_head *ttmp; + cfs_list_t *ttmp; kra_tx_t *tx; - - list_for_each(ttmp, &conn->rac_replyq) { - tx = list_entry(ttmp, kra_tx_t, tx_list); - + unsigned long flags; + + spin_lock_irqsave(&conn->rac_lock, flags); + + cfs_list_for_each(ttmp, &conn->rac_replyq) { + tx = cfs_list_entry(ttmp, kra_tx_t, tx_list); + + CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n", + tx, tx->tx_msg.ram_type, tx->tx_cookie); + if (tx->tx_cookie != cookie) continue; - + if (tx->tx_msg.ram_type != type) { + spin_unlock_irqrestore(&conn->rac_lock, flags); CWARN("Unexpected type %x (%x expected) " - "matched reply from "LPX64"\n", + "matched reply from %s\n", tx->tx_msg.ram_type, type, - conn->rac_peer->rap_nid); + libcfs_nid2str(conn->rac_peer->rap_nid)); return NULL; } + + cfs_list_del(&tx->tx_list); + spin_unlock_irqrestore(&conn->rac_lock, flags); + return tx; } - - CWARN("Unmatched reply from "LPX64"\n", conn->rac_peer->rap_nid); + + spin_unlock_irqrestore(&conn->rac_lock, flags); + CWARN("Unmatched reply %02x/"LPX64" from %s\n", + type, cookie, libcfs_nid2str(conn->rac_peer->rap_nid)); return NULL; } @@ -1537,19 +1650,29 @@ kranal_check_fma_rx (kra_conn_t *conn) void *prefix; RAP_RETURN rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix); kra_peer_t *peer = conn->rac_peer; + int rc = 0; + int repost = 1; if (rrc == RAP_NOT_DONE) return; - + + CDEBUG(D_NET, "RX on %p\n", conn); + LASSERT (rrc == RAP_SUCCESS); conn->rac_last_rx = jiffies; seq = conn->rac_rx_seq++; msg = (kra_msg_t *)prefix; + /* stash message for portals callbacks they'll NULL + * rac_rxmsg if they consume it */ + LASSERT (conn->rac_rxmsg == NULL); + conn->rac_rxmsg = msg; + if (msg->ram_magic != RANAL_MSG_MAGIC) { if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) { - CERROR("Unexpected magic %08x from "LPX64"\n", - 
msg->ram_magic, peer->rap_nid); + CERROR("Unexpected magic %08x from %s\n", + msg->ram_magic, libcfs_nid2str(peer->rap_nid)); + rc = -EPROTO; goto out; } @@ -1569,116 +1692,126 @@ kranal_check_fma_rx (kra_conn_t *conn) case RANAL_MSG_GET_REQ: kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc); break; - + default: break; } } if (msg->ram_version != RANAL_MSG_VERSION) { - CERROR("Unexpected protocol version %d from "LPX64"\n", - msg->ram_version, peer->rap_nid); + CERROR("Unexpected protocol version %d from %s\n", + msg->ram_version, libcfs_nid2str(peer->rap_nid)); + rc = -EPROTO; goto out; } if (msg->ram_srcnid != peer->rap_nid) { - CERROR("Unexpected peer "LPX64" from "LPX64"\n", - msg->ram_srcnid, peer->rap_nid); + CERROR("Unexpected peer %s from %s\n", + libcfs_nid2str(msg->ram_srcnid), + libcfs_nid2str(peer->rap_nid)); + rc = -EPROTO; goto out; } - + if (msg->ram_connstamp != conn->rac_peer_connstamp) { CERROR("Unexpected connstamp "LPX64"("LPX64 - " expected) from "LPX64"\n", + " expected) from %s\n", msg->ram_connstamp, conn->rac_peer_connstamp, - peer->rap_nid); + libcfs_nid2str(peer->rap_nid)); + rc = -EPROTO; goto out; } - + if (msg->ram_seq != seq) { - CERROR("Unexpected sequence number %d(%d expected) from " - LPX64"\n", msg->ram_seq, seq, peer->rap_nid); + CERROR("Unexpected sequence number %d(%d expected) from %s\n", + msg->ram_seq, seq, libcfs_nid2str(peer->rap_nid)); + rc = -EPROTO; goto out; } if ((msg->ram_type & RANAL_MSG_FENCE) != 0) { /* This message signals RDMA completion... */ rrc = RapkFmaSyncWait(conn->rac_rihandle); - LASSERT (rrc == RAP_SUCCESS); + if (rrc != RAP_SUCCESS) { + CERROR("RapkFmaSyncWait failed: %d\n", rrc); + rc = -ENETDOWN; + goto out; + } } if (conn->rac_close_recvd) { - CERROR("Unexpected message %d after CLOSE from "LPX64"\n", - msg->ram_type, conn->rac_peer->rap_nid); + CERROR("Unexpected message %d after CLOSE from %s\n", + msg->ram_type, libcfs_nid2str(conn->rac_peer->rap_nid)); + rc = -EPROTO; goto out; } if (msg->ram_type == RANAL_MSG_CLOSE) { + CWARN("RX CLOSE from %s\n", libcfs_nid2str(conn->rac_peer->rap_nid)); conn->rac_close_recvd = 1; - write_lock_irqsave(&kranal_data.kra_global_lock, flags); + write_lock_irqsave(&kranal_data.kra_global_lock, flags); if (conn->rac_state == RANAL_CONN_ESTABLISHED) kranal_close_conn_locked(conn, 0); - else if (conn->rac_close_sent) + else if (conn->rac_state == RANAL_CONN_CLOSING && + conn->rac_close_sent) kranal_terminate_conn_locked(conn); - write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); + write_unlock_irqrestore(&kranal_data.kra_global_lock, + flags); goto out; } if (conn->rac_state != RANAL_CONN_ESTABLISHED) goto out; - - conn->rac_rxmsg = msg; /* stash message for portals callbacks */ - /* they'll NULL rac_rxmsg if they consume it */ + switch (msg->ram_type) { case RANAL_MSG_NOOP: /* Nothing to do; just a keepalive */ + CDEBUG(D_NET, "RX NOOP on %p\n", conn); break; - + case RANAL_MSG_IMMEDIATE: - lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn); + CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn); + rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.immediate.raim_hdr, + msg->ram_srcnid, conn, 0); + repost = rc < 0; break; - - case RANAL_MSG_PUT_REQ: - lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn); - if (conn->rac_rxmsg == NULL) /* lib_parse matched something */ - break; - - tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK); - if (tx == NULL) - break; - - tx->tx_msg.ram_u.completion.racm_cookie = - msg->ram_u.putreq.raprm_cookie; - kranal_post_fma(conn, tx); + case 
RANAL_MSG_PUT_REQ: + CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn); + rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.putreq.raprm_hdr, + msg->ram_srcnid, conn, 1); + repost = rc < 0; break; case RANAL_MSG_PUT_NAK: + CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn); tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ, msg->ram_u.completion.racm_cookie); if (tx == NULL) break; - + LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED || tx->tx_buftype == RANAL_BUF_VIRT_MAPPED); kranal_tx_done(tx, -ENOENT); /* no match */ break; - + case RANAL_MSG_PUT_ACK: + CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn); tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ, msg->ram_u.putack.rapam_src_cookie); if (tx == NULL) break; kranal_rdma(tx, RANAL_MSG_PUT_DONE, - &msg->ram_u.putack.rapam_desc, + &msg->ram_u.putack.rapam_desc, msg->ram_u.putack.rapam_desc.rard_nob, msg->ram_u.putack.rapam_dst_cookie); break; case RANAL_MSG_PUT_DONE: + CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn); tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK, msg->ram_u.completion.racm_cookie); if (tx == NULL) @@ -1690,44 +1823,47 @@ kranal_check_fma_rx (kra_conn_t *conn) break; case RANAL_MSG_GET_REQ: - lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn); - - if (conn->rac_rxmsg == NULL) /* lib_parse matched something */ - break; - - tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK); - if (tx == NULL) - break; - - tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie; - kranal_post_fma(conn, tx); + CDEBUG(D_NET, "RX GET_REQ on %p\n", conn); + rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.get.ragm_hdr, + msg->ram_srcnid, conn, 1); + repost = rc < 0; break; - + case RANAL_MSG_GET_NAK: + CDEBUG(D_NET, "RX GET_NAK on %p\n", conn); tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ, msg->ram_u.completion.racm_cookie); if (tx == NULL) break; - + LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED || tx->tx_buftype == RANAL_BUF_VIRT_MAPPED); kranal_tx_done(tx, -ENOENT); /* no match */ break; - + case RANAL_MSG_GET_DONE: + CDEBUG(D_NET, "RX GET_DONE on %p\n", conn); tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ, msg->ram_u.completion.racm_cookie); if (tx == NULL) break; - + LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED || tx->tx_buftype == RANAL_BUF_VIRT_MAPPED); +#if 0 + /* completion message should send rdma length if we ever allow + * GET truncation */ + lnet_set_reply_msg_len(kranal_data.kra_ni, tx->tx_lntmsg[1], ???); +#endif kranal_tx_done(tx, 0); break; } out: - if (conn->rac_rxmsg != NULL) + if (rc < 0) /* protocol/comms error */ + kranal_close_conn (conn, rc); + + if (repost && conn->rac_rxmsg != NULL) kranal_consume_rxmsg(conn, NULL, 0); /* check again later */ @@ -1735,79 +1871,113 @@ kranal_check_fma_rx (kra_conn_t *conn) } void -kranal_complete_closed_conn (kra_conn_t *conn) +kranal_complete_closed_conn (kra_conn_t *conn) { kra_tx_t *tx; + int nfma; + int nreplies; LASSERT (conn->rac_state == RANAL_CONN_CLOSED); + LASSERT (cfs_list_empty(&conn->rac_list)); + LASSERT (cfs_list_empty(&conn->rac_hashlist)); - while (!list_empty(&conn->rac_fmaq)) { - tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list); - - list_del(&tx->tx_list); + for (nfma = 0; !cfs_list_empty(&conn->rac_fmaq); nfma++) { + tx = cfs_list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list); + + cfs_list_del(&tx->tx_list); kranal_tx_done(tx, -ECONNABORTED); } - - LASSERT (list_empty(&conn->rac_rdmaq)); - while (!list_empty(&conn->rac_replyq)) { - tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list); - - list_del(&tx->tx_list); + LASSERT (cfs_list_empty(&conn->rac_rdmaq)); + + for 
(nreplies = 0; !cfs_list_empty(&conn->rac_replyq); nreplies++) { + tx = cfs_list_entry(conn->rac_replyq.next, kra_tx_t, tx_list); + + cfs_list_del(&tx->tx_list); kranal_tx_done(tx, -ECONNABORTED); } + + CWARN("Closed conn %p -> %s: nmsg %d nreplies %d\n", + conn, libcfs_nid2str(conn->rac_peer->rap_nid), nfma, nreplies); } -int -kranal_scheduler (void *arg) +int kranal_process_new_conn (kra_conn_t *conn) { - kra_device_t *dev = (kra_device_t *)arg; - wait_queue_t wait; - char name[16]; - kra_conn_t *conn; - unsigned long flags; - int busy_loops = 0; + RAP_RETURN rrc; - snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx); - kportal_daemonize(name); - kportal_blockallsigs(); + rrc = RapkCompleteSync(conn->rac_rihandle, 1); + if (rrc == RAP_SUCCESS) + return 0; - dev->rad_scheduler = current; - init_waitqueue_entry(&wait, current); + LASSERT (rrc == RAP_NOT_DONE); + if (!cfs_time_aftereq(jiffies, conn->rac_last_tx + + conn->rac_timeout * HZ)) + return -EAGAIN; - spin_lock_irqsave(&dev->rad_lock, flags); + /* Too late */ + rrc = RapkCompleteSync(conn->rac_rihandle, 0); + LASSERT (rrc == RAP_SUCCESS); + return -ETIMEDOUT; +} + +int +kranal_scheduler (void *arg) +{ + kra_device_t *dev = (kra_device_t *)arg; + wait_queue_t wait; + kra_conn_t *conn; + unsigned long flags; + unsigned long deadline; + unsigned long soonest; + int nsoonest; + long timeout; + cfs_list_t *tmp; + cfs_list_t *nxt; + int rc; + int dropped_lock; + int busy_loops = 0; + + cfs_block_allsigs(); + + dev->rad_scheduler = current; + init_waitqueue_entry_current(&wait); + + spin_lock_irqsave(&dev->rad_lock, flags); while (!kranal_data.kra_shutdown) { /* Safe: kra_shutdown only set when quiescent */ - + if (busy_loops++ >= RANAL_RESCHED) { - spin_unlock_irqrestore(&dev->rad_lock, flags); + spin_unlock_irqrestore(&dev->rad_lock, flags); - our_cond_resched(); - busy_loops = 0; + cond_resched(); + busy_loops = 0; - spin_lock_irqsave(&dev->rad_lock, flags); + spin_lock_irqsave(&dev->rad_lock, flags); } + dropped_lock = 0; + if (dev->rad_ready) { /* Device callback fired since I last checked it */ dev->rad_ready = 0; - spin_unlock_irqrestore(&dev->rad_lock, flags); + spin_unlock_irqrestore(&dev->rad_lock, flags); + dropped_lock = 1; kranal_check_rdma_cq(dev); kranal_check_fma_cq(dev); - spin_lock_irqsave(&dev->rad_lock, flags); + spin_lock_irqsave(&dev->rad_lock, flags); } - - if (!list_empty(&dev->rad_connq)) { - /* Connection needs attention */ - conn = list_entry(dev->rad_connq.next, - kra_conn_t, rac_schedlist); - list_del_init(&conn->rac_schedlist); + + cfs_list_for_each_safe(tmp, nxt, &dev->rad_ready_conns) { + conn = cfs_list_entry(tmp, kra_conn_t, rac_schedlist); + + cfs_list_del_init(&conn->rac_schedlist); LASSERT (conn->rac_scheduled); conn->rac_scheduled = 0; - spin_unlock_irqrestore(&dev->rad_lock, flags); + spin_unlock_irqrestore(&dev->rad_lock, flags); + dropped_lock = 1; kranal_check_fma_rx(conn); kranal_process_fmaq(conn); @@ -1816,38 +1986,84 @@ kranal_scheduler (void *arg) kranal_complete_closed_conn(conn); kranal_conn_decref(conn); - - spin_lock_irqsave(&dev->rad_lock, flags); - continue; + spin_lock_irqsave(&dev->rad_lock, flags); } - add_wait_queue(&dev->rad_waitq, &wait); - set_current_state(TASK_INTERRUPTIBLE); + nsoonest = 0; + soonest = jiffies; + + cfs_list_for_each_safe(tmp, nxt, &dev->rad_new_conns) { + conn = cfs_list_entry(tmp, kra_conn_t, rac_schedlist); + + deadline = conn->rac_last_tx + conn->rac_keepalive; + if (cfs_time_aftereq(jiffies, deadline)) { + /* Time to process this new conn */ + 
spin_unlock_irqrestore(&dev->rad_lock, + flags); + dropped_lock = 1; + + rc = kranal_process_new_conn(conn); + if (rc != -EAGAIN) { + /* All done with this conn */ + spin_lock_irqsave(&dev->rad_lock, + flags); + cfs_list_del_init(&conn->rac_schedlist); + spin_unlock_irqrestore(&dev-> \ + rad_lock, + flags); + + kranal_conn_decref(conn); + spin_lock_irqsave(&dev->rad_lock, + flags); + continue; + } - spin_unlock_irqrestore(&dev->rad_lock, flags); + /* retry with exponential backoff until HZ */ + if (conn->rac_keepalive == 0) + conn->rac_keepalive = 1; + else if (conn->rac_keepalive <= HZ) + conn->rac_keepalive *= 2; + else + conn->rac_keepalive += HZ; - busy_loops = 0; - schedule(); + deadline = conn->rac_last_tx + conn->rac_keepalive; + spin_lock_irqsave(&dev->rad_lock, flags); + } - set_current_state(TASK_RUNNING); - remove_wait_queue(&dev->rad_waitq, &wait); + /* Does this conn need attention soonest? */ + if (nsoonest++ == 0 || + !cfs_time_aftereq(deadline, soonest)) + soonest = deadline; + } - spin_lock_irqsave(&dev->rad_lock, flags); - } + if (dropped_lock) /* may sleep iff I didn't drop the lock */ + continue; - spin_unlock_irqrestore(&dev->rad_lock, flags); + set_current_state(TASK_INTERRUPTIBLE); + add_wait_queue_exclusive(&dev->rad_waitq, &wait); + spin_unlock_irqrestore(&dev->rad_lock, flags); + + if (nsoonest == 0) { + busy_loops = 0; + waitq_wait(&wait, TASK_INTERRUPTIBLE); + } else { + timeout = (long)(soonest - jiffies); + if (timeout > 0) { + busy_loops = 0; + waitq_timedwait(&wait, + TASK_INTERRUPTIBLE, + timeout); + } + } + + remove_wait_queue(&dev->rad_waitq, &wait); + set_current_state(TASK_RUNNING); + spin_lock_irqsave(&dev->rad_lock, flags); + } + + spin_unlock_irqrestore(&dev->rad_lock, flags); dev->rad_scheduler = NULL; kranal_thread_fini(); return 0; } - - -lib_nal_t kranal_lib = { - libnal_data: &kranal_data, /* NAL private data */ - libnal_send: kranal_send, - libnal_send_pages: kranal_send_pages, - libnal_recv: kranal_recv, - libnal_recv_pages: kranal_recv_pages, - libnal_dist: kranal_dist -};
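
One pattern worth calling out in the kranal_process_fmaq() hunk above: when kranal_sendmsg() returns -EAGAIN because the connection has no FMA credits, the tx that was just taken off rac_fmaq is put back on the head of the queue (cfs_list_add(), not cfs_list_add_tail()), so message ordering is preserved and the send is simply retried when a completion event reschedules the connection. The following is a minimal user-space sketch of that dequeue/requeue-at-head discipline; the tx type, the queue and the try_send() stub are illustrative stand-ins, not the Lustre structures.

#include <stdio.h>
#include <errno.h>

struct tx {
        struct tx *next;
        int        id;
};

static int credits = 1;

/* Stand-in for kranal_sendmsg(): succeeds while credits remain. */
static int try_send(struct tx *tx)
{
        if (credits <= 0)
                return -EAGAIN;
        credits--;
        printf("sent tx %d\n", tx->id);
        return 0;
}

/* Dequeue from the head and try to send; on -EAGAIN put the tx back at
 * the head (not the tail) so ordering is preserved, then give up until
 * the connection is rescheduled. */
static void process_queue(struct tx **head)
{
        while (*head != NULL) {
                struct tx *tx = *head;

                *head = tx->next;
                if (try_send(tx) == -EAGAIN) {
                        tx->next = *head;
                        *head = tx;
                        return;
                }
        }
}

int main(void)
{
        struct tx b = { NULL, 2 };
        struct tx a = { &b, 1 };
        struct tx *head = &a;

        process_queue(&head);   /* sends tx 1, requeues tx 2 */
        credits = 1;            /* "credits appear" */
        process_queue(&head);   /* sends tx 2 */
        return 0;
}

The real code holds conn->rac_lock around the list operations; that locking is omitted here to keep the sketch to the queueing discipline itself.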
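
The kranal_match_reply() hunk changes the lookup to take conn->rac_lock around the scan of rac_replyq and to unlink the matched tx itself before returning it; a cookie hit whose message type differs from the expected one is still reported and rejected. A stand-alone sketch of that cookie-keyed lookup follows; the list layout and the numeric type/cookie values in main() are made up for illustration and are not the Lustre definitions.

#include <stdio.h>
#include <stddef.h>

/* Simplified stand-ins for kra_tx_t and conn->rac_replyq. */
struct tx {
        struct tx          *next;
        int                 type;
        unsigned long long  cookie;
};

/* Scan the reply queue for the tx with this cookie; unlink and return
 * it.  A cookie hit with the wrong message type is a protocol error,
 * so warn and return NULL, as kranal_match_reply() does. */
static struct tx *match_reply(struct tx **head, int type,
                              unsigned long long cookie)
{
        struct tx **pp;

        for (pp = head; *pp != NULL; pp = &(*pp)->next) {
                struct tx *tx = *pp;

                if (tx->cookie != cookie)
                        continue;
                if (tx->type != type) {
                        fprintf(stderr, "unexpected type %x (%x expected)\n",
                                tx->type, type);
                        return NULL;
                }
                *pp = tx->next;         /* unlink before returning */
                return tx;
        }
        fprintf(stderr, "unmatched reply %x/%llx\n", type, cookie);
        return NULL;
}

int main(void)
{
        struct tx put_req = { NULL, 0x91, 0x1001ULL };
        struct tx *replyq = &put_req;
        struct tx *tx = match_reply(&replyq, 0x91, 0x1001ULL);

        printf("matched: %s, queue now %s\n",
               tx != NULL ? "yes" : "no",
               replyq == NULL ? "empty" : "non-empty");
        return 0;
}

In the patch the scan and the unlink run under conn->rac_lock, which is dropped before any of the return paths; the sketch leaves the locking out.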
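
In the new kranal_scheduler() loop, a connection whose RapkCompleteSync() handshake is still RAP_NOT_DONE is retried with a growing interval: rac_keepalive (used directly as a jiffies count on this path) starts at one tick, doubles until it reaches HZ, then grows by HZ per retry, and the scheduler's timed wait sleeps only until the soonest such deadline. The sketch below merely tabulates that growth in plain C, with HZ fixed at 100 for the sake of the example; it illustrates the arithmetic and is not code from the patch.

#include <stdio.h>

/* HZ is assumed to be 100 ticks per second for this illustration. */
#define HZ 100

/* One step of the retry-interval growth used for connections whose
 * handshake completion is still pending: start at one tick, double up
 * to a second, then add a second per retry. */
static long next_interval(long keepalive)
{
        if (keepalive == 0)
                return 1;
        if (keepalive <= HZ)
                return keepalive * 2;
        return keepalive + HZ;
}

int main(void)
{
        long k = 0;
        int i;

        for (i = 1; i <= 12; i++) {
                k = next_interval(k);
                printf("retry %2d: wait %4ld ticks (%5.2f s)\n",
                       i, k, (double)k / HZ);
        }
        return 0;
}

With HZ = 100 this gives 1, 2, 4, ..., 128, 228, 328, ... ticks: sub-second retries at first, then roughly one extra second per retry.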