X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lnet%2Fklnds%2Fgmlnd%2Fgmlnd_cb.c;h=503bedff16430e7b81a86d8fd71a1f539b669d5f;hb=b48ab0632ba0c88326c8d9466760bf56301b3676;hp=3d4c86dac71a98bdd04f38c719b70d4895ea73d3;hpb=96ec6856f91f7f9031cfce4273c714d72cfe59ae;p=fs%2Flustre-release.git diff --git a/lnet/klnds/gmlnd/gmlnd_cb.c b/lnet/klnds/gmlnd/gmlnd_cb.c index 3d4c86d..503bedf 100644 --- a/lnet/klnds/gmlnd/gmlnd_cb.c +++ b/lnet/klnds/gmlnd/gmlnd_cb.c @@ -1,517 +1,161 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * Based on ksocknal and qswnal + * Copyright (c) 2003 Los Alamos National Laboratory (LANL) * - * Copyright (C) 2002 Cluster File Systems, Inc. - * Author: Robert Read + * This file is part of Lustre, http://www.lustre.org/ * - * This file is part of Portals, http://www.sf.net/projects/sandiaportals/ - * - * Portals is free software; you can redistribute it and/or + * Lustre is free software; you can redistribute it and/or * modify it under the terms of version 2 of the GNU General Public * License as published by the Free Software Foundation. * - * Portals is distributed in the hope that it will be useful, + * Lustre is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License - * along with Portals; if not, write to the Free Software + * along with Lustre; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -/* TODO - * preallocate send buffers, store on list - * put receive buffers on queue, handle with receive threads - * use routing - */ - -#include "gmnal.h" - -extern kgmnal_rx_t *kgm_add_recv(kgmnal_data_t *,int); - -static kgmnal_tx_t * -get_trans(void) -{ - kgmnal_tx_t *t; - PORTAL_ALLOC(t, (sizeof(kgmnal_tx_t))); - return t; -} - -static void -put_trans(kgmnal_tx_t *t) -{ - PORTAL_FREE(t, sizeof(kgmnal_tx_t)); -} - -int -kgmnal_ispeer (ptl_nid_t nid) -{ - unsigned int gmnid = (unsigned int)nid; - unsigned int nnids; - - gm_max_node_id_in_use(kgmnal_data.kgm_port, &nnids); - - return ((ptl_nid_t)gmnid == nid &&/* didn't lose high bits on conversion ? */ - gmnid < nnids); /* it's in this machine */ -} /* - * LIB functions follow - * + * This file implements the nal cb functions */ -static int -kgmnal_read (nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr, - size_t len) -{ - CDEBUG(D_NET, "0x%Lx: reading %ld bytes from %p -> %p\n", - nal->ni.nid, (long)len, src_addr, dst_addr ); - memcpy( dst_addr, src_addr, len ); - return 0; -} - -static int -kgmnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr, - size_t len) -{ - CDEBUG(D_NET, "0x%Lx: writing %ld bytes from %p -> %p\n", - nal->ni.nid, (long)len, src_addr, dst_addr ); - memcpy( dst_addr, src_addr, len ); - return 0; -} - -static void * -kgmnal_malloc(nal_cb_t *nal, size_t len) -{ - void *buf; - - PORTAL_ALLOC(buf, len); - return buf; -} -static void -kgmnal_free(nal_cb_t *nal, void *buf, size_t len) -{ - PORTAL_FREE(buf, len); -} -static void -kgmnal_printf(nal_cb_t *nal, const char *fmt, ...) -{ - va_list ap; - char msg[256]; +#include "gmlnd.h" - if (portal_debug & D_NET) { - va_start( ap, fmt ); - vsnprintf( msg, sizeof(msg), fmt, ap ); - va_end( ap ); - - printk("CPUId: %d %s",smp_processor_id(), msg); - } -} - - -static void -kgmnal_cli(nal_cb_t *nal, unsigned long *flags) -{ - kgmnal_data_t *data= nal->nal_data; - - spin_lock_irqsave(&data->kgm_dispatch_lock,*flags); -} - - -static void -kgmnal_sti(nal_cb_t *nal, unsigned long *flags) -{ - kgmnal_data_t *data= nal->nal_data; - - spin_unlock_irqrestore(&data->kgm_dispatch_lock,*flags); -} - - -static int -kgmnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist) -{ - /* network distance doesn't mean much for this nal */ - if ( nal->ni.nid == nid ) { - *dist = 0; - } else { - *dist = 1; +int +gmnal_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, + int delayed, unsigned int niov, + struct iovec *iov, lnet_kiov_t *kiov, + unsigned int offset, unsigned int mlen, unsigned int rlen) +{ + gmnal_ni_t *gmni = ni->ni_data; + gmnal_rx_t *rx = (gmnal_rx_t*)private; + gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf); + int npages = rx->rx_islarge ? gmni->gmni_large_pages : 1; + int payload_offset = offsetof(gmnal_msg_t, + gmm_u.immediate.gmim_payload[0]); + int nob = payload_offset + mlen; + + LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE); + LASSERT (iov == NULL || kiov == NULL); + + if (rx->rx_recv_nob < nob) { + CERROR("Short message from nid %s: got %d, need %d\n", + libcfs_nid2str(msg->gmm_srcnid), rx->rx_recv_nob, nob); + gmnal_post_rx(gmni, rx); + return -EIO; } - return 0; -} - -/* FIXME rmr: add rounting code here */ -static void -kgmnal_tx_done(kgmnal_tx_t *trans, int error) -{ - lib_finalize(trans->ktx_nal, trans->ktx_private, trans->ktx_cookie); - - gm_dma_free(kgmnal_data.kgm_port, trans->ktx_buffer); - - trans->ktx_buffer = NULL; - trans->ktx_len = 0; - - put_trans(trans); -} -static char * gm_error_strings[GM_NUM_STATUS_CODES] = { - [GM_SUCCESS] = "GM_SUCCESS", - [GM_SEND_TIMED_OUT] = "GM_SEND_TIMED_OUT", - [GM_SEND_REJECTED] = "GM_SEND_REJECTED", - [GM_SEND_TARGET_PORT_CLOSED] = "GM_SEND_TARGET_PORT_CLOSED", - [GM_SEND_TARGET_NODE_UNREACHABLE] = "GM_SEND_TARGET_NODE_UNREACHABLE", - [GM_SEND_DROPPED] = "GM_SEND_DROPPED", - [GM_SEND_PORT_CLOSED] = "GM_SEND_PORT_CLOSED", -}; - -inline char * get_error(int status) -{ - if (gm_error_strings[status] != NULL) - return gm_error_strings[status]; + if (kiov != NULL) + lnet_copy_kiov2kiov(niov, kiov, offset, + npages, rx->rx_buf.nb_kiov, payload_offset, + mlen); else - return "Unknown error"; -} - -static void -kgmnal_errhandler(struct gm_port *p, void *context, gm_status_t status) -{ - CDEBUG(D_NET,"error callback: ktx %p status %d\n", context, status); -} + lnet_copy_kiov2iov(niov, iov, offset, + npages, rx->rx_buf.nb_kiov, payload_offset, + mlen); -static void -kgmnal_txhandler(struct gm_port *p, void *context, gm_status_t status) -{ - kgmnal_tx_t *ktx = (kgmnal_tx_t *)context; - int err = 0; - - LASSERT (p != NULL); - LASSERT (ktx != NULL); - - CDEBUG(D_NET,"ktx %p status %d nid 0x%x pid %d\n", ktx, status, - ktx->ktx_tgt_node, ktx->ktx_tgt_port_id); - - switch((int)status) { - case GM_SUCCESS: /* normal */ - break; - case GM_SEND_TIMED_OUT: /* application error */ - case GM_SEND_REJECTED: /* size of msg unacceptable */ - case GM_SEND_TARGET_PORT_CLOSED: - CERROR("%s (%d):\n", get_error(status), status); - gm_resume_sending(kgmnal_data.kgm_port, ktx->ktx_priority, - ktx->ktx_tgt_node, ktx->ktx_tgt_port_id, - kgmnal_errhandler, NULL); - err = -EIO; - break; - case GM_SEND_TARGET_NODE_UNREACHABLE: - case GM_SEND_PORT_CLOSED: - CERROR("%s (%d):\n", get_error(status), status); - gm_drop_sends(kgmnal_data.kgm_port, ktx->ktx_priority, - ktx->ktx_tgt_node, ktx->ktx_tgt_port_id, - kgmnal_errhandler, NULL); - err = -EIO; - break; - case GM_SEND_DROPPED: - CERROR("%s (%d):\n", get_error(status), status); - err = -EIO; - break; - default: - CERROR("Unknown status: %d\n", status); - err = -EIO; - break; - } - - kgmnal_tx_done(ktx, err); + lnet_finalize(ni, lntmsg, 0); + gmnal_post_rx(gmni, rx); + return 0; } -/* - */ - -static int -kgmnal_send(nal_cb_t *nal, - void *private, - lib_msg_t *cookie, - ptl_hdr_t *hdr, - int type, - ptl_nid_t nid, - ptl_pid_t pid, - int options, - unsigned int niov, - lib_md_iov_t *iov, - size_t len) -{ - /* - * ipnal assumes that this is the private as passed to lib_dispatch.. - * so do we :/ - */ - kgmnal_tx_t *ktx=NULL; - int rc=0; - void * buf; - int buf_len = sizeof(ptl_hdr_t) + len; - int buf_size = 0; - - LASSERT ((options & PTL_MD_KIOV) == 0); - - PROF_START(gmnal_send); - - - CDEBUG(D_NET, "sending %d bytes from %p to nid: 0x%Lx pid %d\n", - len, iov, nid, KGM_PORT_NUM); - - /* ensure there is an available tx handle */ - - /* save transaction info to trans for later finalize and cleanup */ - ktx = get_trans(); - if (ktx == NULL) { - rc = -ENOMEM; - goto send_exit; +int +gmnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) +{ + lnet_hdr_t *hdr= &lntmsg->msg_hdr; + int type = lntmsg->msg_type; + lnet_process_id_t target = lntmsg->msg_target; + unsigned int niov = lntmsg->msg_niov; + struct iovec *iov = lntmsg->msg_iov; + lnet_kiov_t *kiov = lntmsg->msg_kiov; + unsigned int offset = lntmsg->msg_offset; + unsigned int len = lntmsg->msg_len; + gmnal_ni_t *gmni = ni->ni_data; + gm_status_t gmrc; + gmnal_tx_t *tx; + + LASSERT (iov == NULL || kiov == NULL); + + /* I may not block for a tx if I'm responding to an incoming message */ + tx = gmnal_get_tx(gmni); + if (tx == NULL) { + if (!gmni->gmni_shutdown) + CERROR ("Can't get tx for msg type %d for %s\n", + type, libcfs_nid2str(target.nid)); + return -EIO; } - /* hmmm... GM doesn't support vectored write, so need to allocate buffer to coalesce - header and data. - Also, memory must be dma'able or registered with GM. */ + tx->tx_nid = target.nid; - if (buf_len <= MSG_LEN_SMALL) { - buf_size = MSG_SIZE_SMALL; - } else if (buf_len <= MSG_LEN_LARGE) { - buf_size = MSG_SIZE_LARGE; - } else { - printk("kgmnal:request exceeds TX MTU size (%d).\n", - MSG_SIZE_LARGE); - rc = -1; - goto send_exit; + gmrc = gm_global_id_to_node_id(gmni->gmni_port, LNET_NIDADDR(target.nid), + &tx->tx_gmlid); + if (gmrc != GM_SUCCESS) { + CERROR("Can't map Nid %s to a GM local ID: %d\n", + libcfs_nid2str(target.nid), gmrc); + /* NB tx_lntmsg not set => doesn't finalize */ + gmnal_tx_done(tx, -EIO); + return -EIO; } - buf = gm_dma_malloc(kgmnal_data.kgm_port, buf_len); - if (buf == NULL) { - rc = -ENOMEM; - goto send_exit; - } - memcpy(buf, hdr, sizeof(ptl_hdr_t)); - - if (len != 0) - lib_copy_iov2buf(((char *)buf) + sizeof (ptl_hdr_t), - options, niov, iov, len); - - ktx->ktx_nal = nal; - ktx->ktx_private = private; - ktx->ktx_cookie = cookie; - ktx->ktx_len = buf_len; - ktx->ktx_size = buf_size; - ktx->ktx_buffer = buf; - ktx->ktx_priority = GM_LOW_PRIORITY; - ktx->ktx_tgt_node = nid; - ktx->ktx_tgt_port_id = KGM_PORT_NUM; - - CDEBUG(D_NET, "gm_send %d bytes (size %d) from %p to nid: 0x%Lx " - "pid %d pri %d\n", buf_len, buf_size, iov, nid, KGM_PORT_NUM, - GM_LOW_PRIORITY); - - gm_send_with_callback(kgmnal_data.kgm_port, buf, buf_size, - buf_len, GM_LOW_PRIORITY, - nid, KGM_PORT_NUM, - kgmnal_txhandler, ktx); - - PROF_FINISH(gmnal_send); - send_exit: - return rc; -} -void -kgmnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) -{ - CERROR ("forwarding not implemented\n"); -} - -void -kqswnal_fwd_callback (void *arg, int error) -{ - CERROR ("forwarding not implemented\n"); -} - - -static inline void -kgmnal_requeue_rx(kgmnal_rx_t *krx) -{ - gm_provide_receive_buffer(kgmnal_data.kgm_port, krx->krx_buffer, - krx->krx_size, krx->krx_priority); -} - -/* Process a received portals packet */ - -/* Receive Interrupt Handler */ -static void kgmnal_rx(kgmnal_data_t *kgm, unsigned long len, unsigned int size, - void * buf, unsigned int pri) -{ - ptl_hdr_t *hdr = buf; - kgmnal_rx_t krx; - - CDEBUG(D_NET,"buf %p, len %ld\n", buf, len); - - if ( len < sizeof( ptl_hdr_t ) ) { - /* XXX what's this for? */ - if (kgm->kgm_shuttingdown) - return; - CERROR("kgmnal: did not receive complete portal header, " - "len= %ld", len); - gm_provide_receive_buffer(kgm->kgm_port, buf, size, pri); - return; + gmnal_pack_msg(gmni, GMNAL_NETBUF_MSG(&tx->tx_buf), + target.nid, GMNAL_MSG_IMMEDIATE); + GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_hdr = *hdr; + tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]); + + if (the_lnet.ln_testprotocompat != 0) { + /* single-shot proto test */ + LNET_LOCK(); + if ((the_lnet.ln_testprotocompat & 1) != 0) { + GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_version++; + the_lnet.ln_testprotocompat &= ~1; + } + if ((the_lnet.ln_testprotocompat & 2) != 0) { + GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_magic = + LNET_PROTO_MAGIC; + the_lnet.ln_testprotocompat &= ~2; + } + LNET_UNLOCK(); } - /* might want to use seperate threads to handle receive */ - krx.krx_buffer = buf; - krx.krx_len = len; - krx.krx_size = size; - krx.krx_priority = pri; - - if ( hdr->dest_nid == kgmnal_lib.ni.nid ) { - PROF_START(lib_parse); - lib_parse(&kgmnal_lib, (ptl_hdr_t *)krx.krx_buffer, &krx); - PROF_FINISH(lib_parse); - } else if (kgmnal_ispeer(hdr->dest_nid)) { - /* should have gone direct to peer */ - CERROR("dropping packet from 0x%llx to 0x%llx: target is " - "a peer", hdr->src_nid, hdr->dest_nid); - kgmnal_requeue_rx(&krx); + if (tx->tx_msgnob + len <= gmni->gmni_small_msgsize) { + /* whole message fits in tx_buf */ + char *buffer = &(GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_payload[0]); + + if (iov != NULL) + lnet_copy_iov2flat(len, buffer, 0, + niov, iov, offset, len); + else + lnet_copy_kiov2flat(len, buffer, 0, + niov, kiov, offset, len); + + tx->tx_msgnob += len; + tx->tx_large_nob = 0; } else { - /* forward to gateway */ - CERROR("forwarding not implemented yet"); - kgmnal_requeue_rx(&krx); + /* stash payload pts to copy later */ + tx->tx_large_nob = len; + tx->tx_large_iskiov = (kiov != NULL); + tx->tx_large_niov = niov; + if (tx->tx_large_iskiov) + tx->tx_large_frags.kiov = kiov; + else + tx->tx_large_frags.iov = iov; } - return; -} - - -static int kgmnal_recv(nal_cb_t *nal, - void *private, - lib_msg_t *cookie, - int options, - unsigned int niov, - lib_md_iov_t *iov, - size_t mlen, - size_t rlen) -{ - kgmnal_rx_t *krx = private; - - LASSERT ((options & PTL_MD_KIOV) == 0); - - CDEBUG(D_NET,"mlen=%d, rlen=%d\n", mlen, rlen); - - /* What was actually received must be >= what sender claims to - * have sent. This is an LASSERT, since lib-move doesn't - * check cb return code yet. */ - LASSERT (krx->krx_len >= sizeof (ptl_hdr_t) + rlen); - LASSERT (mlen <= rlen); - - PROF_START(gmnal_recv); - - if(mlen != 0) { - PROF_START(memcpy); - lib_copy_buf2iov (options, niov, iov, - krx->krx_buffer + sizeof (ptl_hdr_t), mlen); - PROF_FINISH(memcpy); - } - - PROF_START(lib_finalize); - lib_finalize(nal, private, cookie); - PROF_FINISH(lib_finalize); - - kgmnal_requeue_rx(krx); - - PROF_FINISH(gmnal_recv); - - return rlen; -} - - -static void kgmnal_shutdown(void * none) -{ - CERROR("called\n"); - return; -} - -/* - * Set terminate and use alarm to wake up the recv thread. - */ -static void recv_shutdown(kgmnal_data_t *kgm) -{ - gm_alarm_t alarm; - - kgm->kgm_shuttingdown = 1; - gm_initialize_alarm(&alarm); - gm_set_alarm(kgm->kgm_port, &alarm, 1, kgmnal_shutdown, NULL); -} - -int kgmnal_end(kgmnal_data_t *kgm) -{ - - /* wait for sends to finish ? */ - /* remove receive buffers */ - /* shutdown receive thread */ - - recv_shutdown(kgm); - - return 0; -} - -/* Used only for the spinner */ -int kgmnal_recv_thread(void *arg) -{ - kgmnal_data_t *kgm = arg; - - LASSERT(kgm != NULL); - - kportal_daemonize("kgmnal_rx"); + LASSERT(tx->tx_lntmsg == NULL); + tx->tx_lntmsg = lntmsg; - while(1) { - gm_recv_event_t *e; - int priority = GM_LOW_PRIORITY; - if (kgm->kgm_shuttingdown) - break; + spin_lock(&gmni->gmni_tx_lock); - e = gm_blocking_receive_no_spin(kgm->kgm_port); - if (e == NULL) { - CERROR("gm_blocking_receive returned NULL\n"); - break; - } + list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq); + gmnal_check_txqueues_locked(gmni); - switch(gm_ntohc(e->recv.type)) { - case GM_HIGH_RECV_EVENT: - priority = GM_HIGH_PRIORITY; - /* fall through */ - case GM_RECV_EVENT: - kgmnal_rx(kgm, gm_ntohl(e->recv.length), - gm_ntohc(e->recv.size), - gm_ntohp(e->recv.buffer), priority); - break; - case GM_ALARM_EVENT: - CERROR("received alarm"); - gm_unknown(kgm->kgm_port, e); - break; - case GM_BAD_SEND_DETECTED_EVENT: /* ?? */ - CERROR("received bad send!\n"); - break; - default: - gm_unknown(kgm->kgm_port, e); - } - } + spin_unlock(&gmni->gmni_tx_lock); - CERROR("shuttting down.\n"); return 0; } - -nal_cb_t kgmnal_lib = { - nal_data: &kgmnal_data, /* NAL private data */ - cb_send: kgmnal_send, - cb_recv: kgmnal_recv, - cb_read: kgmnal_read, - cb_write: kgmnal_write, - cb_malloc: kgmnal_malloc, - cb_free: kgmnal_free, - cb_printf: kgmnal_printf, - cb_cli: kgmnal_cli, - cb_sti: kgmnal_sti, - cb_dist: kgmnal_dist -};